package datahub.spark;

import datahub.client.rest.RestEmitterConfig;
import datahub.spark.conf.DatahubEmitterConfig;
import datahub.spark.conf.RestDatahubEmitterConfig;
import datahub.spark.conf.SparkAppContext;
import datahub.spark.conf.SparkConfigParser;
import datahub.spark.conf.SparkLineageConf;
import datahub.spark2.shaded.org.slf4j.Logger;
import datahub.spark2.shaded.org.slf4j.LoggerFactory;
import datahub.spark2.shaded.typesafe.config.Config;
import datahub.spark2.shaded.typesafe.config.ConfigFactory;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.openlineage.client.OpenLineageConfig;
import io.openlineage.client.circuitBreaker.CircuitBreaker;
import io.openlineage.client.circuitBreaker.CircuitBreakerFactory;
import io.openlineage.client.circuitBreaker.NoOpCircuitBreaker;
import io.openlineage.client.metrics.MicrometerProvider;
import io.openlineage.spark.agent.ArgumentParser;
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.SparkOpenLineageConfig;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.package$;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import scala.Function0;
import scala.Option;

/* loaded from: input_file:datahub/spark/DatahubSparkListener.class */
public class DatahubSparkListener extends SparkListener {
    private final OpenLineageSparkListener listener;
    private DatahubEventEmitter emitter;
    private SparkAppContext appContext;
    private static ContextFactory contextFactory;
    private final Function0<Option<SparkContext>> activeSparkContext;
    private static MeterRegistry meterRegistry;
    private boolean isDisabled;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DatahubSparkListener.class);
    private static CircuitBreaker circuitBreaker = new NoOpCircuitBreaker();
    private static final String sparkVersion = package$.MODULE$.SPARK_VERSION();
    private final Map<String, Instant> batchLastUpdated = new HashMap();
    private Config datahubConf = ConfigFactory.empty();

    public DatahubSparkListener() throws URISyntaxException {
        SparkContext$ sparkContext$ = SparkContext$.MODULE$;
        Objects.requireNonNull(sparkContext$);
        this.activeSparkContext = ScalaConversionUtils.toScalaFn(sparkContext$::getActive);
        this.listener = new OpenLineageSparkListener();
    }

    private static SparkAppContext getSparkAppContext(SparkListenerApplicationStart sparkListenerApplicationStart) {
        SparkAppContext sparkAppContext = new SparkAppContext();
        sparkAppContext.setAppName(sparkListenerApplicationStart.appName());
        if (sparkListenerApplicationStart.appAttemptId().isDefined()) {
            sparkAppContext.setAppAttemptId((String) sparkListenerApplicationStart.appAttemptId().get());
        }
        sparkAppContext.setSparkUser(sparkListenerApplicationStart.sparkUser());
        sparkAppContext.setStartTime(Long.valueOf(sparkListenerApplicationStart.time()));
        sparkAppContext.setAppId((String) sparkListenerApplicationStart.appId().get());
        return sparkAppContext;
    }

    public void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        long currentTimeMillis = System.currentTimeMillis();
        log.info("Application start called");
        this.appContext = getSparkAppContext(sparkListenerApplicationStart);
        initializeContextFactoryIfNotInitialized();
        this.listener.onApplicationStart(sparkListenerApplicationStart);
        log.info("onApplicationStart completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public Optional<DatahubEmitterConfig> initializeEmitter(Config config) {
        String string = config.hasPath(SparkConfigParser.TRANSPORT_KEY) ? config.getString(SparkConfigParser.TRANSPORT_KEY) : "rest";
        if (!string.equals("rest")) {
            log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", string);
            return Optional.empty();
        }
        String string2 = config.hasPath(SparkConfigParser.GMS_URL_KEY) ? config.getString(SparkConfigParser.GMS_URL_KEY) : "http://localhost:8080";
        String string3 = config.hasPath(SparkConfigParser.GMS_AUTH_TOKEN) ? config.getString(SparkConfigParser.GMS_AUTH_TOKEN) : null;
        boolean z = config.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY) && config.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY);
        int i = config.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC) ? config.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC) : 5;
        int i2 = config.hasPath(SparkConfigParser.MAX_RETRIES) ? config.getInt(SparkConfigParser.MAX_RETRIES) : 0;
        log.info("REST Emitter Configuration: GMS url {}{}", string2, config.hasPath(SparkConfigParser.GMS_URL_KEY) ? "" : "(default)");
        if (string3 != null) {
            log.info("REST Emitter Configuration: Token {}", "XXXXX");
        }
        if (z) {
            log.warn("REST Emitter Configuration: ssl verification will be disabled.");
        }
        return Optional.of(new RestDatahubEmitterConfig(RestEmitterConfig.builder().server(string2).token(string3).disableSslVerification(z).maxRetries(i2).retryIntervalSec(i).build()));
    }

    private synchronized SparkLineageConf loadDatahubConfig(SparkAppContext sparkAppContext, Properties properties) {
        long currentTimeMillis = System.currentTimeMillis();
        this.datahubConf = SparkConfigParser.parseSparkConfig();
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        if (sparkEnv != null) {
            log.info("sparkEnv: {}", sparkEnv.conf().toDebugString());
            sparkEnv.conf().set("spark.openlineage.facets.disabled", "[spark_unknown;spark.logicalPlan]");
        }
        if (properties != null) {
            this.datahubConf = SparkConfigParser.parsePropertiesToConfig(properties);
            this.appContext.setDatabricksTags(SparkConfigParser.getDatabricksTags(this.datahubConf).orElse(null));
        }
        log.info("Datahub configuration: {}", this.datahubConf.root().render());
        SparkLineageConf sparkLineageConf = SparkLineageConf.toSparkLineageConf(this.datahubConf, sparkAppContext, initializeEmitter(this.datahubConf).orElse(null));
        log.debug("loadDatahubConfig completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        return sparkLineageConf;
    }

    public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Application end called");
        this.listener.onApplicationEnd(sparkListenerApplicationEnd);
        if (this.datahubConf.hasPath(SparkConfigParser.STREAMING_JOB) && this.datahubConf.getBoolean(SparkConfigParser.STREAMING_JOB)) {
            return;
        }
        if (this.emitter != null) {
            this.emitter.emitCoalesced();
        } else {
            log.warn("Emitter is not initialized, unable to emit coalesced events");
        }
        log.debug("onApplicationEnd completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onTaskEnd(SparkListenerTaskEnd sparkListenerTaskEnd) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Task end called");
        this.listener.onTaskEnd(sparkListenerTaskEnd);
        log.debug("onTaskEnd completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Job end called");
        this.listener.onJobEnd(sparkListenerJobEnd);
        log.debug("onJobEnd completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
        long currentTimeMillis = System.currentTimeMillis();
        initializeContextFactoryIfNotInitialized();
        log.debug("Job start called");
        this.listener.onJobStart(sparkListenerJobStart);
        log.debug("onJobStart completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
        long currentTimeMillis = System.currentTimeMillis();
        log.debug("Other event called {}", sparkListenerEvent.getClass().getName());
        if (((sparkListenerEvent instanceof StreamingQueryListener.QueryProgressEvent) || (sparkListenerEvent instanceof StreamingQueryListener.QueryStartedEvent)) && !this.emitter.isStreaming()) {
            if (this.datahubConf.hasPath(SparkConfigParser.STREAMING_JOB)) {
                this.emitter.setStreaming(this.datahubConf.getBoolean(SparkConfigParser.STREAMING_JOB));
                log.info("Streaming mode set to {}", Boolean.valueOf(this.datahubConf.getBoolean(SparkConfigParser.STREAMING_JOB)));
            } else {
                log.info("Streaming mode not set explicitly, switching to streaming mode");
                this.emitter.setStreaming(true);
            }
        }
        if (this.datahubConf.hasPath(SparkConfigParser.STREAMING_JOB) && !this.datahubConf.getBoolean(SparkConfigParser.STREAMING_JOB)) {
            log.info("Not in streaming mode");
            return;
        }
        this.listener.onOtherEvent(sparkListenerEvent);
        if (sparkListenerEvent instanceof StreamingQueryListener.QueryProgressEvent) {
            int streamingHeartbeatSec = SparkConfigParser.getStreamingHeartbeatSec(this.datahubConf);
            StreamingQueryListener.QueryProgressEvent queryProgressEvent = (StreamingQueryListener.QueryProgressEvent) sparkListenerEvent;
            ((StreamingQueryListener.QueryProgressEvent) sparkListenerEvent).progress().id();
            if (this.batchLastUpdated.containsKey(queryProgressEvent.progress().id().toString()) && this.batchLastUpdated.get(queryProgressEvent.progress().id().toString()).isAfter(Instant.now().minusSeconds(streamingHeartbeatSec))) {
                log.debug("Skipping lineage emit as it was emitted in the last {} seconds", Integer.valueOf(streamingHeartbeatSec));
                return;
            }
            try {
                this.batchLastUpdated.put(queryProgressEvent.progress().id().toString(), Instant.now());
                this.emitter.emit(queryProgressEvent.progress());
                log.debug("Query progress event: {}", queryProgressEvent.progress());
                log.debug("onOtherEvent completed successfully in {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static void initializeMetrics(OpenLineageConfig openLineageConfig) {
        meterRegistry = MicrometerProvider.addMeterRegistryFromConfig(openLineageConfig.getMetricsConfig());
        String join = (openLineageConfig.getFacetsConfig() == null || openLineageConfig.getFacetsConfig().getDisabledFacets() == null) ? "" : String.join(ArgumentParser.DISABLED_FACETS_SEPARATOR, openLineageConfig.getFacetsConfig().getDisabledFacets());
        meterRegistry.config().commonTags(Tags.of(Tag.of("openlineage.spark.integration.version", Versions.getVersion()), Tag.of("openlineage.spark.version", sparkVersion), Tag.of("openlineage.spark.disabled.facets", join)));
        String str = join;
        ((CompositeMeterRegistry) meterRegistry).getRegistries().forEach(meterRegistry2 -> {
            meterRegistry2.config().commonTags(Tags.of(Tag.of("openlineage.spark.integration.version", Versions.getVersion()), Tag.of("openlineage.spark.version", sparkVersion), Tag.of("openlineage.spark.disabled.facets", str)));
        });
    }

    private void initializeContextFactoryIfNotInitialized() {
        if (contextFactory != null || this.isDisabled) {
            return;
        }
        ScalaConversionUtils.asJavaOptional(this.activeSparkContext.apply()).ifPresent(sparkContext -> {
            initializeContextFactoryIfNotInitialized(sparkContext.appName());
        });
    }

    private void initializeContextFactoryIfNotInitialized(String str) {
        if (contextFactory != null || this.isDisabled) {
            return;
        }
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        if (sparkEnv == null) {
            log.warn("OpenLineage listener instantiated, but no configuration could be found. Lineage events will not be collected");
        } else {
            initializeContextFactoryIfNotInitialized(sparkEnv.conf(), str);
        }
    }

    private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, String str) {
        if (contextFactory != null || this.isDisabled) {
            return;
        }
        try {
            SparkLineageConf loadDatahubConfig = loadDatahubConfig(this.appContext, null);
            SparkOpenLineageConfig parse = ArgumentParser.parse(sparkConf);
            initializeMetrics(parse);
            this.emitter = new DatahubEventEmitter(parse, str);
            this.emitter.setConfig(loadDatahubConfig);
            contextFactory = new ContextFactory(this.emitter, meterRegistry, parse);
            circuitBreaker = new CircuitBreakerFactory(parse.getCircuitBreaker()).build();
            OpenLineageSparkListener.init(contextFactory);
        } catch (URISyntaxException e) {
            log.error("Unable to parse OpenLineage endpoint. Lineage events will not be collected", (Throwable) e);
        }
    }
}
