package org.apache.beam.runners.spark.translation;

import java.util.ArrayList;
import java.util.Iterator;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkProcessContext.class */
class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
    private final DoFn<FnInputT, FnOutputT> doFn;
    private final DoFnRunner<FnInputT, FnOutputT> doFnRunner;
    private final SparkOutputManager<OutputT> outputManager;
    private Iterator<TimerInternals.TimerData> timerDataIterator;

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkProcessContext$NoOpStepContext.class */
    static class NoOpStepContext implements StepContext {
        @Override // org.apache.beam.runners.core.StepContext
        public StateInternals stateInternals() {
            throw new UnsupportedOperationException("stateInternals not supported");
        }

        @Override // org.apache.beam.runners.core.StepContext
        public TimerInternals timerInternals() {
            throw new UnsupportedOperationException("timerInternals not supported");
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkProcessContext$ProcCtxtIterator.class */
    private class ProcCtxtIterator extends AbstractIterator<OutputT> {
        private final Iterator<WindowedValue<FnInputT>> inputIterator;
        private final DoFnRunner<FnInputT, FnOutputT> doFnRunner;
        private Iterator<OutputT> outputIterator;
        private boolean isBundleStarted;
        private boolean isBundleFinished;

        ProcCtxtIterator(Iterator<WindowedValue<FnInputT>> it, DoFnRunner<FnInputT, FnOutputT> doFnRunner) {
            this.inputIterator = it;
            this.doFnRunner = doFnRunner;
            this.outputIterator = SparkProcessContext.this.getOutputIterator();
        }

        @Override // org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator
        protected OutputT computeNext() {
            if (!this.isBundleStarted) {
                this.isBundleStarted = true;
                this.doFnRunner.startBundle();
            }
            while (!this.outputIterator.hasNext()) {
                try {
                    SparkProcessContext.this.clearOutput();
                    if (this.inputIterator.hasNext()) {
                        this.doFnRunner.processElement(this.inputIterator.next());
                        this.outputIterator = SparkProcessContext.this.getOutputIterator();
                    } else if (SparkProcessContext.this.timerDataIterator.hasNext()) {
                        fireTimer((TimerInternals.TimerData) SparkProcessContext.this.timerDataIterator.next());
                        this.outputIterator = SparkProcessContext.this.getOutputIterator();
                    } else {
                        if (this.isBundleFinished) {
                            DoFnInvokers.invokerFor(SparkProcessContext.this.doFn).invokeTeardown();
                            return endOfData();
                        }
                        this.isBundleFinished = true;
                        this.doFnRunner.finishBundle();
                        this.outputIterator = SparkProcessContext.this.getOutputIterator();
                    }
                } catch (RuntimeException e) {
                    DoFnInvokers.invokerFor(SparkProcessContext.this.doFn).invokeTeardown();
                    throw e;
                }
            }
            return this.outputIterator.next();
        }

        private void fireTimer(TimerInternals.TimerData timerData) {
            StateNamespace namespace = timerData.getNamespace();
            Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
            this.doFnRunner.onTimer(timerData.getTimerId(), ((StateNamespaces.WindowNamespace) namespace).getWindow(), timerData.getTimestamp(), timerData.getDomain());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkProcessContext$SparkOutputManager.class */
    public interface SparkOutputManager<T> extends DoFnRunners.OutputManager, Iterable<T> {
        void clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SparkProcessContext(DoFn<FnInputT, FnOutputT> doFn, DoFnRunner<FnInputT, FnOutputT> doFnRunner, SparkOutputManager<OutputT> sparkOutputManager, Iterator<TimerInternals.TimerData> it) {
        this.doFn = doFn;
        this.doFnRunner = doFnRunner;
        this.outputManager = sparkOutputManager;
        this.timerDataIterator = it;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> it) throws Exception {
        return !it.hasNext() ? new ArrayList() : getOutputIterable(it, this.doFnRunner);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void clearOutput() {
        this.outputManager.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Iterator<OutputT> getOutputIterator() {
        return this.outputManager.iterator();
    }

    private Iterable<OutputT> getOutputIterable(Iterator<WindowedValue<FnInputT>> it, DoFnRunner<FnInputT, FnOutputT> doFnRunner) {
        return () -> {
            return new ProcCtxtIterator(it, doFnRunner);
        };
    }
}
