package org.apache.beam.runners.spark;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
import org.apache.beam.runners.spark.metrics.CompositeSource;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.metrics.SparkBeamMetricSource;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/SparkRunner.class */
/**
 * A {@link PipelineRunner} that translates a Beam {@link Pipeline} into Spark work and executes it,
 * either as a batch job on a {@link JavaSparkContext} or as a Spark Streaming job on a
 * {@link JavaStreamingContext}.
 *
 * <p>Instances are obtained through {@link #create()}, {@link #create(SparkPipelineOptions)} or
 * {@link #fromOptions(PipelineOptions)}. The execution mode follows
 * {@link SparkPipelineOptions#isStreaming()}; {@link #run} turns streaming on when it finds an
 * unbounded {@link PCollection} in the pipeline.
 */
public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);

    /** Options this runner was created with; {@link #run} may set the streaming flag on them. */
    private final SparkPipelineOptions mOptions;

    /**
     * Visitor that walks the pipeline the same way {@link Evaluator} does. Instead of evaluating
     * transforms, it counts how many times each {@link PCollection} is consumed as an input.
     *
     * <p>Counts are accumulated in {@link EvaluationContext#getCacheCandidates()}. NOTE(review): the
     * counts are presumably used to decide which PCollections are read more than once and are worth
     * caching — confirm against EvaluationContext.
     */
    public static class CacheVisitor extends Evaluator {
        protected CacheVisitor(SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
            super(translator, evaluationContext);
        }

        /** Increments the consumer count of every {@link PCollection} input of {@code node}. */
        @Override
        public void doVisitTransform(TransformHierarchy.Node node) {
            for (PValue input : node.getInputs().values()) {
                if (input instanceof PCollection) {
                    Long previous = ctxt.getCacheCandidates().get(input);
                    long count = previous == null ? 1L : previous + 1;
                    ctxt.getCacheCandidates().put((PCollection<?>) input, count);
                }
            }
        }
    }

    /**
     * Pipeline visitor that translates each transform with a {@link SparkPipelineTranslator} and
     * immediately evaluates the resulting {@link TransformEvaluator} against an
     * {@link EvaluationContext}.
     *
     * <p>A primitive transform is always evaluated. A composite transform is evaluated as a single
     * unit when the translator has a translation for its class, unless {@link #shouldDefer} says
     * otherwise. Otherwise the visitor descends into the composite's children.
     */
    public static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
        private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
        protected final EvaluationContext ctxt;
        protected final SparkPipelineTranslator translator;

        public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
            this.translator = translator;
            this.ctxt = ctxt;
        }

        /**
         * Evaluates a directly-translatable composite as a whole and skips its children. Otherwise
         * asks the traversal to enter the composite.
         */
        @Override
        public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
            PTransform<?, ?> transform = node.getTransform();
            if (transform != null) {
                // hasTranslation is keyed by transform class; the cast only re-types getClass().
                @SuppressWarnings("unchecked")
                Class<PTransform<?, ?>> transformClass = (Class<PTransform<?, ?>>) (Class<?>) transform.getClass();
                if (translator.hasTranslation(transformClass) && !shouldDefer(node)) {
                    LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
                    LOG.debug("Composite transform class: '{}'", transformClass);
                    doVisitTransform(node);
                    return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
                }
            }
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }

        /**
         * Returns whether a directly-translatable composite should instead be expanded into its
         * sub-transforms.
         *
         * <p>This is true only when all of the following hold:
         * <ul>
         *   <li>the node has exactly one non-additional (main) input;</li>
         *   <li>that input is a {@link PCollection} whose {@code WindowFn} is merging;</li>
         *   <li>the transform is a {@link Combine.PerKey} or {@link Combine.Globally} with at least
         *       one side input.</li>
         * </ul>
         */
        protected boolean shouldDefer(TransformHierarchy.Node node) {
            Collection<PValue> mainInputs = TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
            if (mainInputs.size() != 1) {
                return false;
            }
            PValue input = Iterables.getOnlyElement(mainInputs);
            if (!(input instanceof PCollection)
                    || ((PCollection<?>) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
                return false;
            }
            PTransform<?, ?> transform = node.getTransform();
            if (!hasSideInputs(transform)) {
                return false;
            }
            LOG.info("Deferring combine transformation {} for job {}", transform, ctxt.getOptions().getJobName());
            return true;
        }

        /**
         * Returns whether {@code transform} is a {@link Combine.PerKey} or {@link Combine.Globally}
         * with a non-null, non-empty side-input list.
         */
        private static boolean hasSideInputs(PTransform<?, ?> transform) {
            List<PCollectionView<?>> sideInputs;
            if (transform instanceof Combine.PerKey) {
                sideInputs = ((Combine.PerKey<?, ?, ?>) transform).getSideInputs();
            } else if (transform instanceof Combine.Globally) {
                sideInputs = ((Combine.Globally<?, ?>) transform).getSideInputs();
            } else {
                return false;
            }
            return sideInputs != null && !sideInputs.isEmpty();
        }

        @Override
        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            doVisitTransform(node);
        }

        /**
         * Translates the transform at {@code node} and evaluates it. While evaluation runs, the
         * node's applied transform is published as the context's current transform.
         *
         * <p>The current transform is cleared again even if evaluation throws.
         */
        <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformHierarchy.Node node) {
            // The translator is looked up by runtime class, so the static type is asserted here.
            @SuppressWarnings("unchecked")
            TransformT transform = (TransformT) node.getTransform();
            @SuppressWarnings("unchecked")
            Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
            TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
            LOG.info("Evaluating {}", transform);
            ctxt.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
            try {
                evaluator.evaluate(transform, ctxt);
            } finally {
                ctxt.setCurrentTransform(null);
            }
        }

        /**
         * Picks the bounded or unbounded translation for {@code transformClass}.
         *
         * <p>The choice depends on whether the node's inputs are all bounded. A node without inputs
         * (a root) is classified by its outputs instead.
         *
         * @param node the node being translated
         * @param transform the transform at {@code node}, used for logging
         * @param transformClass the runtime class used to look up the translation
         * @return the evaluator returned by the translator
         */
        public <TransformT extends PTransform<? super PInput, POutput>> TransformEvaluator<TransformT> translate(
                TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
            PCollection.IsBounded isNodeBounded = isBoundedCollection(
                    (node.getInputs().isEmpty() ? node.getOutputs() : node.getInputs()).values());
            LOG.debug("Translating {} as {}", transform, isNodeBounded);
            return isNodeBounded.equals(PCollection.IsBounded.BOUNDED)
                    ? translator.translateBounded(transformClass)
                    : translator.translateUnbounded(transformClass);
        }

        /**
         * Returns {@code BOUNDED} unless some {@link PCollection} in {@code values} is unbounded.
         * Values that are not PCollections are treated as bounded.
         */
        protected PCollection.IsBounded isBoundedCollection(Collection<PValue> values) {
            PCollection.IsBounded result = PCollection.IsBounded.BOUNDED;
            for (PValue value : values) {
                if (value instanceof PCollection) {
                    result = result.and(((PCollection<?>) value).isBounded());
                }
            }
            return result;
        }
    }

    /** Execution mode detected for a pipeline. */
    public enum TranslationMode {
        BATCH,
        STREAMING
    }

    /**
     * Visitor that starts in a given {@link TranslationMode} and switches from {@code BATCH} to
     * {@code STREAMING} as soon as it visits an unbounded {@link PCollection}.
     */
    public static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults {
        private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
        private TranslationMode translationMode;

        TranslationModeDetector(TranslationMode translationMode) {
            this.translationMode = translationMode;
        }

        TranslationModeDetector() {
            this(TranslationMode.BATCH);
        }

        TranslationMode getTranslationMode() {
            return translationMode;
        }

        @Override
        public void visitValue(PValue value, TransformHierarchy.Node producer) {
            if (translationMode == TranslationMode.BATCH
                    && value instanceof PCollection
                    && ((PCollection<?>) value).isBounded() == PCollection.IsBounded.UNBOUNDED) {
                LOG.info("Found unbounded PCollection {}. Switching to streaming execution.", value.getName());
                translationMode = TranslationMode.STREAMING;
            }
        }
    }

    /** Creates a runner with default {@link SparkPipelineOptions} whose runner is set to this class. */
    public static SparkRunner create() {
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);
        return new SparkRunner(options);
    }

    /** Creates a runner that uses {@code options} as given. */
    public static SparkRunner create(SparkPipelineOptions options) {
        return new SparkRunner(options);
    }

    /**
     * Creates a runner from generic options after validating them as {@link SparkPipelineOptions}.
     *
     * <p>If {@code filesToStage} is unset, it is filled with the classpath resources detected from
     * this class's class loader.
     */
    public static SparkRunner fromOptions(PipelineOptions options) {
        SparkPipelineOptions sparkOptions = PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
        if (sparkOptions.getFilesToStage() == null) {
            sparkOptions.setFilesToStage(
                    PipelineResources.detectClassPathResourcesToStage(SparkRunner.class.getClassLoader()));
            LOG.info("PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage {} files. Enable logging at DEBUG level to see which files will be staged.", sparkOptions.getFilesToStage().size());
            LOG.debug("Classpath elements: {}", sparkOptions.getFilesToStage());
        }
        return new SparkRunner(sparkOptions);
    }

    private SparkRunner(SparkPipelineOptions options) {
        this.mOptions = options;
    }

    /**
     * Translates {@code pipeline} and starts executing it asynchronously on a single background
     * thread.
     *
     * <p>Before execution starts, this method:
     * <ul>
     *   <li>enables metrics;</li>
     *   <li>switches the options to streaming if an unbounded PCollection is found;</li>
     *   <li>applies the Spark transform overrides.</li>
     * </ul>
     * After execution starts, it optionally registers the Beam metrics source with Spark and starts
     * a {@link MetricsPusher}.
     *
     * @return a handle on the running batch or streaming execution
     */
    @Override
    public SparkPipelineResult run(Pipeline pipeline) {
        LOG.info("Executing pipeline using the SparkRunner.");
        MetricsEnvironment.setMetricsSupported(true);
        detectTranslationMode(pipeline);
        pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(mOptions.isStreaming()));

        SparkPipelineResult result = mOptions.isStreaming() ? runStreaming(pipeline) : runBatch(pipeline);

        if (mOptions.getEnableSparkMetricSinks().booleanValue()) {
            registerMetricsSource(mOptions.getAppName());
        }
        new MetricsPusher((MetricsContainerStepMap) MetricsAccumulator.getInstance().value(), mOptions, result).start();
        return result;
    }

    /**
     * Streaming setup. Obtains (or recovers from the checkpoint directory) a streaming context,
     * registers the runner's and the user's streaming listeners, initialises accumulators, and then
     * starts the context on a background thread.
     */
    private SparkPipelineResult runStreaming(Pipeline pipeline) {
        Checkpoint.CheckpointDir checkpointDir = new Checkpoint.CheckpointDir(mOptions.getCheckpointDir());
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(
                checkpointDir.getSparkCheckpointDir().toString(),
                new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir));
        jssc.addStreamingListener(new JavaStreamingListenerWrapper(new AggregatorsAccumulator.AccumulatorCheckpointingSparkListener()));
        jssc.addStreamingListener(new JavaStreamingListenerWrapper(new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
        for (JavaStreamingListener listener : mOptions.as(SparkContextOptions.class).getListeners()) {
            // Pass the name as an SLF4J argument so the {} placeholder is actually substituted.
            LOG.info("Registered listener {}.", listener.getClass().getSimpleName());
            jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
        }
        jssc.addStreamingListener(new JavaStreamingListenerWrapper(new GlobalWatermarkHolder.WatermarkAdvancingStreamingListener()));
        initAccumulators(mOptions, jssc.sparkContext());

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> execution = executor.submit(() -> {
            LOG.info("Starting streaming pipeline execution.");
            jssc.start();
        });
        // No further tasks are submitted; the worker thread exits once the task finishes.
        executor.shutdown();
        return new SparkPipelineResult.StreamingMode(execution, jssc);
    }

    /**
     * Batch setup. Computes cache candidates, initialises accumulators, and then traverses and
     * evaluates the pipeline on a background thread.
     */
    private SparkPipelineResult runBatch(Pipeline pipeline) {
        JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
        EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline, mOptions);
        SparkPipelineTranslator translator = new TransformTranslator.Translator();
        updateCacheCandidates(pipeline, translator, evaluationContext);
        initAccumulators(mOptions, jsc);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> execution = executor.submit(() -> {
            pipeline.traverseTopologically(new Evaluator(translator, evaluationContext));
            evaluationContext.computeOutputs();
            LOG.info("Batch pipeline execution complete.");
        });
        // No further tasks are submitted; the worker thread exits once the task finishes.
        executor.shutdown();
        return new SparkPipelineResult.BatchMode(execution, jsc);
    }

    /**
     * Registers a composite source with Spark's {@link MetricsSystem}, named
     * {@code appName + ".Beam"}. It combines the Beam metrics registry and the aggregator metrics
     * registry.
     */
    private void registerMetricsSource(String appName) {
        MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
        SparkBeamMetricSource beamMetricSource = new SparkBeamMetricSource(null);
        AggregatorMetricSource aggregatorMetricSource =
                new AggregatorMetricSource(null, (NamedAggregators) AggregatorsAccumulator.getInstance().value());
        CompositeSource compositeSource = new CompositeSource(
                appName + ".Beam", beamMetricSource.metricRegistry(), aggregatorMetricSource.metricRegistry());
        // Remove first so the source ends up registered once. NOTE(review): presumably this guards
        // against re-registration when a Spark context is reused — confirm.
        metricsSystem.removeSource(compositeSource);
        metricsSystem.registerSource(compositeSource);
    }

    /** Initialises the Beam metrics accumulator and the aggregators accumulator on {@code jsc}. */
    public static void initAccumulators(SparkPipelineOptions options, JavaSparkContext jsc) {
        MetricsAccumulator.init(options, jsc);
        AggregatorsAccumulator.init(options, jsc);
    }

    /** Sets {@code streaming=true} on the options if the pipeline contains an unbounded PCollection. */
    private void detectTranslationMode(Pipeline pipeline) {
        TranslationModeDetector detector = new TranslationModeDetector();
        pipeline.traverseTopologically(detector);
        if (detector.getTranslationMode() == TranslationMode.STREAMING) {
            mOptions.setStreaming(true);
        }
    }

    /** Populates {@link EvaluationContext#getCacheCandidates()} by traversing {@code pipeline}. */
    public static void updateCacheCandidates(
            Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
        pipeline.traverseTopologically(new CacheVisitor(translator, evaluationContext));
    }
}
