package datahub.spark;

import com.linkedin.common.DataJobUrnArray;
import com.linkedin.common.DatasetUrnArray;
import com.linkedin.common.TimeStamp;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.datajob.DataFlowInfo;
import com.linkedin.datajob.DataJobInfo;
import com.linkedin.datajob.DataJobInputOutput;
import com.linkedin.schema.SchemaMetadata;
import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import datahub.client.rest.RestEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.conf.DatahubConf;
import datahub.spark.conf.SparkAppContext;
import datahub.spark.converter.OpenLineageToDataHub;
import datahub.spark.converter.SparkStreamingEventToDatahub;
import datahub.spark.utils.DatahubUtils;
import datahub.spark2.shaded.org.slf4j.Logger;
import datahub.spark2.shaded.org.slf4j.LoggerFactory;
import datahub.spark2.shaded.typesafe.config.Config;
import datahub.spark2.shaded.typesafe.config.ConfigFactory;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientException;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.spark.agent.ArgumentParser;
import io.openlineage.spark.agent.EventEmitter;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

/* loaded from: input_file:datahub/spark/DatahubEventEmitter.class */
/**
 * OpenLineage {@link EventEmitter} that converts Spark run events into DataHub metadata change
 * proposals (MCPs) and sends them to DataHub over the REST transport.
 *
 * <p>Lineage is emitted either per OpenLineage event or "coalesced" into a single dataFlow/dataJob
 * pair built from every dataset seen during the run, depending on {@link DatahubConf} settings. In
 * streaming mode, run events are only accumulated; streaming progress is emitted through
 * {@link #emit(StreamingQueryProgress)}.
 */
public class DatahubEventEmitter extends EventEmitter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DatahubEventEmitter.class);

    /** The only DataHub transport this emitter supports. */
    private static final String REST_TRANSPORT = "rest";

    /** When true, run-event emission is skipped (events are still converted and accumulated). */
    private final AtomicBoolean streaming;
    /** Every input dataset URN converted so far; used for coalesced emission. */
    Set<DatasetUrn> inSet;
    /** Every output dataset URN converted so far; used for coalesced emission. */
    Set<DatasetUrn> outSet;
    /** schemaMetadata MCPs keyed by dataset URN string; reused by coalesced and streaming emission. */
    Map<String, MetadataChangeProposalWrapper> schemaMap;
    /** The first START event received; anchors the "createdAt" property of the data flow. */
    OpenLineage.RunEvent firstRunEvent;
    private String emitterType;
    private Optional<RestEmitterConfig> restEmitterConfig;
    private final SparkAppContext appContext;
    private Config datahubConf;

    /**
     * Creates an emitter with an empty DataHub configuration.
     *
     * <p>No emitter configuration is available until {@link #setConfig(Config)} is called.
     * Until then, {@link #emitMcps(List)} does nothing instead of failing.
     *
     * @throws URISyntaxException if the OpenLineage argument parsing of the Spark conf fails
     */
    public DatahubEventEmitter() throws URISyntaxException {
        super(ArgumentParser.parse(new SparkConf()));
        this.streaming = new AtomicBoolean(false);
        this.inSet = new TreeSet<>(new DataSetUrnComparator());
        this.outSet = new TreeSet<>(new DataSetUrnComparator());
        this.schemaMap = new HashMap<>();
        this.appContext = new SparkAppContext();
        this.datahubConf = ConfigFactory.empty();
        // Same default transport as initializeEmitter(); with no REST config present yet,
        // getEmitter() returns empty instead of throwing a NullPointerException.
        this.emitterType = REST_TRANSPORT;
        this.restEmitterConfig = Optional.empty();
    }

    /**
     * Builds a fresh emitter for the configured transport.
     *
     * @return a {@link RestEmitter} when the transport is "rest" and a REST config exists;
     *     otherwise empty (an error is logged for unrecognized transports)
     */
    private Optional<Emitter> getEmitter() {
        if (!REST_TRANSPORT.equals(this.emitterType)) {
            log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", this.emitterType);
            return Optional.empty();
        }
        if (!this.restEmitterConfig.isPresent()) {
            return Optional.empty();
        }
        return Optional.of(new RestEmitter(this.restEmitterConfig.get()));
    }

    /**
     * Converts one OpenLineage run event into DataHub MCPs and records the datasets it touches.
     *
     * <p>The first START event is only remembered in {@link #firstRunEvent} and yields no MCPs.
     * Other events produce MCPs only if they are COMPLETE events, or if streaming mode is on.
     * The MCPs are:
     * <ul>
     *   <li>the dataFlow info aspect;</li>
     *   <li>the dataJob info aspect;</li>
     *   <li>optional dataset and schema aspects, controlled by the DatahubConf materialize and
     *       schema flags;</li>
     *   <li>a dataJobInputOutput aspect, when the event carries inputs or outputs.</li>
     * </ul>
     * Converted URNs are also added to {@link #inSet}/{@link #outSet}, and schema MCPs are cached
     * in {@link #schemaMap}.
     *
     * @param runEvent the OpenLineage event to convert
     * @return the MCPs to emit for this event; never null, possibly empty
     */
    public List<MetadataChangeProposalWrapper> convertOpenLineageRunEventToMcp(OpenLineage.RunEvent runEvent) {
        List<MetadataChangeProposalWrapper> mcps = new ArrayList<>();
        // Serializing the whole event is costly; only do it when it will actually be logged.
        if (log.isDebugEnabled()) {
            try {
                log.debug("Emitting lineage: {}", OpenLineageClientUtils.toJson(runEvent));
            } catch (OpenLineageClientException e) {
                log.error("Could not emit lineage w/ exception", e);
            }
        }
        if (runEvent.getEventType() == OpenLineage.RunEvent.EventType.START && this.firstRunEvent == null) {
            this.firstRunEvent = runEvent;
            return mcps;
        }
        if (runEvent.getEventType() != OpenLineage.RunEvent.EventType.COMPLETE && !isStreaming()) {
            return mcps;
        }

        OpenLineage.Job job = runEvent.getJob();
        log.debug("Job: {}", job.getNamespace() + "." + job.getName());
        DataFlowInfo dataFlowInfo = getDataFlowInfo(runEvent, this.datahubConf);
        DataFlowUrn flowUrn = DatahubUtils.flowUrn(DatahubConf.getSparkPlatformInstanceKey(this.datahubConf), DatahubConf.getPipelineName(this.datahubConf));
        mcps.add(MetadataChangeProposalWrapper.create(b -> b.entityType("dataFlow").entityUrn(flowUrn).upsert().aspect(dataFlowInfo)));

        String jobName = job.getName();
        DataJobInfo dataJobInfo = new DataJobInfo();
        dataJobInfo.setName(jobName);
        dataJobInfo.setType(DataJobInfo.Type.create("SPARK"));
        DataJobUrn jobUrn = DatahubUtils.jobUrn(flowUrn, jobName);
        mcps.add(MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(dataJobInfo)));

        List<OpenLineage.InputDataset> inputs = runEvent.getInputs();
        List<OpenLineage.OutputDataset> outputs = runEvent.getOutputs();
        if (inputs == null && outputs == null) {
            return mcps;
        }

        // Inputs and outputs are handled independently so that an event with outputs but
        // no inputs still produces output lineage.
        DatasetUrnArray inputUrns = new DatasetUrnArray();
        if (inputs != null) {
            if (log.isDebugEnabled()) {
                log.debug("Processing Inputs {}", OpenLineageClientUtils.toJson(inputs));
            }
            for (OpenLineage.InputDataset input : inputs) {
                log.debug("Processing Input Dataset: Namespace:{}, datasetName: {}, Env: {}", input.getNamespace(), input.getName(), DatahubConf.getCommonFabricType(this.datahubConf));
                Optional<DatasetUrn> maybeUrn = OpenLineageToDataHub.convertOpenlineageDatasetToDatasetUrn(input, this.datahubConf);
                if (!maybeUrn.isPresent()) {
                    continue;
                }
                DatasetUrn urn = maybeUrn.get();
                this.inSet.add(urn);
                if (!inputUrns.contains(urn)) {
                    inputUrns.add(urn);
                    log.info("Input Urn: {}", urn);
                    addDatasetMcps(mcps, urn, () -> OpenLineageToDataHub.getSchemaMetadata(input));
                }
            }
        }

        DatasetUrnArray outputUrns = new DatasetUrnArray();
        if (outputs != null) {
            if (log.isDebugEnabled()) {
                log.debug("Processing Outputs {}", OpenLineageClientUtils.toJson(outputs));
            }
            for (OpenLineage.OutputDataset output : outputs) {
                log.debug("Processing Output Dataset: Namespace:{}, datasetName: {}, Env: {}", output.getNamespace(), output.getName(), DatahubConf.getCommonFabricType(this.datahubConf));
                Optional<DatasetUrn> maybeUrn = OpenLineageToDataHub.convertOpenlineageDatasetToDatasetUrn(output, this.datahubConf);
                if (!maybeUrn.isPresent()) {
                    continue;
                }
                DatasetUrn urn = maybeUrn.get();
                this.outSet.add(urn);
                if (!outputUrns.contains(urn)) {
                    outputUrns.add(urn);
                    log.info("Output Urn: {}", urn);
                    addDatasetMcps(mcps, urn, () -> OpenLineageToDataHub.getSchemaMetadata(output));
                }
            }
        }

        DataJobInputOutput dataJobInputOutput = new DataJobInputOutput();
        dataJobInputOutput.setInputDatasets(inputUrns);
        dataJobInputOutput.setOutputDatasets(outputUrns);
        mcps.add(MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(dataJobInputOutput)));
        return mcps;
    }

    /**
     * Appends dataset-level MCPs for one newly seen dataset when dataset materialization is on.
     *
     * <p>Adds the dataset MCP from {@link DatahubUtils#generateDatasetMcp}. If schema metadata is
     * also enabled, it adds a schemaMetadata MCP and caches it in {@link #schemaMap}.
     *
     * @param mcps destination list
     * @param urn the dataset URN
     * @param schemaSupplier lazily produces the dataset's schema; only called when schemas are included
     */
    private void addDatasetMcps(List<MetadataChangeProposalWrapper> mcps, DatasetUrn urn, Supplier<SchemaMetadata> schemaSupplier) {
        if (!DatahubConf.isDatasetMaterialize(this.datahubConf)) {
            return;
        }
        mcps.add(DatahubUtils.generateDatasetMcp(urn));
        if (!DatahubConf.isIncludeSchemaMetadata(this.datahubConf)) {
            return;
        }
        SchemaMetadata schemaMetadata = schemaSupplier.get();
        MetadataChangeProposalWrapper schemaMcp = MetadataChangeProposalWrapper.create(b -> b.entityType("dataset").entityUrn(urn).upsert().aspect(schemaMetadata));
        mcps.add(schemaMcp);
        log.debug("Adding to Urn {} to SchemaMap", urn);
        this.schemaMap.put(urn.toString(), schemaMcp);
    }

    /**
     * Handles an OpenLineage run event.
     *
     * <p>The event is always converted, which records its datasets and schemas. Nothing is emitted
     * in streaming mode. With coalescing disabled, this event's MCPs are emitted directly. With
     * coalescing and periodic coalesced emission enabled, the accumulated lineage is emitted.
     *
     * @param runEvent the OpenLineage event
     */
    @Override
    public void emit(OpenLineage.RunEvent runEvent) {
        long startMillis = System.currentTimeMillis();
        List<MetadataChangeProposalWrapper> mcps = convertOpenLineageRunEventToMcp(runEvent);
        if (isStreaming()) {
            log.info("Streaming mode is enabled. Skipping lineage emission.");
            return;
        }
        if (!DatahubConf.isCoalesceEnabled(this.datahubConf)) {
            logRunEventAtDebug(runEvent);
            emitMcps(mcps);
        } else if (DatahubConf.isEmitCoalescePeriodically(this.datahubConf)) {
            logRunEventAtDebug(runEvent);
            emitCoalesced();
        }
        log.info("Collecting lineage successfully in {} ms", Long.valueOf(System.currentTimeMillis() - startMillis));
    }

    /** Logs the serialized run event at debug level, serializing only when debug is enabled. */
    private void logRunEventAtDebug(OpenLineage.RunEvent runEvent) {
        if (log.isDebugEnabled()) {
            log.debug("Emitting lineage completed successfully: {}", OpenLineageClientUtils.toJson(runEvent));
        }
    }

    /**
     * Emits the coalesced lineage of the whole run.
     *
     * <p>Does nothing in streaming mode, and emits nothing unless coalescing is enabled.
     */
    public void emitCoalesced() {
        long startMillis = System.currentTimeMillis();
        if (isStreaming()) {
            log.info("Streaming mode is enabled. Skipping lineage emission.");
            return;
        }
        if (DatahubConf.isCoalesceEnabled(this.datahubConf)) {
            List<MetadataChangeProposalWrapper> mcps = generateCoalescedMcps();
            log.info("Emitting Coalesced lineage completed successfully");
            emitMcps(mcps);
        }
        log.info("Emitting coalesced lineage completed in {} ms", Long.valueOf(System.currentTimeMillis() - startMillis));
    }

    /**
     * Builds a single dataFlow/dataJob lineage from every dataset accumulated during the run.
     *
     * <p>The data job is named after the flow ID. It carries the flow's custom properties plus a
     * "finishedAt" timestamp. If {@code DatahubConf.PARENT_JOB_KEY} holds a valid DataJob URN, that
     * job is set as an upstream job.
     *
     * @return the coalesced MCPs; empty (with a warning) when no START event was ever received
     */
    public List<MetadataChangeProposalWrapper> generateCoalescedMcps() {
        List<MetadataChangeProposalWrapper> mcps = new ArrayList<>();
        OpenLineage.RunEvent runEvent = this.firstRunEvent;
        if (runEvent == null) {
            log.warn("Failed to get initial run event and no lineage events to emit. Maybe the spark job finished premaraturely?");
            return mcps;
        }
        OpenLineage.Job job = runEvent.getJob();
        log.debug("Job: {}", job.getNamespace() + "." + job.getName());
        DataFlowInfo dataFlowInfo = getDataFlowInfo(runEvent, this.datahubConf);
        dataFlowInfo.getCustomProperties().put("finishedAt", ZonedDateTime.now(ZoneOffset.UTC).toString());
        DataFlowUrn flowUrn = DatahubUtils.flowUrn(DatahubConf.getSparkPlatformInstanceKey(this.datahubConf), DatahubConf.getPipelineName(this.datahubConf));
        mcps.add(MetadataChangeProposalWrapper.create(b -> b.entityType("dataFlow").entityUrn(flowUrn).upsert().aspect(dataFlowInfo)));

        DataJobInfo dataJobInfo = new DataJobInfo();
        dataJobInfo.setName(flowUrn.getFlowIdEntity());
        dataJobInfo.setType(DataJobInfo.Type.create("SPARK"));
        dataJobInfo.setCustomProperties(dataFlowInfo.getCustomProperties());
        DataJobUrn jobUrn = DatahubUtils.jobUrn(flowUrn, dataJobInfo.getName());
        mcps.add(MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(dataJobInfo)));

        DataJobInputOutput dataJobInputOutput = new DataJobInputOutput();
        DataJobUrnArray parentJobs = new DataJobUrnArray();
        try {
            if (this.datahubConf.hasPath(DatahubConf.PARENT_JOB_KEY)) {
                parentJobs = new DataJobUrnArray(DataJobUrn.createFromString(this.datahubConf.getString(DatahubConf.PARENT_JOB_KEY)), new DataJobUrn[0]);
            }
        } catch (ClassCastException e) {
            log.warn("parent.datajob_urn is not a valid Datajob URN. Skipping setting up upstream job.");
        } catch (URISyntaxException e) {
            log.warn("parent.datajob_urn is not a valid URN. Skipping setting up upstream job.");
        }
        dataJobInputOutput.setInputDatajobs(parentJobs);
        // Input dataset MCPs are appended before output dataset MCPs.
        dataJobInputOutput.setInputDatasets(collectCoalescedDatasets(this.inSet, mcps));
        dataJobInputOutput.setOutputDatasets(collectCoalescedDatasets(this.outSet, mcps));
        mcps.add(MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(dataJobInputOutput)));
        return mcps;
    }

    /**
     * Copies accumulated URNs into a {@link DatasetUrnArray} and appends their dataset MCPs.
     *
     * <p>Dataset MCPs are appended when materialization is enabled. Cached schema MCPs from
     * {@link #schemaMap} are appended when schema metadata is also enabled.
     *
     * @param urns accumulated dataset URNs, iterated in set order
     * @param mcps destination list for dataset/schema MCPs
     * @return the URNs as a DatasetUrnArray
     */
    private DatasetUrnArray collectCoalescedDatasets(Set<DatasetUrn> urns, List<MetadataChangeProposalWrapper> mcps) {
        DatasetUrnArray result = new DatasetUrnArray();
        for (DatasetUrn urn : urns) {
            result.add(urn);
            if (!DatahubConf.isDatasetMaterialize(this.datahubConf)) {
                continue;
            }
            mcps.add(DatahubUtils.generateDatasetMcp(urn));
            MetadataChangeProposalWrapper schemaMcp = this.schemaMap.get(urn.toString());
            if (DatahubConf.isIncludeSchemaMetadata(this.datahubConf) && schemaMcp != null) {
                mcps.add(schemaMcp);
            }
        }
        return result;
    }

    /**
     * Converts a run event into {@link DataFlowInfo}.
     *
     * <p>Adds the "createdAt", "sparkAppId" and "sparkUser" custom properties and sets the created
     * timestamp.
     *
     * <p>The time anchor is {@link #firstRunEvent}. If no START event has been seen yet (for example,
     * a COMPLETE or streaming event arrived first), {@code runEvent} is used instead.
     *
     * @param runEvent the event being converted
     * @param config DataHub configuration supplying the pipeline name
     * @return the populated DataFlowInfo
     */
    private DataFlowInfo getDataFlowInfo(OpenLineage.RunEvent runEvent, Config config) {
        DataFlowInfo dataFlowInfo = OpenLineageToDataHub.convertRunEventToDataFlowInfo(runEvent, DatahubConf.getPipelineName(config));
        OpenLineage.RunEvent anchor = this.firstRunEvent != null ? this.firstRunEvent : runEvent;
        ZonedDateTime anchorTime = anchor.getEventTime();
        if (anchorTime != null) {
            dataFlowInfo.getCustomProperties().put("createdAt", anchorTime.toInstant().toString());
        }
        dataFlowInfo.getCustomProperties().put("sparkAppId", this.appContext.getAppId());
        dataFlowInfo.getCustomProperties().put("sparkUser", this.appContext.getSparkUser());
        // NOTE(review): the created timestamp always comes from the anchor event.
        // DatahubConf.getAppStartTime(config) is not consulted; confirm whether a configured
        // app start time should take precedence.
        if (anchorTime != null) {
            dataFlowInfo.setCreated(new TimeStamp().setTime(anchorTime.toInstant().toEpochMilli()));
        }
        return dataFlowInfo;
    }

    /**
     * Emits the MCPs generated from a Structured Streaming progress event.
     *
     * @param streamingQueryProgress the streaming progress event
     * @throws URISyntaxException propagated from MCP generation
     */
    public void emit(StreamingQueryProgress streamingQueryProgress) throws URISyntaxException {
        emitMcps(SparkStreamingEventToDatahub.generateMcpFromStreamingProgressEvent(streamingQueryProgress, this.datahubConf, this.schemaMap));
    }

    /**
     * Sends the MCPs to DataHub with a newly created emitter, then logs each write response.
     *
     * <p>All MCPs are submitted before any response is awaited. Per-MCP failures are logged and do
     * not stop the batch. If the thread is interrupted while waiting, the interrupt flag is
     * restored and waiting stops. The emitter is always closed.
     *
     * @param list MCPs to emit
     */
    protected void emitMcps(List<MetadataChangeProposalWrapper> list) {
        Optional<Emitter> maybeEmitter = getEmitter();
        if (!maybeEmitter.isPresent()) {
            return;
        }
        Emitter emitter = maybeEmitter.get();
        try {
            List<Future<MetadataWriteResponse>> futures = new ArrayList<>(list.size());
            for (MetadataChangeProposalWrapper mcpw : list) {
                try {
                    log.info("emitting mcpw: {}", mcpw);
                    Future<MetadataWriteResponse> future = emitter.emit(mcpw);
                    if (Objects.nonNull(future)) {
                        futures.add(future);
                    }
                } catch (IOException e) {
                    log.error("Failed to emit metadata to DataHub", e);
                }
            }
            for (Future<MetadataWriteResponse> future : futures) {
                try {
                    log.info(future.get().toString());
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller; further get() calls would fail immediately.
                    Thread.currentThread().interrupt();
                    log.error("Failed to emit metadata to DataHub", e);
                    break;
                } catch (ExecutionException e) {
                    log.error("Failed to emit metadata to DataHub", e);
                }
            }
        } finally {
            try {
                emitter.close();
            } catch (IOException e) {
                log.error("Issue while closing emitter", e);
            }
        }
    }

    /**
     * Replaces the DataHub configuration and re-initializes the emitter settings from it.
     *
     * @param config the new DataHub configuration
     */
    public void setConfig(Config config) {
        this.datahubConf = config;
        initializeEmitter();
    }

    /**
     * Reads the transport and REST settings from the current configuration.
     *
     * <p>Defaults: transport "rest", GMS URL "http://localhost:8080", no token, SSL verification
     * enabled. For an unrecognized transport, an error is logged and any previous REST
     * configuration is cleared.
     */
    public void initializeEmitter() {
        this.emitterType = this.datahubConf.hasPath(DatahubConf.TRANSPORT_KEY) ? this.datahubConf.getString(DatahubConf.TRANSPORT_KEY) : REST_TRANSPORT;
        if (!REST_TRANSPORT.equals(this.emitterType)) {
            log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", this.emitterType);
            this.restEmitterConfig = Optional.empty();
            return;
        }
        boolean hasGmsUrl = this.datahubConf.hasPath(DatahubConf.GMS_URL_KEY);
        String gmsUrl = hasGmsUrl ? this.datahubConf.getString(DatahubConf.GMS_URL_KEY) : "http://localhost:8080";
        String token = this.datahubConf.hasPath(DatahubConf.GMS_AUTH_TOKEN) ? this.datahubConf.getString(DatahubConf.GMS_AUTH_TOKEN) : null;
        boolean disableSslVerification = this.datahubConf.hasPath(DatahubConf.DISABLE_SSL_VERIFICATION_KEY) && this.datahubConf.getBoolean(DatahubConf.DISABLE_SSL_VERIFICATION_KEY);
        log.info("REST Emitter Configuration: GMS url {}{}", gmsUrl, hasGmsUrl ? "" : "(default)");
        if (token != null) {
            // Never log the real token.
            log.info("REST Emitter Configuration: Token {}", "XXXXX");
        }
        if (disableSslVerification) {
            log.warn("REST Emitter Configuration: ssl verification will be disabled.");
        }
        this.restEmitterConfig = Optional.of(RestEmitterConfig.builder().server(gmsUrl).token(token).disableSslVerification(disableSslVerification).build());
    }

    /** @return whether streaming mode is enabled */
    public boolean isStreaming() {
        return this.streaming.get();
    }

    /** @param z whether streaming mode should be enabled */
    public void setStreaming(boolean z) {
        this.streaming.set(z);
    }

    /** @return the Spark application context used for the "sparkAppId"/"sparkUser" properties */
    public SparkAppContext getAppContext() {
        return this.appContext;
    }
}
