package io.openlineage.spark.agent.lifecycle;

import datahub.spark2.shaded.org.slf4j.Logger;
import datahub.spark2.shaded.org.slf4j.LoggerFactory;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.facets.ErrorFacet;
import io.openlineage.spark.agent.facets.SparkVersionFacet;
import io.openlineage.spark.agent.lifecycle.DatasetParser;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.Strings;
import org.apache.spark.Dependency;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil;
import org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil;
import org.apache.spark.rdd.HadoopRDD;
import org.apache.spark.rdd.MapPartitionsRDD;
import org.apache.spark.rdd.NewHadoopRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.JobResult;
import org.apache.spark.scheduler.ResultStage;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.apache.spark.util.SerializableJobConf;
import org.apache.zookeeper.server.quorum.QuorumStats;
import scala.Function2;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction0;

/**
 * {@link ExecutionContext} for plain RDD jobs (as opposed to SQL executions).
 *
 * <p>When the active job is set, the context inspects the final stage's RDD to derive a job-name
 * suffix, the input paths of every Hadoop-backed RDD in the lineage and, where it can be found,
 * the Hadoop output path. On job start/end it emits an OpenLineage run event built from that
 * state. Stage-level and SQL-level callbacks are intentionally no-ops.
 */
class RddExecutionContext implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(RddExecutionContext.class);

    private final EventEmitter eventEmitter;

    /** Name suffix derived from the RDD lineage; {@code null} until {@link #setActiveJob} runs. */
    private String jobSuffix;

    /** Run id shared by the START and the COMPLETE/FAIL event of this context. */
    private final UUID runId = UUID.randomUUID();

    private List<URI> inputs = Collections.emptyList();
    private List<URI> outputs = Collections.emptyList();

    /** The active Spark context at construction time, or empty when there is none. */
    private final Optional<SparkContext> sparkContextOption =
            Optional.ofNullable(
                    SparkContext$.MODULE$
                            .getActive()
                            .getOrElse(
                                    new AbstractFunction0<SparkContext>() {
                                        @Override
                                        public SparkContext apply() {
                                            return null;
                                        }
                                    }));

    public RddExecutionContext(EventEmitter eventEmitter) {
        this.eventEmitter = eventEmitter;
    }

    /** No-op: stage submission is not tracked for RDD jobs. */
    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerStageSubmitted sparkListenerStageSubmitted) {
    }

    /** No-op: stage completion is not tracked for RDD jobs. */
    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerStageCompleted sparkListenerStageCompleted) {
    }

    /**
     * Captures the job-name suffix, inputs and outputs from the final stage of {@code activeJob}.
     *
     * <p>For a {@link ResultStage} the Hadoop configuration is dug out of the stage function via
     * reflection; for any other stage it is obtained from
     * {@link OpenLineageSparkListener#getConfigForRDD}.
     *
     * @param activeJob the job whose final stage should be inspected
     */
    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void setActiveJob(ActiveJob activeJob) {
        RDD<?> finalRdd = activeJob.finalStage().rdd();
        this.jobSuffix = nameRDD(finalRdd);
        this.inputs = findInputs(Rdds.flattenRDDs(finalRdd));

        Configuration jobConf;
        if (activeJob.finalStage() instanceof ResultStage) {
            jobConf = resolveWriteJobConf((ResultStage) activeJob.finalStage());
            log.info("Found job conf from RDD {}", jobConf);
        } else {
            jobConf = OpenLineageSparkListener.getConfigForRDD(finalRdd);
        }
        this.outputs = findOutputs(finalRdd, jobConf);
    }

    /**
     * Reflectively extracts the Hadoop job configuration captured by a result stage's function.
     *
     * <p>Any failure (missing field, inaccessible field, unexpected holder type, {@code null}
     * value) is logged and answered with a fresh default {@link JobConf}, so lineage collection
     * never aborts the listener.
     *
     * @param resultStage the final stage of the active job
     * @return the captured configuration, or a default {@link JobConf} when it cannot be read
     */
    private Configuration resolveWriteJobConf(ResultStage resultStage) {
        Configuration fallback = new JobConf();
        Function2<TaskContext, Iterator<?>, ?> func = resultStage.func();
        try {
            Field configField = getConfigField(func);
            configField.setAccessible(true);
            Object writeConfig = configField.get(func);

            Class<?> utilClass;
            if (writeConfig instanceof HadoopMapRedWriteConfigUtil) {
                utilClass = HadoopMapRedWriteConfigUtil.class;
            } else if (writeConfig instanceof HadoopMapReduceWriteConfigUtil) {
                utilClass = HadoopMapReduceWriteConfigUtil.class;
            } else {
                // writeConfig may legitimately be null here; do not dereference it blindly.
                log.info("Config field is not HadoopMapRedWriteConfigUtil or HadoopMapReduceWriteConfigUtil, it's {}", writeConfig == null ? null : writeConfig.getClass().getCanonicalName());
                return fallback;
            }

            Field confField = utilClass.getDeclaredField("conf");
            confField.setAccessible(true);
            Object confHolder = confField.get(writeConfig);
            // NOTE(review): the holder type of "conf" may differ between the two util classes /
            // Spark versions -- confirm. Checking instead of casting avoids a ClassCastException.
            if (confHolder instanceof SerializableJobConf) {
                return ((SerializableJobConf) confHolder).value();
            }
            log.warn("Job conf holder in {} is not a SerializableJobConf, it's {}", utilClass.getName(), confHolder == null ? null : confHolder.getClass().getCanonicalName());
        } catch (IllegalAccessException | NoSuchFieldException e) {
            log.warn("Unable to access job conf from RDD", e);
        }
        return fallback;
    }

    /**
     * Locates the field of the stage function that holds the write configuration.
     *
     * <p>Tries {@code config$1} first and falls back to {@code arg$1} (presumably the two names
     * correspond to different closure encodings -- confirm against the supported Spark/Scala
     * versions).
     *
     * @throws NoSuchFieldException if neither field exists on the function's class
     */
    private Field getConfigField(Function2<TaskContext, Iterator<?>, ?> function2) throws NoSuchFieldException {
        Class<?> functionClass = function2.getClass();
        try {
            return functionClass.getDeclaredField("config$1");
        } catch (NoSuchFieldException ignored) {
            // Fall back to the alternative field name; a second failure propagates to the caller.
            return functionClass.getDeclaredField("arg$1");
        }
    }

    /**
     * Builds a descriptive, snake_case name for an RDD and its ancestry.
     *
     * <p>The RDD's own name is used unless it is missing, merely echoes one of its Hadoop input
     * paths, or was inherited unchanged from the parent of a {@link MapPartitionsRDD}; in those
     * cases a name derived from the RDD's class is used instead. Names of all dependencies are
     * computed recursively and joined with {@code _}; the RDD's own name is prepended unless the
     * joined string already starts with it.
     *
     * @param rdd the RDD to name
     * @return the generated name, never {@code null}
     */
    static String nameRDD(RDD<?> rdd) {
        String name = rdd.name();
        if (name == null || isNameFromInputPath(rdd, name) || isNameFromParent(rdd, name)) {
            name = rdd.getClass().getSimpleName()
                    .replaceAll("RDD\\d*$", "")
                    .replaceAll(ExecutionContext.CAMEL_TO_SNAKE_CASE, "_$1")
                    .toLowerCase(Locale.ROOT);
        }

        List<?> dependencies = ScalaConversionUtils.fromSeq(rdd.dependencies());
        if (dependencies.isEmpty()) {
            return name;
        }

        List<String> parentNames = new ArrayList<>(dependencies.size());
        for (Object dependency : dependencies) {
            parentNames.add(nameRDD(((Dependency<?>) dependency).rdd()));
        }
        String joined = String.join("_", parentNames);
        return joined.startsWith(name) ? joined : name + "_" + joined;
    }

    /** True when {@code rdd} is a {@link HadoopRDD} and one of its input paths contains {@code name}. */
    private static boolean isNameFromInputPath(RDD<?> rdd, String name) {
        if (!(rdd instanceof HadoopRDD)) {
            return false;
        }
        Path[] inputPaths = FileInputFormat.getInputPaths(((HadoopRDD<?, ?>) rdd).getJobConf());
        return Arrays.stream(inputPaths).anyMatch(path -> path.toString().contains(name));
    }

    /** True when {@code rdd} is a {@link MapPartitionsRDD} whose parent carries the same name. */
    private static boolean isNameFromParent(RDD<?> rdd, String name) {
        return rdd instanceof MapPartitionsRDD
                && name.equals(((MapPartitionsRDD<?, ?>) rdd).prev().name());
    }

    /** No-op: SQL executions are not handled by this context. */
    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart) {
    }

    /** No-op: SQL executions are not handled by this context. */
    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd) {
    }

    /**
     * Emits a START run event for the job, unless no inputs and no outputs were discovered.
     *
     * @param sparkListenerJobStart the Spark job-start notification
     */
    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerJobStart sparkListenerJobStart) {
        if (this.inputs.isEmpty() && this.outputs.isEmpty()) {
            log.info("RDDs are empty: skipping sending OpenLineage event");
            return;
        }
        OpenLineage openLineage = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
        OpenLineage.RunEvent event =
                openLineage.newRunEventBuilder()
                        .eventTime(toZonedTime(sparkListenerJobStart.time()))
                        .eventType(OpenLineage.RunEvent.EventType.START)
                        .inputs(buildInputs(this.inputs))
                        .outputs(buildOutputs(this.outputs))
                        .run(openLineage.newRunBuilder()
                                .runId(this.runId)
                                .facets(buildRunFacets(null))
                                .build())
                        .job(buildJob(sparkListenerJobStart.jobId()))
                        .build();
        log.debug("Posting event for start {}: {}", sparkListenerJobStart, event);
        this.eventEmitter.emit(event);
    }

    /**
     * Emits a COMPLETE or FAIL run event for the job.
     *
     * <p>The event is skipped only when there are neither inputs nor outputs and the job did not
     * fail; failures are always reported.
     *
     * @param sparkListenerJobEnd the Spark job-end notification
     */
    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerJobEnd sparkListenerJobEnd) {
        boolean failed = sparkListenerJobEnd.jobResult() instanceof JobFailed;
        if (this.inputs.isEmpty() && this.outputs.isEmpty() && !failed) {
            log.info("RDDs are empty: skipping sending OpenLineage event");
            return;
        }
        OpenLineage openLineage = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
        OpenLineage.RunEvent event =
                openLineage.newRunEventBuilder()
                        .eventTime(toZonedTime(sparkListenerJobEnd.time()))
                        .eventType(getEventType(sparkListenerJobEnd.jobResult()))
                        .inputs(buildInputs(this.inputs))
                        .outputs(buildOutputs(this.outputs))
                        .run(openLineage.newRunBuilder()
                                .runId(this.runId)
                                .facets(buildRunFacets(buildJobErrorFacet(sparkListenerJobEnd.jobResult())))
                                .build())
                        .job(buildJob(sparkListenerJobEnd.jobId()))
                        .build();
        log.debug("Posting event for end {}: {}", sparkListenerJobEnd, event);
        this.eventEmitter.emit(event);
    }

    /**
     * Converts epoch milliseconds into a UTC {@link ZonedDateTime}.
     *
     * @param j milliseconds since the epoch
     */
    protected ZonedDateTime toZonedTime(long j) {
        return ZonedDateTime.ofInstant(Instant.ofEpochMilli(j), ZoneOffset.UTC);
    }

    /**
     * Assembles the run facets: the parent-run facet (when a parent run id is configured), the
     * {@code spark.exception} facet (when {@code errorFacet} is non-null) and the
     * {@code spark_version} facet (when a Spark context was active at construction).
     *
     * @param errorFacet failure details, or {@code null} when the job did not fail
     */
    protected OpenLineage.RunFacets buildRunFacets(ErrorFacet errorFacet) {
        OpenLineage.RunFacetsBuilder facets = new OpenLineage.RunFacetsBuilder();
        buildParentFacet().ifPresent(facets::parent);
        if (errorFacet != null) {
            facets.put("spark.exception", errorFacet);
        }
        this.sparkContextOption.ifPresent(
                sparkContext -> facets.put("spark_version", new SparkVersionFacet(sparkContext)));
        return facets.build();
    }

    /** Builds the parent-run facet from the emitter's parent run id, if one is present. */
    private Optional<OpenLineage.ParentRunFacet> buildParentFacet() {
        return this.eventEmitter.getParentRunId().map(
                parentRunId -> PlanUtils.parentRunFacet(
                        parentRunId,
                        this.eventEmitter.getParentJobName(),
                        this.eventEmitter.getJobNamespace()));
    }

    /**
     * Wraps the exception of a failed job into an {@link ErrorFacet}.
     *
     * @param jobResult the result reported by Spark
     * @return the facet, or {@code null} when the job did not fail or carries no exception
     */
    protected ErrorFacet buildJobErrorFacet(JobResult jobResult) {
        if (!(jobResult instanceof JobFailed)) {
            return null;
        }
        JobFailed failure = (JobFailed) jobResult;
        if (failure.exception() == null) {
            return null;
        }
        return ErrorFacet.builder().exception(failure.exception()).build();
    }

    /**
     * Builds the OpenLineage job descriptor.
     *
     * <p>The name is {@code <appName>.<suffix>} converted to lower snake_case, where the app name
     * comes from the emitter, then the Spark context, then the literal {@code "unknown"}, and the
     * suffix is the RDD-derived name or, failing that, the numeric job id.
     *
     * @param i the Spark job id, used as suffix when no RDD-derived name is available
     */
    protected OpenLineage.Job buildJob(int i) {
        String suffix = this.jobSuffix != null ? this.jobSuffix : String.valueOf(i);
        // Literal fallback; avoids depending on an unrelated ZooKeeper constant for this string.
        String contextAppName = this.sparkContextOption.map(SparkContext::appName).orElse("unknown");
        String rawName = this.eventEmitter.getAppName().orElse(contextAppName) + "." + suffix;
        String jobName = rawName
                .replaceAll(ExecutionContext.CAMEL_TO_SNAKE_CASE, "_$1")
                .toLowerCase(Locale.ROOT);
        return new OpenLineage.JobBuilder()
                .namespace(this.eventEmitter.getJobNamespace())
                .name(jobName)
                .build();
    }

    /** Maps each output URI to an OpenLineage output dataset. */
    protected List<OpenLineage.OutputDataset> buildOutputs(List<URI> list) {
        return list.stream().map(this::buildOutputDataset).collect(Collectors.toList());
    }

    /** Builds an input dataset whose name and namespace are parsed from {@code uri}. */
    protected OpenLineage.InputDataset buildInputDataset(URI uri) {
        DatasetParser.DatasetParseResult parsed = DatasetParser.parse(uri);
        return new OpenLineage.InputDatasetBuilder()
                .name(parsed.getName())
                .namespace(parsed.getNamespace())
                .build();
    }

    /** Builds an output dataset whose name and namespace are parsed from {@code uri}. */
    protected OpenLineage.OutputDataset buildOutputDataset(URI uri) {
        DatasetParser.DatasetParseResult parsed = DatasetParser.parse(uri);
        return new OpenLineage.OutputDatasetBuilder()
                .name(parsed.getName())
                .namespace(parsed.getNamespace())
                .build();
    }

    /** Maps each input URI to an OpenLineage input dataset. */
    protected List<OpenLineage.InputDataset> buildInputs(List<URI> list) {
        return list.stream().map(this::buildInputDataset).collect(Collectors.toList());
    }

    /**
     * Determines the output location of the job.
     *
     * @param rdd the final RDD of the job (used for logging and path lookup)
     * @param configuration the Hadoop configuration to read the output path from; may be null
     * @return a single-element list with the output URI, or an empty list when none is configured
     */
    protected List<URI> findOutputs(RDD<?> rdd, Configuration configuration) {
        Path outputPath = getOutputPath(rdd, configuration);
        log.info("Found output path {} from RDD {}", outputPath, rdd);
        if (outputPath == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(getDatasetUri(outputPath.toUri()));
    }

    /**
     * Collects the input URIs of every RDD in {@code set} that exposes Hadoop input paths.
     *
     * @param set the RDDs to inspect
     * @return the input URIs in iteration order; empty when none of the RDDs has input paths
     */
    protected List<URI> findInputs(Set<RDD<?>> set) {
        List<URI> inputUris = new ArrayList<>();
        for (RDD<?> rdd : set) {
            Path[] inputPaths = getInputPaths(rdd);
            if (inputPaths == null) {
                continue;
            }
            for (Path path : inputPaths) {
                inputUris.add(getDatasetUri(path.toUri()));
            }
        }
        return inputUris;
    }

    /**
     * Reads the configured input paths of a {@link HadoopRDD} or {@link NewHadoopRDD}.
     *
     * @return the input paths, or {@code null} for any other RDD type or when reading them fails
     */
    protected Path[] getInputPaths(RDD<?> rdd) {
        if (rdd instanceof HadoopRDD) {
            return FileInputFormat.getInputPaths(((HadoopRDD<?, ?>) rdd).getJobConf());
        }
        if (rdd instanceof NewHadoopRDD) {
            try {
                Job job = Job.getInstance(((NewHadoopRDD<?, ?>) rdd).getConf());
                return org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths(job);
            } catch (IOException e) {
                log.error("Openlineage spark agent could not get input paths", e);
            }
        }
        return null;
    }

    /** Hook for subclasses to rewrite dataset URIs; returns {@code uri} unchanged here. */
    protected URI getDatasetUri(URI uri) {
        return uri;
    }

    /**
     * Recursively walks the dependency tree of {@code rdd}, growing {@code str} by two spaces per
     * level.
     *
     * <p>NOTE(review): despite its name this method currently produces no output; it only
     * traverses the dependencies.
     */
    protected void printRDDs(String str, RDD<?> rdd) {
        for (Object dependency : JavaConversions.asJavaCollection(rdd.dependencies())) {
            printRDDs(str + "  ", ((Dependency<?>) dependency).rdd());
        }
    }

    /**
     * Looks up the Hadoop output path, trying the {@code mapred} API first and the
     * {@code mapreduce} API second.
     *
     * @param rdd unused by this implementation
     * @param configuration the configuration to read from; may be {@code null}
     * @return the output path, or {@code null} when {@code configuration} is {@code null}, no
     *     path is configured, or the lookup fails
     */
    protected static Path getOutputPath(RDD<?> rdd, Configuration configuration) {
        if (configuration == null) {
            return null;
        }
        JobConf jobConf = configuration instanceof JobConf ? (JobConf) configuration : new JobConf(configuration);
        Path outputPath = FileOutputFormat.getOutputPath(jobConf);
        if (outputPath != null) {
            return outputPath;
        }
        try {
            return org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(Job.getInstance(jobConf));
        } catch (IOException e) {
            // Report through the logger rather than dumping the stack trace on stdout.
            log.error("Openlineage spark agent could not get output path", e);
            return null;
        }
    }

    /**
     * Maps a Spark job result to an OpenLineage event type: COMPLETE when the result's simple
     * class name starts with {@code JobSucceeded}, FAIL otherwise.
     *
     * <p>NOTE(review): the name-prefix comparison is presumably used because the success result
     * cannot be matched with {@code instanceof} from Java -- confirm.
     */
    protected OpenLineage.RunEvent.EventType getEventType(JobResult jobResult) {
        boolean succeeded = jobResult.getClass().getSimpleName().startsWith("JobSucceeded");
        return succeeded ? OpenLineage.RunEvent.EventType.COMPLETE : OpenLineage.RunEvent.EventType.FAIL;
    }
}
