package io.nflow.engine.internal.executor;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.nflow.engine.internal.dao.ExecutorDao;
import io.nflow.engine.internal.dao.PollingRaceConditionException;
import io.nflow.engine.internal.dao.WorkflowInstanceDao;
import io.nflow.engine.internal.util.PeriodicLogger;
import io.nflow.engine.service.WorkflowDefinitionService;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@SuppressFBWarnings(value = {"MDM_RANDOM_SEED"}, justification = "rand does not need to be secure")
@Component
/* loaded from: input_file:io/nflow/engine/internal/executor/WorkflowDispatcher.class */
public class WorkflowDispatcher implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) WorkflowDispatcher.class);
    private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60);
    private volatile boolean shutdownRequested;
    private final WorkflowInstanceExecutor executor;
    private final WorkflowInstanceDao workflowInstances;
    private final WorkflowStateProcessorFactory stateProcessorFactory;
    private final WorkflowDefinitionService workflowDefinitions;
    private final ExecutorDao executorDao;
    private final long sleepTimeMillis;
    private final int stuckThreadThresholdSeconds;
    private volatile boolean running = false;
    private volatile boolean paused = false;
    private final CountDownLatch shutdownDone = new CountDownLatch(1);
    private final Random rand = new Random();

    @Inject
    @SuppressFBWarnings(value = {"WEM_WEAK_EXCEPTION_MESSAGING"}, justification = "Transaction support exception message is fine")
    public WorkflowDispatcher(WorkflowInstanceExecutor workflowInstanceExecutor, WorkflowInstanceDao workflowInstanceDao, WorkflowStateProcessorFactory workflowStateProcessorFactory, WorkflowDefinitionService workflowDefinitionService, ExecutorDao executorDao, Environment environment) {
        this.executor = workflowInstanceExecutor;
        this.workflowInstances = workflowInstanceDao;
        this.stateProcessorFactory = workflowStateProcessorFactory;
        this.workflowDefinitions = workflowDefinitionService;
        this.executorDao = executorDao;
        this.sleepTimeMillis = ((Long) environment.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class)).longValue();
        this.stuckThreadThresholdSeconds = ((Integer) environment.getRequiredProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.class)).intValue();
        if (!executorDao.isTransactionSupportEnabled()) {
            throw new BeanCreationException("Transaction support must be enabled");
        }
        if (!executorDao.isAutoCommitEnabled()) {
            throw new BeanCreationException("DataSource must have auto commit enabled");
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        logger.info("Starting.");
        try {
            this.workflowDefinitions.postProcessWorkflowDefinitions();
            this.running = true;
            while (!this.shutdownRequested) {
                if (this.paused) {
                    sleep(false);
                } else {
                    try {
                        this.executor.waitUntilQueueSizeLowerThanThreshold(this.executorDao.getMaxWaitUntil());
                        if (!this.shutdownRequested) {
                            if (this.executorDao.tick()) {
                                this.workflowInstances.recoverWorkflowInstancesFromDeadNodes();
                            }
                            int potentiallyStuckProcessors = this.stateProcessorFactory.getPotentiallyStuckProcessors();
                            if (potentiallyStuckProcessors > 0) {
                                periodicLogger.warn("{} of {} state processor threads are potentially stuck (processing longer than {} seconds)", Integer.valueOf(potentiallyStuckProcessors), Integer.valueOf(this.executor.getThreadCount()), Integer.valueOf(this.stuckThreadThresholdSeconds));
                            }
                            dispatch(getNextInstanceIds());
                        }
                    } catch (PollingRaceConditionException e) {
                        logger.info(e.getMessage());
                        sleep(true);
                    } catch (InterruptedException e2) {
                    } catch (Exception e3) {
                        logger.error("Exception in executing dispatcher - retrying after sleep period (" + e3.getMessage() + ")", (Throwable) e3);
                        sleep(false);
                    }
                }
            }
        } finally {
            this.running = false;
            shutdownPool();
            this.executorDao.markShutdown();
            logger.info("Shutdown finished.");
            this.shutdownDone.countDown();
        }
    }

    public void shutdown() {
        this.shutdownRequested = true;
        if (!this.running) {
            logger.info("Shutdown requested, but executor not running, exiting.");
            return;
        }
        logger.info("Shutdown requested.");
        try {
            this.shutdownDone.await();
        } catch (InterruptedException e) {
            logger.info("Shutdown interrupted.");
        }
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
    }

    public boolean isPaused() {
        return this.paused;
    }

    public boolean isRunning() {
        return this.running;
    }

    private void shutdownPool() {
        try {
            this.executor.shutdown();
        } catch (Exception e) {
            logger.error("Error in shutting down thread pool.", (Throwable) e);
        }
    }

    private void dispatch(List<Long> list) {
        if (list.isEmpty()) {
            logger.debug("Found no workflow instances, sleeping.");
            sleep(false);
            return;
        }
        logger.debug("Found {} workflow instances, dispatching executors.", Integer.valueOf(list.size()));
        Iterator<Long> it = list.iterator();
        while (it.hasNext()) {
            this.executor.execute(this.stateProcessorFactory.createProcessor(it.next().longValue()));
        }
    }

    private List<Long> getNextInstanceIds() {
        int queueRemainingCapacity = this.executor.getQueueRemainingCapacity();
        logger.debug("Polling next {} workflow instances.", Integer.valueOf(queueRemainingCapacity));
        return this.workflowInstances.pollNextWorkflowInstanceIds(queueRemainingCapacity);
    }

    @SuppressFBWarnings(value = {"MDM_THREAD_YIELD"}, justification = "Intentionally masking race condition")
    private void sleep(boolean z) {
        try {
            if (z) {
                Thread.sleep((long) (this.sleepTimeMillis * this.rand.nextDouble()));
            } else {
                Thread.sleep(this.sleepTimeMillis);
            }
        } catch (InterruptedException e) {
        }
    }
}
