package datahub.spark;

import datahub.spark.conf.DatahubConf;
import datahub.spark2.shaded.org.slf4j.Logger;
import datahub.spark2.shaded.org.slf4j.LoggerFactory;
import datahub.spark2.shaded.typesafe.config.Config;
import datahub.spark2.shaded.typesafe.config.ConfigFactory;
import datahub.spark2.shaded.typesafe.config.ConfigValueFactory;
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.streaming.StreamingQueryListener;

/* loaded from: input_file:datahub/spark/DatahubSparkListener.class */
public class DatahubSparkListener extends SparkListener {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DatahubSparkListener.class);
    private final Map<String, Instant> batchLastUpdated = new HashMap();
    private Config datahubConf = ConfigFactory.empty();
    private final OpenLineageSparkListener listener = new OpenLineageSparkListener();
    private final DatahubEventEmitter emitter = new DatahubEventEmitter();

    public DatahubSparkListener() throws URISyntaxException {
        OpenLineageSparkListener.init(new ContextFactory(this.emitter));
    }

    public void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        long currentTimeMillis = System.currentTimeMillis();
        log.info("Application start called");
        loadDatahubConfig();
        this.listener.onApplicationStart(sparkListenerApplicationStart);
        log.info("onApplicationStart completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    private synchronized void loadDatahubConfig() {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.datahubConf.isEmpty()) {
            this.datahubConf = DatahubConf.parseSparkConfig();
            SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
            if (sparkEnv != null) {
                log.info("sparkEnv: {}", sparkEnv.conf().toDebugString());
                String replaceAll = sparkEnv.conf().get(DatahubConf.SPARK_APP_NAME).replaceAll("[,\\[\\]]", "");
                String replaceAll2 = sparkEnv.conf().get(DatahubConf.SPARK_MASTER).replaceAll("[,\\[\\]]", "");
                String str = sparkEnv.conf().get(DatahubConf.SPARK_APP_START_TIME);
                this.datahubConf = this.datahubConf.withValue(DatahubConf.SPARK_APP_NAME, ConfigValueFactory.fromAnyRef(replaceAll));
                this.datahubConf = this.datahubConf.withValue(DatahubConf.SPARK_MASTER, ConfigValueFactory.fromAnyRef(replaceAll2));
                this.datahubConf = this.datahubConf.withValue(DatahubConf.SPARK_APP_START_TIME, ConfigValueFactory.fromAnyRef(str));
            }
            this.emitter.setConfig(this.datahubConf);
        }
        log.info("loadDatahubConfig completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
        long currentTimeMillis = System.currentTimeMillis();
        log.info("Application end called");
        this.listener.onApplicationEnd(sparkListenerApplicationEnd);
        if (this.datahubConf.hasPath(DatahubConf.STREAMING_JOB) && this.datahubConf.getBoolean(DatahubConf.STREAMING_JOB)) {
            return;
        }
        this.emitter.emitCoalesced();
        log.info("onApplicationEnd completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onTaskEnd(SparkListenerTaskEnd sparkListenerTaskEnd) {
        long currentTimeMillis = System.currentTimeMillis();
        log.info("Task end called");
        this.listener.onTaskEnd(sparkListenerTaskEnd);
        log.info("onTaskEnd completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
        long currentTimeMillis = System.currentTimeMillis();
        log.info("Job end called");
        this.listener.onJobEnd(sparkListenerJobEnd);
        log.info("onJobEnd completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
        long currentTimeMillis = System.currentTimeMillis();
        log.info("Job start called");
        this.listener.onJobStart(sparkListenerJobStart);
        log.info("onJobStart completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
        long currentTimeMillis = System.currentTimeMillis();
        log.info("Other event called {}", sparkListenerEvent);
        loadDatahubConfig();
        if (((sparkListenerEvent instanceof StreamingQueryListener.QueryProgressEvent) || (sparkListenerEvent instanceof StreamingQueryListener.QueryStartedEvent)) && !this.emitter.isStreaming()) {
            if (this.datahubConf.hasPath(DatahubConf.STREAMING_JOB)) {
                this.emitter.setStreaming(this.datahubConf.getBoolean(DatahubConf.STREAMING_JOB));
                log.info("Streaming mode set to {}", Boolean.valueOf(this.datahubConf.getBoolean(DatahubConf.STREAMING_JOB)));
            } else {
                log.info("Streaming mode not set explicitly, switching to streaming mode");
                this.emitter.setStreaming(true);
            }
        }
        if (this.datahubConf.hasPath(DatahubConf.STREAMING_JOB) && !this.datahubConf.getBoolean(DatahubConf.STREAMING_JOB)) {
            log.info("Not in streaming mode");
            return;
        }
        this.listener.onOtherEvent(sparkListenerEvent);
        if (sparkListenerEvent instanceof StreamingQueryListener.QueryProgressEvent) {
            int streamingHeartbeatSec = DatahubConf.getStreamingHeartbeatSec(this.datahubConf);
            StreamingQueryListener.QueryProgressEvent queryProgressEvent = (StreamingQueryListener.QueryProgressEvent) sparkListenerEvent;
            ((StreamingQueryListener.QueryProgressEvent) sparkListenerEvent).progress().id();
            if (this.batchLastUpdated.containsKey(queryProgressEvent.progress().id().toString()) && this.batchLastUpdated.get(queryProgressEvent.progress().id().toString()).isAfter(Instant.now().minusSeconds(streamingHeartbeatSec))) {
                log.debug("Skipping lineage emit as it was emitted in the last {} seconds", Integer.valueOf(streamingHeartbeatSec));
                return;
            }
            try {
                this.batchLastUpdated.put(queryProgressEvent.progress().id().toString(), Instant.now());
                this.emitter.emit(queryProgressEvent.progress());
                log.debug("Query progress event: {}", queryProgressEvent.progress());
                log.info("onOtherEvent completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
