package datahub.spark.consumer.impl;

import com.linkedin.common.DataJobUrnArray;
import com.linkedin.common.DatasetUrnArray;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datajob.DataJobInfo;
import com.linkedin.datajob.DataJobInputOutput;
import com.linkedin.datajob.JobStatus;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.model.AppEndEvent;
import datahub.spark.model.AppStartEvent;
import datahub.spark.model.LineageEvent;
import datahub.spark.model.SQLQueryExecStartEvent;
import datahub.spark2.shaded.typesafe.config.Config;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:datahub/spark/consumer/impl/CoalesceJobsEmitter.class */
/**
 * Lineage consumer that coalesces all SQL query executions observed during one Spark
 * application into a single {@code dataJob} entity.
 *
 * <p>Event handling:
 * <ul>
 *   <li>{@link AppStartEvent}: remembered and emitted immediately.</li>
 *   <li>{@link SQLQueryExecStartEvent}: buffered only; nothing is emitted.</li>
 *   <li>{@link AppEndEvent}: the end event is emitted, followed by two proposals for one data
 *       job — its merged input/output datasets and its job info.</li>
 * </ul>
 *
 * <p>When the configuration defines {@code parent.datajob_urn}, that URN is recorded as the
 * input data job of the coalesced job.
 *
 * <p>NOTE(review): the buffered state is neither synchronized nor cleared after an end event, so
 * an instance presumably serves a single application whose events arrive serially — confirm
 * against the caller.
 */
public class CoalesceJobsEmitter extends McpEmitter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(CoalesceJobsEmitter.class);

    /** Configuration path of the optional upstream (parent) data job URN. */
    private static final String PARENT_JOB_KEY = "parent.datajob_urn";

    /** Raw value found under {@link #PARENT_JOB_KEY}, or {@code null} when the path is absent. */
    private final String parentJobUrn;

    /** Most recently received application start event; {@code null} until one arrives. */
    private AppStartEvent appStartEvent;

    /** SQL query start events buffered until the application end event is processed. */
    private final List<SQLQueryExecStartEvent> sqlQueryExecStartEvents = new ArrayList<>();

    /**
     * Creates the emitter and reads the optional parent job URN from {@code config}.
     *
     * @param config emitter configuration; also handed to the {@link McpEmitter} constructor
     */
    public CoalesceJobsEmitter(Config config) {
        super(config);
        this.parentJobUrn = config.hasPath(PARENT_JOB_KEY) ? config.getString(PARENT_JOB_KEY) : null;
        log.info("CoalesceJobsEmitter initialised with parent.datajob_urn:{}", this.parentJobUrn);
    }

    /**
     * Dispatches on the concrete event type; events of any other type are ignored.
     *
     * <p>An {@link AppEndEvent} that arrives before any {@link AppStartEvent} is logged as an
     * error and dropped without emitting anything.
     *
     * @param lineageEvent the event to process
     */
    @Override
    public void accept(LineageEvent lineageEvent) {
        if (lineageEvent instanceof AppStartEvent) {
            this.appStartEvent = (AppStartEvent) lineageEvent;
            log.debug("AppstartEvent received for processing: {}", this.appStartEvent.getAppId());
            emit(this.appStartEvent.asMetadataEvents());
        } else if (lineageEvent instanceof SQLQueryExecStartEvent) {
            SQLQueryExecStartEvent sqlEvent = (SQLQueryExecStartEvent) lineageEvent;
            this.sqlQueryExecStartEvents.add(sqlEvent);
            log.debug(
                "SQLQueryExecStartEvent received for processing. for app: {}:{} sqlID: {}",
                sqlEvent.getAppId(),
                sqlEvent.getAppName(),
                sqlEvent.getSqlQueryExecId());
        } else if (lineageEvent instanceof AppEndEvent) {
            AppEndEvent appEndEvent = (AppEndEvent) lineageEvent;
            if (this.appStartEvent == null) {
                // The coalesced job's URN and properties come from the start event.
                log.error(
                    "Application End event received for processing but start event is not received for processing for {}-{}",
                    appEndEvent.getAppId(),
                    appEndEvent.getAppName());
                return;
            }
            log.debug("AppEndEvent received for processing. for app start :{}", appEndEvent.getAppId());
            emit(appEndEvent.asMetadataEvents());
            emit(squashSQLQueryExecStartEvents(appEndEvent));
        }
    }

    /**
     * Builds the two proposals describing the coalesced data job.
     *
     * <p>Requires {@link #appStartEvent} to be non-null; {@link #accept} guarantees this.
     *
     * @param appEndEvent the application end event, used for the {@code completedAt} property
     * @return the input/output proposal followed by the job-info proposal
     */
    private List<MetadataChangeProposalWrapper> squashSQLQueryExecStartEvents(AppEndEvent appEndEvent) {
        DataJobUrn jobUrn = new DataJobUrn(this.appStartEvent.getFlowUrn(), this.appStartEvent.getAppName());

        DatasetUrnArray inputs = new DatasetUrnArray(
            mergeAcrossEvents(SQLQueryExecStartEvent::getInputDatasets, new DataSetUrnComparator()));
        DatasetUrnArray outputs = new DatasetUrnArray(
            mergeAcrossEvents(SQLQueryExecStartEvent::getOuputDatasets, new DataSetUrnComparator()));
        DataJobUrnArray upstreamJobs = resolveParentJobs();

        DataJobInputOutput jobIo = new DataJobInputOutput()
            .setInputDatasets(inputs)
            .setOutputDatasets(outputs)
            .setInputDatajobs(upstreamJobs);
        MetadataChangeProposalWrapper ioProposal = MetadataChangeProposalWrapper.create(builder -> {
            builder.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobIo);
        });

        StringMap properties = new StringMap();
        properties.put("startedAt", this.appStartEvent.timeStr());
        properties.put("appId", this.appStartEvent.getAppId());
        properties.put("appName", this.appStartEvent.getAppName());
        properties.put("completedAt", appEndEvent.timeStr());

        DataJobInfo jobInfo = new DataJobInfo()
            .setName(this.appStartEvent.getAppName())
            .setType(DataJobInfo.Type.create("sparkJob"));
        jobInfo.setCustomProperties(properties);
        jobInfo.setStatus(JobStatus.COMPLETED);
        MetadataChangeProposalWrapper infoProposal = MetadataChangeProposalWrapper.create(builder -> {
            builder.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobInfo);
        });

        return Arrays.asList(ioProposal, infoProposal);
    }

    /**
     * Merges the elements selected by {@code extractor} from every buffered SQL event into one
     * set that is ordered, and de-duplicated, according to {@code order}.
     *
     * @param extractor selects the collection to take from each buffered event
     * @param order comparator that defines both iteration order and element equality
     * @param <T> element type of the merged set
     * @return a new set holding the merged elements; empty when nothing is buffered
     */
    private <T> Set<T> mergeAcrossEvents(
            Function<SQLQueryExecStartEvent, ? extends Collection<? extends T>> extractor,
            Comparator<? super T> order) {
        Set<T> merged = new TreeSet<>(order);
        for (SQLQueryExecStartEvent event : this.sqlQueryExecStartEvents) {
            merged.addAll(extractor.apply(event));
        }
        return merged;
    }

    /**
     * Turns the configured parent job URN into the upstream-job array of the coalesced job.
     *
     * @return an array holding the parsed parent URN, or an empty array when no parent is
     *     configured or the configured value cannot be parsed (a warning is logged in that case)
     */
    private DataJobUrnArray resolveParentJobs() {
        if (this.parentJobUrn == null) {
            return new DataJobUrnArray();
        }
        try {
            return new DataJobUrnArray(DataJobUrn.createFromString(this.parentJobUrn), new DataJobUrn[0]);
        } catch (ClassCastException e) {
            log.warn("parent.datajob_urn is not a valid Datajob URN. Skipping setting up upstream job.", e);
        } catch (URISyntaxException e) {
            log.warn("parent.datajob_urn is not a valid URN. Skipping setting up upstream job.", e);
        }
        // Best effort: a bad parent URN must not prevent the job itself from being emitted.
        return new DataJobUrnArray();
    }

    /**
     * Closes the emitter by delegating to {@link McpEmitter#close()}; this class adds no cleanup
     * of its own.
     *
     * @throws IOException if the superclass fails to close
     */
    @Override
    public void close() throws IOException {
        super.close();
    }
}
