package org.apache.beam.runners.direct.portable;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Optional;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.ExecutionDriver;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.PipelineMessageReceiver;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/direct/portable/QuiescenceDriver.class */
class QuiescenceDriver implements ExecutionDriver {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) QuiescenceDriver.class);
    private final EvaluationContext evaluationContext;
    private final ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph;
    private final BundleProcessor<PipelineNode.PCollectionNode, CommittedBundle<?>, PipelineNode.PTransformNode> bundleProcessor;
    private final PipelineMessageReceiver pipelineMessageReceiver;
    private final Map<PipelineNode.PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> pendingRootBundles;
    private final CompletionCallback defaultCompletionCallback = new TimerIterableCompletionCallback(Collections.emptyList());
    private final Queue<WorkUpdate> pendingWork = new ConcurrentLinkedQueue();
    private final AtomicReference<ExecutorState> state = new AtomicReference<>(ExecutorState.QUIESCENT);
    private final AtomicLong outstandingWork = new AtomicLong(0);
    private boolean exceptionThrown = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/portable/QuiescenceDriver$ExecutorState.class */
    public enum ExecutorState {
        ACTIVE,
        PROCESSING,
        QUIESCING,
        QUIESCENT
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/portable/QuiescenceDriver$TimerIterableCompletionCallback.class */
    public class TimerIterableCompletionCallback implements CompletionCallback {
        private final Iterable<TimerInternals.TimerData> timers;

        TimerIterableCompletionCallback(Iterable<TimerInternals.TimerData> iterable) {
            this.timers = iterable;
        }

        @Override // org.apache.beam.runners.direct.portable.CompletionCallback
        public final CommittedResult handleResult(CommittedBundle<?> committedBundle, TransformResult<?> transformResult) {
            CommittedResult<PipelineNode.PTransformNode> handleResult = QuiescenceDriver.this.evaluationContext.handleResult(committedBundle, this.timers, transformResult);
            for (CommittedBundle<?> committedBundle2 : handleResult.getOutputs()) {
                QuiescenceDriver.this.pendingWork.offer(WorkUpdate.fromBundle(committedBundle2, QuiescenceDriver.this.graph.getPerElementConsumers(committedBundle2.getPCollection())));
            }
            Optional<? extends CommittedBundle<?>> unprocessedInputs = handleResult.getUnprocessedInputs();
            if (unprocessedInputs.isPresent()) {
                if (committedBundle.getPCollection() == null) {
                    ((ConcurrentLinkedQueue) QuiescenceDriver.this.pendingRootBundles.get(transformResult.getTransform())).offer(unprocessedInputs.get());
                } else {
                    QuiescenceDriver.this.pendingWork.offer(WorkUpdate.fromBundle(unprocessedInputs.get(), Collections.singleton(handleResult.getExecutable())));
                }
            }
            if (!handleResult.getProducedOutputTypes().isEmpty()) {
                QuiescenceDriver.this.state.set(ExecutorState.ACTIVE);
            }
            QuiescenceDriver.this.outstandingWork.decrementAndGet();
            return handleResult;
        }

        @Override // org.apache.beam.runners.direct.portable.CompletionCallback
        public void handleEmpty(PipelineNode.PTransformNode pTransformNode) {
            QuiescenceDriver.this.outstandingWork.decrementAndGet();
        }

        @Override // org.apache.beam.runners.direct.portable.CompletionCallback
        public final void handleException(CommittedBundle<?> committedBundle, Exception exc) {
            QuiescenceDriver.this.pendingWork.offer(WorkUpdate.fromException(exc));
            QuiescenceDriver.this.outstandingWork.decrementAndGet();
        }

        @Override // org.apache.beam.runners.direct.portable.CompletionCallback
        public void handleError(Error error) {
            QuiescenceDriver.this.outstandingWork.decrementAndGet();
            QuiescenceDriver.this.pipelineMessageReceiver.failed(error);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/direct/portable/QuiescenceDriver$WorkUpdate.class */
    public static abstract class WorkUpdate {
        /* JADX INFO: Access modifiers changed from: private */
        public static WorkUpdate fromBundle(CommittedBundle<?> committedBundle, Collection<PipelineNode.PTransformNode> collection) {
            return new AutoValue_QuiescenceDriver_WorkUpdate(Optional.of(committedBundle), collection, Optional.absent());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static WorkUpdate fromException(Exception exc) {
            return new AutoValue_QuiescenceDriver_WorkUpdate(Optional.absent(), Collections.emptyList(), Optional.of(exc));
        }

        public abstract Optional<? extends CommittedBundle<?>> getBundle();

        public abstract Collection<PipelineNode.PTransformNode> getConsumers();

        public abstract Optional<? extends Exception> getException();
    }

    public static ExecutionDriver create(EvaluationContext evaluationContext, ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> executableGraph, BundleProcessor<PipelineNode.PCollectionNode, CommittedBundle<?>, PipelineNode.PTransformNode> bundleProcessor, PipelineMessageReceiver pipelineMessageReceiver, Map<PipelineNode.PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> map) {
        return new QuiescenceDriver(evaluationContext, executableGraph, bundleProcessor, pipelineMessageReceiver, map);
    }

    private QuiescenceDriver(EvaluationContext evaluationContext, ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> executableGraph, BundleProcessor<PipelineNode.PCollectionNode, CommittedBundle<?>, PipelineNode.PTransformNode> bundleProcessor, PipelineMessageReceiver pipelineMessageReceiver, Map<PipelineNode.PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> map) {
        this.evaluationContext = evaluationContext;
        this.graph = executableGraph;
        this.bundleProcessor = bundleProcessor;
        this.pipelineMessageReceiver = pipelineMessageReceiver;
        this.pendingRootBundles = map;
    }

    @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.local.ExecutionDriver
    public ExecutionDriver.DriverState drive() {
        boolean z = this.outstandingWork.get() == 0;
        ExecutorState executorState = this.state.get();
        if (executorState == ExecutorState.ACTIVE) {
            this.state.compareAndSet(ExecutorState.ACTIVE, ExecutorState.PROCESSING);
        } else if (executorState == ExecutorState.PROCESSING && z) {
            this.state.compareAndSet(ExecutorState.PROCESSING, ExecutorState.QUIESCING);
        } else if (executorState == ExecutorState.QUIESCING && z) {
            this.state.compareAndSet(ExecutorState.QUIESCING, ExecutorState.QUIESCENT);
        }
        fireTimers();
        ArrayList arrayList = new ArrayList();
        WorkUpdate poll = this.pendingWork.poll();
        while (true) {
            WorkUpdate workUpdate = poll;
            if (workUpdate == null) {
                break;
            }
            arrayList.add(workUpdate);
            poll = this.pendingWork.poll();
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            applyUpdate(z, executorState, (WorkUpdate) it.next());
        }
        addWorkIfNecessary();
        return this.exceptionThrown ? ExecutionDriver.DriverState.FAILED : this.evaluationContext.isDone() ? ExecutionDriver.DriverState.SHUTDOWN : ExecutionDriver.DriverState.CONTINUE;
    }

    private void applyUpdate(boolean z, ExecutorState executorState, WorkUpdate workUpdate) {
        LOG.debug("Executor Update: {}", workUpdate);
        if (!workUpdate.getBundle().isPresent()) {
            if (workUpdate.getException().isPresent()) {
                this.pipelineMessageReceiver.failed(workUpdate.getException().get());
                this.exceptionThrown = true;
                return;
            }
            return;
        }
        if (ExecutorState.ACTIVE != executorState && (ExecutorState.PROCESSING != executorState || !z)) {
            this.pendingWork.offer(workUpdate);
            return;
        }
        CommittedBundle<?> committedBundle = workUpdate.getBundle().get();
        for (PipelineNode.PTransformNode pTransformNode : workUpdate.getConsumers()) {
            this.outstandingWork.incrementAndGet();
            this.bundleProcessor.process(committedBundle, pTransformNode, this.defaultCompletionCallback);
        }
    }

    private void fireTimers() {
        try {
            for (WatermarkManager.FiredTimers<PipelineNode.PTransformNode> firedTimers : this.evaluationContext.extractFiredTimers()) {
                Collection<TimerInternals.TimerData> timers = firedTimers.getTimers();
                KeyedWorkItem timersWorkItem = KeyedWorkItems.timersWorkItem(firedTimers.getKey().getKey(), timers);
                CommittedBundle<?> commit = this.evaluationContext.createKeyedBundle(firedTimers.getKey(), (PipelineNode.PCollectionNode) Iterables.getOnlyElement(this.graph.getPerElementInputs(firedTimers.getExecutable()))).add(WindowedValue.valueInGlobalWindow(timersWorkItem)).commit(this.evaluationContext.now());
                this.outstandingWork.incrementAndGet();
                this.bundleProcessor.process(commit, firedTimers.getExecutable(), new TimerIterableCompletionCallback(timers));
                this.state.set(ExecutorState.ACTIVE);
            }
        } catch (Exception e) {
            LOG.error("Internal Error while delivering timers", (Throwable) e);
            this.pipelineMessageReceiver.failed(e);
            this.exceptionThrown = true;
        }
    }

    private void addWorkIfNecessary() {
        if (this.state.get() == ExecutorState.QUIESCENT) {
            for (Map.Entry<PipelineNode.PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> entry : this.pendingRootBundles.entrySet()) {
                ArrayList<CommittedBundle<?>> arrayList = new ArrayList();
                while (!entry.getValue().isEmpty()) {
                    arrayList.add(entry.getValue().poll());
                }
                for (CommittedBundle<?> committedBundle : arrayList) {
                    this.outstandingWork.incrementAndGet();
                    this.bundleProcessor.process(committedBundle, entry.getKey(), this.defaultCompletionCallback);
                    this.state.set(ExecutorState.ACTIVE);
                }
            }
        }
    }
}
