package datahub.spark;

import datahub.client.rest.RestEmitterConfig;
import datahub.spark.conf.DatahubEmitterConfig;
import datahub.spark.conf.RestDatahubEmitterConfig;
import datahub.spark.conf.SparkAppContext;
import datahub.spark.conf.SparkConfigParser;
import datahub.spark.conf.SparkLineageConf;
import datahub.spark2.shaded.org.slf4j.Logger;
import datahub.spark2.shaded.org.slf4j.LoggerFactory;
import datahub.spark2.shaded.typesafe.config.Config;
import datahub.spark2.shaded.typesafe.config.ConfigFactory;
import io.openlineage.spark.agent.ArgumentParser;
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.streaming.StreamingQueryListener;

/* loaded from: input_file:datahub/spark/DatahubSparkListener.class */
public class DatahubSparkListener extends SparkListener {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DatahubSparkListener.class);
    private SparkAppContext appContext;
    private final Map<String, Instant> batchLastUpdated = new HashMap();
    private Config datahubConf = ConfigFactory.empty();
    private final OpenLineageSparkListener listener = new OpenLineageSparkListener();
    private final DatahubEventEmitter emitter = new DatahubEventEmitter();

    public DatahubSparkListener() throws URISyntaxException {
        OpenLineageSparkListener.init(new ContextFactory(this.emitter));
    }

    private static SparkAppContext getSparkAppContext(SparkListenerApplicationStart sparkListenerApplicationStart) {
        SparkAppContext sparkAppContext = new SparkAppContext();
        sparkAppContext.setAppName(sparkListenerApplicationStart.appName());
        if (sparkListenerApplicationStart.appAttemptId().isDefined()) {
            sparkAppContext.setAppAttemptId((String) sparkListenerApplicationStart.appAttemptId().get());
        }
        sparkAppContext.setSparkUser(sparkListenerApplicationStart.sparkUser());
        sparkAppContext.setStartTime(Long.valueOf(sparkListenerApplicationStart.time()));
        sparkAppContext.setAppId((String) sparkListenerApplicationStart.appId().get());
        return sparkAppContext;
    }

    public void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Application start called");
        this.appContext = getSparkAppContext(sparkListenerApplicationStart);
        this.listener.onApplicationStart(sparkListenerApplicationStart);
        log.debug("onApplicationStart completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public Optional<DatahubEmitterConfig> initializeEmitter(Config config) {
        String string = config.hasPath(SparkConfigParser.TRANSPORT_KEY) ? config.getString(SparkConfigParser.TRANSPORT_KEY) : "rest";
        if (!string.equals("rest")) {
            log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", string);
            return Optional.empty();
        }
        String string2 = config.hasPath(SparkConfigParser.GMS_URL_KEY) ? config.getString(SparkConfigParser.GMS_URL_KEY) : "http://localhost:8080";
        String string3 = config.hasPath(SparkConfigParser.GMS_AUTH_TOKEN) ? config.getString(SparkConfigParser.GMS_AUTH_TOKEN) : null;
        boolean z = config.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY) && config.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY);
        log.info("REST Emitter Configuration: GMS url {}{}", string2, config.hasPath(SparkConfigParser.GMS_URL_KEY) ? "" : "(default)");
        if (string3 != null) {
            log.info("REST Emitter Configuration: Token {}", "XXXXX");
        }
        if (z) {
            log.warn("REST Emitter Configuration: ssl verification will be disabled.");
        }
        return Optional.of(new RestDatahubEmitterConfig(RestEmitterConfig.builder().server(string2).token(string3).disableSslVerification(z).build()));
    }

    private synchronized void loadDatahubConfig(SparkAppContext sparkAppContext, Properties properties) {
        long currentTimeMillis = System.currentTimeMillis();
        this.datahubConf = SparkConfigParser.parseSparkConfig();
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        if (sparkEnv != null) {
            log.info("sparkEnv: {}", sparkEnv.conf().toDebugString());
            sparkEnv.conf().set(ArgumentParser.SPARK_CONF_DISABLED_FACETS, "[spark_unknown;spark.logicalPlan]");
        }
        if (properties != null) {
            this.datahubConf = SparkConfigParser.parsePropertiesToConfig(properties);
            this.appContext.setDatabricksTags(SparkConfigParser.getDatabricksTags(this.datahubConf).orElse(null));
        }
        log.info("Datahub configuration: {}", this.datahubConf.root().render());
        this.emitter.setConfig(SparkLineageConf.toSparkLineageConf(this.datahubConf, sparkAppContext, initializeEmitter(this.datahubConf).orElse(null)));
        log.debug("loadDatahubConfig completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Application end called");
        this.listener.onApplicationEnd(sparkListenerApplicationEnd);
        if (this.datahubConf.hasPath(SparkConfigParser.STREAMING_JOB) && this.datahubConf.getBoolean(SparkConfigParser.STREAMING_JOB)) {
            return;
        }
        this.emitter.emitCoalesced();
        log.debug("onApplicationEnd completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onTaskEnd(SparkListenerTaskEnd sparkListenerTaskEnd) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Task end called");
        this.listener.onTaskEnd(sparkListenerTaskEnd);
        log.debug("onTaskEnd completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Job end called");
        this.listener.onJobEnd(sparkListenerJobEnd);
        log.debug("onJobEnd completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Job start called");
        loadDatahubConfig(this.appContext, sparkListenerJobStart.properties());
        this.listener.onJobStart(sparkListenerJobStart);
        log.debug("onJobStart completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Other event called {}", sparkListenerEvent.getClass().getName());
        if (((sparkListenerEvent instanceof StreamingQueryListener.QueryProgressEvent) || (sparkListenerEvent instanceof StreamingQueryListener.QueryStartedEvent)) && !this.emitter.isStreaming()) {
            if (this.datahubConf.hasPath(SparkConfigParser.STREAMING_JOB)) {
                this.emitter.setStreaming(this.datahubConf.getBoolean(SparkConfigParser.STREAMING_JOB));
                log.info("Streaming mode set to {}", Boolean.valueOf(this.datahubConf.getBoolean(SparkConfigParser.STREAMING_JOB)));
            } else {
                log.info("Streaming mode not set explicitly, switching to streaming mode");
                this.emitter.setStreaming(true);
            }
        }
        if (this.datahubConf.hasPath(SparkConfigParser.STREAMING_JOB) && !this.datahubConf.getBoolean(SparkConfigParser.STREAMING_JOB)) {
            log.info("Not in streaming mode");
            return;
        }
        this.listener.onOtherEvent(sparkListenerEvent);
        if (sparkListenerEvent instanceof StreamingQueryListener.QueryProgressEvent) {
            int streamingHeartbeatSec = SparkConfigParser.getStreamingHeartbeatSec(this.datahubConf);
            StreamingQueryListener.QueryProgressEvent queryProgressEvent = (StreamingQueryListener.QueryProgressEvent) sparkListenerEvent;
            ((StreamingQueryListener.QueryProgressEvent) sparkListenerEvent).progress().id();
            if (this.batchLastUpdated.containsKey(queryProgressEvent.progress().id().toString()) && this.batchLastUpdated.get(queryProgressEvent.progress().id().toString()).isAfter(Instant.now().minusSeconds(streamingHeartbeatSec))) {
                log.debug("Skipping lineage emit as it was emitted in the last {} seconds", Integer.valueOf(streamingHeartbeatSec));
                return;
            }
            try {
                this.batchLastUpdated.put(queryProgressEvent.progress().id().toString(), Instant.now());
                this.emitter.emit(queryProgressEvent.progress());
                log.debug("Query progress event: {}", queryProgressEvent.progress());
                log.debug("onOtherEvent completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
