package tech.powerjob.server.core.scheduler;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.Holder;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.remote.transporter.TransportService;

/**
 * Finds job instances and workflow instances that have stayed in one state for longer than the
 * matching timeout and recovers them, either by dispatching them again or by marking them failed.
 *
 * <p>Every public {@code check*} method only considers the app ids that
 * {@link AppInfoRepository#listAppIdByCurrentServer} returns for this server's address, logs
 * (instead of propagating) any exception raised by the check, and logs the elapsed time.
 */
@Service
public class InstanceStatusCheckService {

    private static final Logger log = LoggerFactory.getLogger(InstanceStatusCheckService.class);

    /** Number of app ids passed to a single repository query. */
    private static final int MAX_BATCH_NUM_APP = 10;
    /** Page size used when loading stuck instances. */
    private static final int MAX_BATCH_NUM_INSTANCE = 3000;
    /** Number of instance ids handed to one batch redispatch call. */
    private static final int MAX_BATCH_UPDATE_NUM = 500;
    /** How far (ms) past its expected trigger time a WAITING_DISPATCH instance must be to get picked up. */
    private static final long DISPATCH_TIMEOUT_MS = 30000;
    /** How far (ms) past its actual trigger time a WAITING_WORKER_RECEIVE instance must be to get picked up. */
    private static final long RECEIVE_TIMEOUT_MS = 60000;
    /** How long (ms) a RUNNING instance may go unmodified before it is picked up. */
    private static final long RUNNING_TIMEOUT_MS = 60000;
    /** How far (ms) past its expected trigger time a WAITING workflow instance must be to get restarted. */
    private static final long WORKFLOW_WAITING_TIMEOUT_MS = 60000;
    /**
     * Not read inside this class; presumably the interval (ms) at which callers run the checks
     * — confirm at the call sites.
     */
    public static final long CHECK_INTERVAL = 10000;

    private final TransportService transportService;
    private final DispatchService dispatchService;
    private final InstanceManager instanceManager;
    private final WorkflowInstanceManager workflowInstanceManager;
    private final AppInfoRepository appInfoRepository;
    private final JobInfoRepository jobInfoRepository;
    private final InstanceInfoRepository instanceInfoRepository;
    private final WorkflowInfoRepository workflowInfoRepository;
    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;

    public InstanceStatusCheckService(TransportService transportService, DispatchService dispatchService, InstanceManager instanceManager, WorkflowInstanceManager workflowInstanceManager, AppInfoRepository appInfoRepository, JobInfoRepository jobInfoRepository, InstanceInfoRepository instanceInfoRepository, WorkflowInfoRepository workflowInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository) {
        this.transportService = transportService;
        this.dispatchService = dispatchService;
        this.instanceManager = instanceManager;
        this.workflowInstanceManager = workflowInstanceManager;
        this.appInfoRepository = appInfoRepository;
        this.jobInfoRepository = jobInfoRepository;
        this.instanceInfoRepository = instanceInfoRepository;
        this.workflowInfoRepository = workflowInfoRepository;
        this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
    }

    /**
     * Restarts workflow instances that are still WAITING although their expected trigger time is
     * more than {@link #WORKFLOW_WAITING_TIMEOUT_MS} in the past.
     */
    public void checkWorkflowInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        List<Long> appIds = listAppIdsOfCurrentServer();
        if (CollectionUtils.isEmpty(appIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            checkWorkflowInstance(appIds);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WorkflowInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WorkflowInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Dispatches instances that are still WAITING_DISPATCH although their expected trigger time
     * is more than {@link #DISPATCH_TIMEOUT_MS} in the past.
     */
    public void checkWaitingDispatchInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        List<Long> appIds = listAppIdsOfCurrentServer();
        if (CollectionUtils.isEmpty(appIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            Lists.partition(appIds, MAX_BATCH_NUM_APP).forEach(this::handleWaitingDispatchInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WaitingDispatchInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WaitingDispatchInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Redispatches instances that are still WAITING_WORKER_RECEIVE although their actual trigger
     * time is more than {@link #RECEIVE_TIMEOUT_MS} in the past.
     */
    public void checkWaitingWorkerReceiveInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        List<Long> appIds = listAppIdsOfCurrentServer();
        if (CollectionUtils.isEmpty(appIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            Lists.partition(appIds, MAX_BATCH_NUM_APP).forEach(this::handleWaitingWorkerReceiveInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Redispatches or fails instances that are RUNNING but whose record has not been modified for
     * more than {@link #RUNNING_TIMEOUT_MS}.
     */
    public void checkRunningInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        List<Long> appIds = listAppIdsOfCurrentServer();
        if (CollectionUtils.isEmpty(appIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            Lists.partition(appIds, MAX_BATCH_NUM_APP).forEach(this::handleRunningInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] RunningInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] RunningInstance status check using {}.", stopwatch.stop());
    }

    /** Returns the app ids the repository reports for this server's default-protocol address. */
    private List<Long> listAppIdsOfCurrentServer() {
        return appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
    }

    /**
     * Processes overdue WAITING_DISPATCH instances of one batch of apps, page by page, until no
     * overdue instance is left or every app of the batch has been flagged as overloaded.
     *
     * @param partAppIds one batch of app ids; never modified by this method
     */
    private void handleWaitingDispatchInstance(List<Long> partAppIds) {
        // Work on a private copy: overloaded apps are removed below, and partAppIds is a sub-list
        // view produced by Lists.partition, so removing from it would shrink the caller's list
        // while the caller is still iterating over the partitions.
        List<Long> remainingAppIds = new ArrayList<>(partAppIds);
        long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
        while (!remainingAppIds.isEmpty()) {
            List<InstanceInfoDO> overdueInstances = instanceInfoRepository.findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(
                    remainingAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
            if (overdueInstances.isEmpty()) {
                return;
            }
            List<Long> overloadedAppIds = new ArrayList<>();
            long startTime = System.currentTimeMillis();
            // Handle the instances app by app so that an overloaded app can be skipped as a whole.
            Map<Long, List<InstanceInfoDO>> instancesByAppId = overdueInstances.stream()
                    .collect(Collectors.groupingBy(InstanceInfoDO::getAppId));
            for (Map.Entry<Long, List<InstanceInfoDO>> entry : instancesByAppId.entrySet()) {
                Long appId = entry.getKey();
                List<InstanceInfoDO> appInstances = entry.getValue();
                Set<Long> jobIds = appInstances.stream().map(InstanceInfoDO::getJobId).collect(Collectors.toSet());
                Map<Long, JobInfoDO> jobsById = jobInfoRepository.findByIdIn(jobIds).stream()
                        .collect(Collectors.toMap(JobInfoDO::getId, job -> job));
                log.warn("[InstanceStatusChecker] find some instance in app({}) which is not triggered as expected: {}",
                        appId, appInstances.stream().map(InstanceInfoDO::getInstanceId).collect(Collectors.toList()));
                // Handed to dispatch(); once it holds true the remaining instances of this app are skipped.
                Holder<Boolean> overloadFlag = new Holder<>(false);
                appInstances.parallelStream().forEach(instance -> {
                    if (overloadFlag.get()) {
                        return;
                    }
                    JobInfoDO job = jobsById.get(instance.getJobId());
                    if (job != null) {
                        dispatchService.dispatch(job, instance.getInstanceId(), Optional.of(instance), Optional.of(overloadFlag));
                    } else {
                        log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance);
                        failInstanceById(instance.getId(), SystemInstanceResult.CAN_NOT_FIND_JOB_INFO);
                    }
                });
                // Refresh the cut-off so the next query also sees instances that became overdue meanwhile.
                threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
                if (overloadFlag.get()) {
                    overloadedAppIds.add(appId);
                }
            }
            log.info("[InstanceStatusChecker] process {} task,use {} ms", overdueInstances.size(), System.currentTimeMillis() - startTime);
            if (!overloadedAppIds.isEmpty()) {
                log.warn("[InstanceStatusChecker] app[{}] is overload, so skip check waiting dispatch instance", overloadedAppIds);
                remainingAppIds.removeAll(overloadedAppIds);
            }
        }
    }

    /**
     * Redispatches, in batches of {@link #MAX_BATCH_UPDATE_NUM}, every overdue
     * WAITING_WORKER_RECEIVE instance of one batch of apps; repeats until the query returns nothing.
     *
     * @param partAppIds one batch of app ids
     */
    private void handleWaitingWorkerReceiveInstance(List<Long> partAppIds) {
        while (true) {
            long threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
            List<BriefInstanceInfo> unreceivedInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(
                    partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
            if (unreceivedInstances.isEmpty()) {
                return;
            }
            log.warn("[InstanceStatusChecker] find some instance didn't receive any reply from worker, try to redispatch: {}",
                    unreceivedInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
            for (List<BriefInstanceInfo> batch : Lists.partition(unreceivedInstances, MAX_BATCH_UPDATE_NUM)) {
                List<Long> instanceIds = batch.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList());
                dispatchService.redispatchBatchAsyncLockFree(instanceIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV());
            }
        }
    }

    /**
     * Handles RUNNING instances of one batch of apps whose record was last modified more than
     * {@link #RUNNING_TIMEOUT_MS} ago; repeats until the query returns nothing.
     *
     * <p>An instance is redispatched only when its job still exists, is enabled, is not one of
     * {@code TimeExpressionType.FREQUENT_TYPES} and its running times are below the job's
     * instance retry number. Every other instance is marked failed with
     * {@link SystemInstanceResult#REPORT_TIMEOUT}.
     *
     * @param partAppIds one batch of app ids
     */
    private void handleRunningInstance(List<Long> partAppIds) {
        while (true) {
            Date threshold = new Date(System.currentTimeMillis() - RUNNING_TIMEOUT_MS);
            List<BriefInstanceInfo> silentInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(
                    partAppIds, InstanceStatus.RUNNING.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
            if (silentInstances.isEmpty()) {
                return;
            }
            Set<Long> jobIds = silentInstances.stream().map(BriefInstanceInfo::getJobId).collect(Collectors.toSet());
            Map<Long, JobInfoDO> jobsById = jobInfoRepository.findByIdIn(jobIds).stream()
                    .collect(Collectors.toMap(JobInfoDO::getId, job -> job));
            log.warn("[InstanceStatusCheckService] find some instances have not received status report for a long time : {}",
                    silentInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
            silentInstances.forEach(instance -> {
                JobInfoDO job = jobsById.get(instance.getJobId());
                if (job == null) {
                    failInstanceById(instance.getId(), SystemInstanceResult.REPORT_TIMEOUT);
                    return;
                }
                TimeExpressionType timeExpressionType = TimeExpressionType.of(job.getTimeExpressionType());
                boolean jobEnabled = SwitchableStatus.of(job.getStatus()) == SwitchableStatus.ENABLE;
                boolean frequentJob = TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType.getV());
                boolean retryLeft = instance.getRunningTimes() < job.getInstanceRetryNum();
                if (jobEnabled && !frequentJob && retryLeft) {
                    dispatchService.redispatchAsync(instance.getInstanceId(), InstanceStatus.RUNNING.getV());
                } else {
                    failInstanceById(instance.getId(), SystemInstanceResult.REPORT_TIMEOUT);
                }
            });
        }
    }

    /**
     * Restarts every WAITING workflow instance of the given apps whose expected trigger time is
     * older than {@link #WORKFLOW_WAITING_TIMEOUT_MS}; the cut-off is computed once for all batches.
     * Workflow instances whose workflow definition cannot be found are left untouched.
     *
     * @param allAppIds all app ids of this server; queried in batches of {@link #MAX_BATCH_NUM_APP}
     */
    private void checkWorkflowInstance(List<Long> allAppIds) {
        long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS;
        Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(partAppIds -> {
            List<WorkflowInstanceInfoDO> waitingWfInstances = workflowInstanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(
                    partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold);
            if (CollectionUtils.isEmpty(waitingWfInstances)) {
                return;
            }
            log.warn("[WorkflowInstanceChecker] wfInstance({}) is not started as expected, oms try to restart these workflowInstance.",
                    waitingWfInstances.stream().map(WorkflowInstanceInfoDO::getWfInstanceId).collect(Collectors.toList()));
            waitingWfInstances.forEach(wfInstance ->
                    workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(workflow -> {
                        workflowInstanceManager.start(workflow, wfInstance.getWfInstanceId());
                        log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflow.getId(), wfInstance.getWfInstanceId());
                    }));
        });
    }

    /**
     * Reloads the instance row by its primary key and, if it still exists, marks it failed.
     *
     * @param id     primary key of the instance row
     * @param result failure reason stored on the instance
     */
    private void failInstanceById(Long id, String result) {
        instanceInfoRepository.findById(id).ifPresent(instance -> updateFailedInstance(instance, result));
    }

    /**
     * Persists the instance as FAILED (status, finished time, modification time and result) and
     * then hands it to {@link InstanceManager#processFinishedInstance}.
     *
     * @param instanceInfoDO instance to fail; mutated and saved
     * @param str            failure reason stored as the instance result
     */
    private void updateFailedInstance(InstanceInfoDO instanceInfoDO, String str) {
        log.warn("[InstanceStatusChecker] instance[{}] failed due to {}, instanceInfo: {}", instanceInfoDO.getInstanceId(), str, instanceInfoDO);
        instanceInfoDO.setStatus(InstanceStatus.FAILED.getV());
        instanceInfoDO.setFinishedTime(System.currentTimeMillis());
        instanceInfoDO.setGmtModified(new Date());
        instanceInfoDO.setResult(str);
        instanceInfoRepository.saveAndFlush(instanceInfoDO);
        instanceManager.processFinishedInstance(instanceInfoDO.getInstanceId(), instanceInfoDO.getWfInstanceId(), InstanceStatus.FAILED, str);
    }
}
