package org.streampipes.wrapper.spark;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.SparkConf;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.base.InvocableStreamPipesEntity;
import org.streampipes.wrapper.spark.converter.JsonToMapFormat;

/* loaded from: input_file:org/streampipes/wrapper/spark/SparkRuntime.class */
public abstract class SparkRuntime<I extends InvocableStreamPipesEntity> implements Runnable, Serializable {
    private static final long serialVersionUID = 1;
    protected final SparkDeploymentConfig deploymentConfig;
    protected Thread thread;
    protected SparkAppHandle appHandle;
    protected SparkLauncher launcher;
    protected JavaStreamingContext streamingContext;
    protected I graph;
    protected Map kafkaParams = new HashMap();

    public SparkRuntime(I i, SparkDeploymentConfig sparkDeploymentConfig) {
        this.graph = i;
        this.deploymentConfig = sparkDeploymentConfig;
        this.kafkaParams.put("bootstrap.servers", this.deploymentConfig.getKafkaHost());
        this.kafkaParams.put("key.deserializer", StringDeserializer.class);
        this.kafkaParams.put("value.deserializer", StringDeserializer.class);
        this.kafkaParams.put("key.serializer", StringSerializer.class);
        this.kafkaParams.put("value.serializer", StringSerializer.class);
        this.kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        this.kafkaParams.put("auto.offset.reset", "latest");
        this.kafkaParams.put("enable.auto.commit", false);
    }

    public boolean startExecution() {
        if (!this.deploymentConfig.isRunLocal()) {
            try {
                this.launcher = new SparkLauncher().setAppResource(this.deploymentConfig.getJarFile()).setMainClass(getClass().getName()).addAppArgs(new String[]{Base64.getEncoder().encodeToString(getSerializationData())}).setMaster(this.deploymentConfig.getSparkHost()).setConf("spark.driver.memory", "2g");
                this.appHandle = this.launcher.startApplication(new SparkAppHandle.Listener[0]);
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
        try {
            this.streamingContext = new JavaStreamingContext(new SparkConf().setAppName(this.deploymentConfig.getAppName()).setMaster(this.deploymentConfig.getSparkHost()), new Duration(this.deploymentConfig.getSparkBatchDuration()));
            JavaInputDStream<ConsumerRecord<String, String>> stream1Source = getStream1Source(this.streamingContext);
            if (stream1Source == null) {
                throw new Exception("At least one source must be defined for a Spark SEPA");
            }
            execute(stream1Source.flatMap(new JsonToMapFormat(this.graph)));
            return true;
        } catch (Exception e2) {
            e2.printStackTrace();
            return false;
        }
    }

    protected abstract byte[] getSerializationData();

    public abstract boolean execute(JavaDStream<Map<String, Object>>... javaDStreamArr);

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.streamingContext.start();
            if (this.deploymentConfig.isRunLocal()) {
                this.streamingContext.awaitTermination();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean stop() {
        try {
            this.appHandle.stop();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    private JavaInputDStream<ConsumerRecord<String, String>> getStream1Source(JavaStreamingContext javaStreamingContext) {
        return getStreamSource(0, javaStreamingContext);
    }

    private JavaInputDStream<ConsumerRecord<String, String>> getStream2Source(JavaStreamingContext javaStreamingContext) {
        return getStreamSource(1, javaStreamingContext);
    }

    private JavaInputDStream<ConsumerRecord<String, String>> getStreamSource(int i, JavaStreamingContext javaStreamingContext) {
        SpDataStream spDataStream;
        if (this.graph.getInputStreams().size() - 1 < i || (spDataStream = (SpDataStream) this.graph.getInputStreams().get(i)) == null) {
            return null;
        }
        return KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Arrays.asList(spDataStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName()), this.kafkaParams));
    }
}
