package datahub.spark.converter;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.linkedin.common.DatasetUrnArray;
import com.linkedin.common.TimeStamp;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datajob.DataFlowInfo;
import com.linkedin.datajob.DataJobInfo;
import com.linkedin.datajob.DataJobInputOutput;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.conf.DatahubConf;
import datahub.spark.dataset.HdfsPathDataset;
import datahub.spark.utils.DatahubUtils;
import datahub.spark2.shaded.org.slf4j.Logger;
import datahub.spark2.shaded.org.slf4j.LoggerFactory;
import datahub.spark2.shaded.typesafe.config.Config;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.Generated;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

/* loaded from: input_file:datahub/spark/converter/SparkStreamingEventToDatahub.class */
/**
 * Converts Spark Structured Streaming progress events into DataHub metadata change proposals.
 *
 * <p>Each {@link StreamingQueryProgress} is mapped to a DataFlow, a DataJob, and the DataJob's
 * input/output lineage. Dataset URNs are derived from the {@code description} strings found in
 * the progress JSON for every source and for the sink.
 */
public final class SparkStreamingEventToDatahub {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SparkStreamingEventToDatahub.class);
    public static final String DELTA_LAKE_PLATFORM = "delta-lake";
    public static final String FILE_PLATFORM = "file";
    public static final String KAFKA_PLATFORM = "kafka";

    /**
     * Splits a streaming description into two groups: group 1 is everything before the first
     * '[' (lazy), group 2 is everything between that '[' and the last ']' (greedy).
     * Compiled once because it is used for every source and sink of every progress event.
     */
    private static final Pattern STREAMING_DESCRIPTION_PATTERN = Pattern.compile("(.*?)\\[(.*)]");

    private SparkStreamingEventToDatahub() {
    }

    /**
     * Builds the DataHub MCPs describing a single streaming progress event.
     *
     * <p>MCPs are emitted in this order: DataFlow info, DataJob info, dataset MCPs for each
     * distinct source (only when dataset materialization is enabled), dataset MCPs for the sink,
     * and finally the DataJob input/output lineage aspect.
     *
     * @param streamingQueryProgress the progress event reported by Spark
     * @param config the DataHub configuration
     * @param map schema-metadata MCPs keyed by dataset URN string; only consulted when schema
     *     metadata inclusion is enabled. May be {@code null}.
     * @return the MCPs to emit; never {@code null}
     */
    public static List<MetadataChangeProposalWrapper> generateMcpFromStreamingProgressEvent(
            StreamingQueryProgress streamingQueryProgress,
            Config config,
            Map<String, MetadataChangeProposalWrapper> map) {
        List<MetadataChangeProposalWrapper> mcps = new ArrayList<>();
        String pipelineName = DatahubConf.getPipelineName(config);
        String progressJson = streamingQueryProgress.json();

        DataFlowInfo dataFlowInfo = buildDataFlowInfo(streamingQueryProgress, config, pipelineName, progressJson);
        DataFlowUrn flowUrn = DatahubUtils.flowUrn(DatahubConf.getSparkMaster(config), pipelineName);
        // Expression lambdas: valid whether create() expects a Function or a Consumer.
        mcps.add(MetadataChangeProposalWrapper.create(
                builder -> builder.entityType("dataFlow").entityUrn(flowUrn).upsert().aspect(dataFlowInfo)));

        DataJobInfo dataJobInfo = buildDataJobInfo(streamingQueryProgress, pipelineName);
        DataJobUrn jobUrn = DatahubUtils.jobUrn(flowUrn, pipelineName);
        mcps.add(MetadataChangeProposalWrapper.create(
                builder -> builder.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(dataJobInfo)));

        JsonObject progress = new JsonParser().parse(progressJson).getAsJsonObject();
        DatasetUrnArray inputDatasets = collectInputDatasets(progress, config, map, mcps);
        DatasetUrnArray outputDatasets = collectOutputDatasets(progress, config, map, mcps);

        DataJobInputOutput dataJobInputOutput = new DataJobInputOutput();
        dataJobInputOutput.setInputDatasets(inputDatasets);
        dataJobInputOutput.setOutputDatasets(outputDatasets);
        mcps.add(MetadataChangeProposalWrapper.create(
                builder -> builder.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(dataJobInputOutput)));
        return mcps;
    }

    /** Builds the DataFlow aspect: name, optional creation time, query id and raw progress JSON. */
    private static DataFlowInfo buildDataFlowInfo(
            StreamingQueryProgress streamingQueryProgress, Config config, String pipelineName, String progressJson) {
        DataFlowInfo dataFlowInfo = new DataFlowInfo();
        dataFlowInfo.setName(pipelineName);
        StringMap properties = new StringMap();
        ZonedDateTime appStartTime = DatahubConf.getAppStartTime(config);
        if (appStartTime != null) {
            properties.put("createdAt", appStartTime.toString());
            dataFlowInfo.setCreated(new TimeStamp().setTime(appStartTime.toInstant().toEpochMilli()));
        }
        // The query id comes from the progress event itself, so it does not depend on the
        // configured app start time.
        properties.put("id", streamingQueryProgress.id().toString());
        properties.put("plan", progressJson);
        dataFlowInfo.setCustomProperties(properties);
        return dataFlowInfo;
    }

    /** Builds the DataJob aspect, carrying the micro-batch metrics as custom properties. */
    private static DataJobInfo buildDataJobInfo(StreamingQueryProgress streamingQueryProgress, String pipelineName) {
        DataJobInfo dataJobInfo = new DataJobInfo();
        dataJobInfo.setName(pipelineName);
        dataJobInfo.setType(DataJobInfo.Type.create("SPARK"));
        StringMap properties = new StringMap();
        properties.put("batchId", Long.toString(streamingQueryProgress.batchId()));
        properties.put("inputRowsPerSecond", Double.toString(streamingQueryProgress.inputRowsPerSecond()));
        properties.put("processedRowsPerSecond", Double.toString(streamingQueryProgress.processedRowsPerSecond()));
        properties.put("numInputRows", Long.toString(streamingQueryProgress.numInputRows()));
        dataJobInfo.setCustomProperties(properties);
        return dataJobInfo;
    }

    /**
     * Resolves the distinct input dataset URNs from the progress JSON's {@code sources} array.
     * When dataset materialization is enabled, also appends their dataset (and schema) MCPs
     * to {@code mcps}.
     */
    private static DatasetUrnArray collectInputDatasets(
            JsonObject progress,
            Config config,
            Map<String, MetadataChangeProposalWrapper> schemaMcps,
            List<MetadataChangeProposalWrapper> mcps) {
        DatasetUrnArray inputs = new DatasetUrnArray();
        JsonElement sources = progress.get("sources");
        if (sources == null || !sources.isJsonArray()) {
            return inputs;
        }
        for (JsonElement source : sources.getAsJsonArray()) {
            Optional<DatasetUrn> urn = urnFromDescribedElement(source, config);
            if (!urn.isPresent()) {
                continue;
            }
            DatasetUrn datasetUrn = urn.get();
            if (inputs.contains(datasetUrn)) {
                log.debug("We already have dataset {} in the list, skipping it.", datasetUrn);
                continue;
            }
            inputs.add(datasetUrn);
            if (DatahubConf.isDatasetMaterialize(config)) {
                addDatasetMcps(datasetUrn, config, schemaMcps, mcps);
            }
        }
        return inputs;
    }

    /**
     * Resolves the output dataset URN from the progress JSON's {@code sink} object and appends
     * its dataset (and schema) MCPs. Unlike sources, the sink's dataset MCP is emitted
     * regardless of the dataset-materialization setting.
     */
    private static DatasetUrnArray collectOutputDatasets(
            JsonObject progress,
            Config config,
            Map<String, MetadataChangeProposalWrapper> schemaMcps,
            List<MetadataChangeProposalWrapper> mcps) {
        DatasetUrnArray outputs = new DatasetUrnArray();
        Optional<DatasetUrn> sinkUrn = urnFromDescribedElement(progress.get("sink"), config);
        if (sinkUrn.isPresent()) {
            outputs.add(sinkUrn.get());
            addDatasetMcps(sinkUrn.get(), config, schemaMcps, mcps);
        }
        return outputs;
    }

    /** Appends the dataset MCP and, if enabled and available, the matching schema MCP. */
    private static void addDatasetMcps(
            DatasetUrn urn,
            Config config,
            Map<String, MetadataChangeProposalWrapper> schemaMcps,
            List<MetadataChangeProposalWrapper> mcps) {
        mcps.add(DatahubUtils.generateDatasetMcp(urn));
        if (DatahubConf.isIncludeSchemaMetadata(config) && schemaMcps != null) {
            MetadataChangeProposalWrapper schemaMcp = schemaMcps.get(urn.toString());
            if (schemaMcp != null) {
                mcps.add(schemaMcp);
            }
        }
    }

    /** Reads the string {@code description} of a JSON object node and converts it to a URN. */
    private static Optional<DatasetUrn> urnFromDescribedElement(JsonElement element, Config config) {
        if (element == null || !element.isJsonObject()) {
            return Optional.empty();
        }
        JsonElement description = element.getAsJsonObject().get("description");
        if (description == null || !description.isJsonPrimitive()) {
            return Optional.empty();
        }
        return generateUrnFromStreamingDescription(description.getAsString(), config);
    }

    /**
     * Derives a dataset URN from a Spark streaming source/sink description of the form
     * {@code Name[rest]}.
     *
     * <p>{@code Name} is mapped to a DataHub platform via {@link #getDatahubPlatform(String)}.
     * Kafka descriptions take their topic from the first bracketed segment inside {@code rest}.
     * File and Delta Lake descriptions treat {@code rest} as a path resolved through
     * {@code HdfsPathDataset}. Every other platform uses {@code rest} verbatim as the dataset name.
     *
     * @param str the description string; {@code null} yields empty
     * @param config the DataHub configuration (fabric type, path handling)
     * @return the dataset URN, or empty when the description cannot be interpreted
     */
    public static Optional<DatasetUrn> generateUrnFromStreamingDescription(String str, Config config) {
        if (str == null) {
            return Optional.empty();
        }
        Matcher matcher = STREAMING_DESCRIPTION_PATTERN.matcher(str);
        if (!matcher.find()) {
            return Optional.empty();
        }
        String datahubPlatform = getDatahubPlatform(matcher.group(1));
        String name = matcher.group(2);
        log.debug("Streaming description Platform: {}, Path: {}", datahubPlatform, name);
        if (KAFKA_PLATFORM.equals(datahubPlatform)) {
            name = getKafkaTopicFromPath(name);
            if (name == null) {
                // substringBetween returns null when no "[...]" segment exists; a URN with a
                // null name would be meaningless.
                log.debug("No Kafka topic found in streaming description {}", str);
                return Optional.empty();
            }
        } else if (FILE_PLATFORM.equals(datahubPlatform) || DELTA_LAKE_PLATFORM.equals(datahubPlatform)) {
            try {
                return Optional.of(HdfsPathDataset.create(new Path(name), config).urn());
            } catch (InstantiationException e) {
                log.debug("Unable to create dataset for streaming path {}", name, e);
                return Optional.empty();
            }
        }
        return Optional.of(new DatasetUrn(new DataPlatformUrn(datahubPlatform), name, DatahubConf.getCommonFabricType(config)));
    }

    /**
     * Maps a Spark streaming source/sink name to a DataHub platform identifier.
     *
     * @param str the name preceding '[' in a streaming description; must not be {@code null}
     * @return the DataHub platform, or {@code str} unchanged when the name is not recognized
     */
    public static String getDatahubPlatform(String str) {
        switch (str) {
            case "KafkaV2":
                return KAFKA_PLATFORM;
            case "DeltaSink":
                return DELTA_LAKE_PLATFORM;
            case "CloudFilesSource":
                return "dbfs";
            case "FileSink":
            case "FileStreamSource":
                return FILE_PLATFORM;
            default:
                return str;
        }
    }

    /**
     * Extracts the text between the first '[' and the next ']' of a Kafka description body.
     *
     * @param str the bracketed remainder of a Kafka description
     * @return the topic text, or {@code null} if no bracketed segment is present
     */
    public static String getKafkaTopicFromPath(String str) {
        return StringUtils.substringBetween(str, "[", "]");
    }
}
