package org.apache.nemo.compiler.frontend.spark.core;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Stack;
import org.apache.nemo.client.JobLauncher;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.LoopVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.compiler.frontend.spark.SparkBroadcastVariables;
import org.apache.nemo.compiler.frontend.spark.SparkKeyExtractor;
import org.apache.nemo.compiler.frontend.spark.coder.SparkDecoderFactory;
import org.apache.nemo.compiler.frontend.spark.coder.SparkEncoderFactory;
import org.apache.nemo.compiler.frontend.spark.transform.CollectTransform;
import org.apache.nemo.compiler.frontend.spark.transform.GroupByKeyTransform;
import org.apache.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.serializer.Serializer;
import scala.Function1;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.TraversableOnce;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

/* loaded from: input_file:org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.class */
/**
 * Static helper methods shared by the Spark frontend: serializer selection, launching a
 * "collect" job on a DAG, and adapting Scala / Spark function types to Spark's Java
 * {@link Function} family.
 */
public final class SparkFrontendUtils {
    /** Spark configuration key that names the serializer implementation. */
    private static final String SPARK_SERIALIZER_KEY = "spark.serializer";

    /** Fully-qualified class name that selects Kryo serialization. */
    private static final String KRYO_SERIALIZER_CLASS_NAME = "org.apache.spark.serializer.KryoSerializer";

    /** Key-extractor property attached to the edge created by {@link #collect}. */
    private static final KeyExtractorProperty SPARK_KEY_EXTRACTOR_PROP = KeyExtractorProperty.of(new SparkKeyExtractor());

    /** Utility class; not meant to be instantiated. */
    private SparkFrontendUtils() {
    }

    /**
     * Picks the serializer according to the {@code spark.serializer} entry of the context's
     * configuration.
     *
     * @param sparkContext the Spark context whose configuration is consulted.
     * @return a {@link KryoSerializer} when the configuration names the Kryo serializer class,
     *         otherwise a {@link JavaSerializer}; both are built from the context's configuration.
     */
    public static Serializer deriveSerializerFrom(org.apache.spark.SparkContext sparkContext) {
        // A missing entry falls back to "", which never matches and therefore selects JavaSerializer.
        final String configured = sparkContext.conf().get(SPARK_SERIALIZER_KEY, "");
        if (KRYO_SERIALIZER_CLASS_NAME.equals(configured)) {
            return new KryoSerializer(sparkContext.conf());
        }
        return new JavaSerializer(sparkContext.conf());
    }

    /**
     * Appends a vertex running {@link CollectTransform} after {@code iRVertex}, launches the
     * resulting DAG through {@link JobLauncher}, and returns the data the launcher collected.
     *
     * @param dag        the DAG to extend; it is copied into a new builder.
     * @param stack      the loop-vertex stack passed along when adding the collect vertex.
     * @param iRVertex   the vertex whose output is connected to the collect vertex.
     * @param serializer the Spark serializer used for the encoder/decoder of the new edge.
     * @param <T>        element type of the collected data.
     * @return the result of {@link JobLauncher#getCollectedData()} after the launch call returns.
     */
    public static <T> List<T> collect(DAG<IRVertex, IREdge> dag, Stack<LoopVertex> stack, IRVertex iRVertex, Serializer serializer) {
        final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);

        final OperatorVertex collectVertex = new OperatorVertex(new CollectTransform<>());
        builder.addVertex(collectVertex, stack);

        final IREdge collectEdge = new IREdge(getEdgeCommunicationPattern(iRVertex, collectVertex), iRVertex, collectVertex);
        collectEdge.setProperty(EncoderProperty.of(new SparkEncoderFactory(serializer)));
        collectEdge.setProperty(DecoderProperty.of(new SparkDecoderFactory(serializer)));
        collectEdge.setProperty(SPARK_KEY_EXTRACTOR_PROP);
        builder.connectVertices(collectEdge);

        JobLauncher.launchDAG(builder.build(), SparkBroadcastVariables.getAll(), "");
        return JobLauncher.getCollectedData();
    }

    /**
     * Chooses the communication pattern for an edge based on its destination vertex.
     *
     * @param iRVertex  the source vertex (not inspected by the current implementation).
     * @param iRVertex2 the destination vertex.
     * @return {@code Shuffle} when the destination is an {@link OperatorVertex} whose transform is
     *         a {@link ReduceByKeyTransform} or a {@link GroupByKeyTransform}; {@code OneToOne}
     *         otherwise.
     */
    public static CommunicationPatternProperty.Value getEdgeCommunicationPattern(IRVertex iRVertex, IRVertex iRVertex2) {
        if (iRVertex2 instanceof OperatorVertex) {
            final Object transform = ((OperatorVertex) iRVertex2).getTransform();
            if (transform instanceof ReduceByKeyTransform || transform instanceof GroupByKeyTransform) {
                return CommunicationPatternProperty.Value.Shuffle;
            }
        }
        return CommunicationPatternProperty.Value.OneToOne;
    }

    /**
     * Wraps a Scala {@link Function1} as a Spark Java {@link Function}.
     *
     * <p>The Scala function is serialized eagerly with Spark's {@link JavaSerializer}; the returned
     * function holds only the serialized bytes and the class tag, and deserializes the Scala
     * function the first time {@code call} is invoked.
     *
     * @param function1 the Scala function to adapt.
     * @param <I>       input type.
     * @param <O>       output type.
     * @return a Java function that delegates to the deserialized Scala function.
     */
    public static <I, O> Function<I, O> toJavaFunction(final Function1<I, O> function1) {
        final ClassTag<Function1<I, O>> classTag = ClassTag$.MODULE$.apply(function1.getClass());
        final ByteBuffer serialized = new JavaSerializer().newInstance().serialize(function1, classTag);
        // Copy exactly the readable bytes: ByteBuffer.array() would expose the whole backing array
        // regardless of position/limit and fails for buffers without an accessible array.
        final byte[] serializedFunction = new byte[serialized.remaining()];
        serialized.get(serializedFunction);

        return new Function<I, O>() {
            // Populated on the first call and reused afterwards.
            private Function1<I, O> deserializedFunction;

            @Override
            public O call(final I input) throws Exception {
                if (this.deserializedFunction == null) {
                    this.deserializedFunction =
                        new JavaSerializer().newInstance().deserialize(ByteBuffer.wrap(serializedFunction), classTag);
                }
                return this.deserializedFunction.apply(input);
            }
        };
    }

    /**
     * Wraps a Scala {@code Function2} as a Spark Java {@link Function2} that delegates directly.
     *
     * @param function2 the Scala function to adapt.
     * @param <I1>      first input type.
     * @param <I2>      second input type.
     * @param <O>       output type.
     * @return a Java function forwarding both arguments to {@code function2}.
     */
    public static <I1, I2, O> Function2<I1, I2, O> toJavaFunction(final scala.Function2<I1, I2, O> function2) {
        return new Function2<I1, I2, O>() {
            @Override
            public O call(final I1 first, final I2 second) throws Exception {
                return function2.apply(first, second);
            }
        };
    }

    /**
     * Wraps a Scala function producing a {@link TraversableOnce} as a Spark Java
     * {@link FlatMapFunction} whose result is exposed as a Java {@link Iterator}.
     *
     * @param function1 the Scala function to adapt.
     * @param <I>       input type.
     * @param <O>       element type of the produced collection.
     * @return a flat-map function that converts the Scala iterator of each result to a Java one.
     */
    public static <I, O> FlatMapFunction<I, O> toJavaFlatMapFunction(final Function1<I, TraversableOnce<O>> function1) {
        return new FlatMapFunction<I, O>() {
            @Override
            public Iterator<O> call(final I input) throws Exception {
                final TraversableOnce<O> produced = function1.apply(input);
                return JavaConverters.asJavaIteratorConverter(produced.toIterator()).asJava();
            }
        };
    }

    /**
     * Exposes a {@link PairFunction} as a plain {@link Function} returning the same tuple.
     *
     * @param pairFunction the pair function to adapt.
     * @param <T>          input type.
     * @param <K>          key type of the produced tuple.
     * @param <V>          value type of the produced tuple.
     * @return a function that forwards to {@code pairFunction.call}.
     */
    public static <T, K, V> Function<T, Tuple2<K, V>> pairFunctionToPlainFunction(final PairFunction<T, K, V> pairFunction) {
        return new Function<T, Tuple2<K, V>>() {
            @Override
            public Tuple2<K, V> call(final T element) throws Exception {
                return pairFunction.call(element);
            }
        };
    }
}
