package zipkin.sparkstreaming;

import com.google.auto.value.AutoValue;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import zipkin.Span;
import zipkin.internal.Util;
import zipkin.sparkstreaming.C$AutoValue_SparkStreamingJob;

/**
 * Configures and runs a Spark Streaming job that reads encoded spans from a
 * {@link StreamFactory}, groups them by trace ID, and hands them to the configured
 * {@link Adjuster}s and {@link Consumer}.
 *
 * <p>Instances are created via {@link #newBuilder()}. Call {@link #start()} to begin
 * streaming and {@link #close()} to release the streaming context and the consumer.
 */
@AutoValue
public abstract class SparkStreamingJob implements Closeable {
    /** Flipped to {@code true} by the first {@link #start()} call; later calls are no-ops. */
    final AtomicBoolean started = new AtomicBoolean(false);

    /** Guards lazy creation of {@link #streamingContext}. */
    private final Object contextLock = new Object();

    /**
     * Lazily created by {@link #jsc()} and reused afterwards, so that {@link #start()},
     * {@link #awaitTermination()} and {@link #close()} all operate on the same context.
     */
    private JavaStreamingContext streamingContext;

    /** Builder for {@link SparkStreamingJob}; defaults are supplied by {@link #newBuilder()}. */
    @AutoValue.Builder
    public interface Builder {
        /** Spark master URL handed to {@code SparkConf.setMaster}. */
        Builder master(String master);

        /** Jars handed to {@code SparkConf.setJars}; ignored when empty. */
        Builder jars(List<String> jars);

        /** Extra key/value pairs applied to the {@code SparkConf} after master, app name and jars. */
        Builder conf(Map<String, String> conf);

        /** Batch interval passed to Spark's {@code Duration} constructor. */
        Builder batchDuration(long batchDuration);

        /** Factory that creates the stream of encoded spans from the streaming context. */
        Builder streamFactory(StreamFactory streamFactory);

        /** Adjusters handed to the adjust-and-consume step. */
        Builder adjusters(List<Adjuster> adjusters);

        /** Consumer handed to the adjust-and-consume step; closed by {@link #close()} if it is {@link Closeable}. */
        Builder consumer(Consumer consumer);

        /** Log level passed to {@code LogInitializer.create}. */
        Builder zipkinLogLevel(String zipkinLogLevel);

        SparkStreamingJob build();
    }

    /**
     * Returns a builder pre-populated with defaults: master {@code local[*]}, no jars,
     * {@code spark.ui.enabled=false}, no adjusters, a batch duration of {@code 10000} and
     * log level {@code INFO}. The stream factory and consumer have no default.
     */
    public static Builder newBuilder() {
        Map<String, String> defaultConf = new LinkedHashMap<>();
        defaultConf.put("spark.ui.enabled", "false");
        return new C$AutoValue_SparkStreamingJob.Builder()
                .master("local[*]")
                .jars(Collections.emptyList())
                .conf(defaultConf)
                .adjusters(Collections.emptyList())
                .batchDuration(10000L)
                .zipkinLogLevel("INFO");
    }

    /** Spark master URL. */
    public abstract String master();

    /** Jars to ship with the job; may be empty. */
    public abstract List<String> jars();

    /** Additional Spark configuration entries. */
    public abstract Map<String, String> conf();

    /** Batch interval used to construct the streaming context's {@code Duration}. */
    public abstract long batchDuration();

    /** Source of the encoded span stream. */
    public abstract StreamFactory streamFactory();

    /** Adjusters applied before spans reach the consumer. */
    public abstract List<Adjuster> adjusters();

    /** Destination for processed spans. */
    public abstract Consumer consumer();

    /** Log level handed to {@code LogInitializer}. */
    public abstract String zipkinLogLevel();

    /**
     * Returns the streaming context for this job, creating it on first use.
     *
     * <p>The context is cached: building a fresh one per call would leave {@link #start()}
     * attaching the stream to one context while starting, awaiting or closing another.
     */
    public JavaStreamingContext jsc() {
        synchronized (contextLock) {
            if (streamingContext == null) {
                streamingContext = newStreamingContext();
            }
            return streamingContext;
        }
    }

    /** Builds a streaming context from {@link #master()}, {@link #jars()}, {@link #conf()} and {@link #batchDuration()}. */
    private JavaStreamingContext newStreamingContext() {
        SparkConf sparkConf = new SparkConf(true)
                .setMaster(master())
                .setAppName(getClass().getName());
        List<String> jars = jars();
        if (!jars.isEmpty()) {
            sparkConf.setJars(jars.toArray(new String[0]));
        }
        // Applied last so user-supplied entries can override anything set above.
        for (Map.Entry<String, String> entry : conf().entrySet()) {
            sparkConf.set(entry.getKey(), entry.getValue());
        }
        return new JavaStreamingContext(sparkConf, new Duration(batchDuration()));
    }

    /**
     * Starts the job: initializes logging, wires the span stream into the
     * adjust-and-consume pipeline, and starts the streaming context.
     *
     * <p>Only the first invocation has any effect; subsequent calls return immediately.
     *
     * @return this job
     */
    public SparkStreamingJob start() {
        if (!this.started.compareAndSet(false, true)) {
            return this;
        }
        Runnable logInitializer = LogInitializer.create(zipkinLogLevel());
        logInitializer.run();
        // The same context must both own the stream and be started.
        JavaStreamingContext context = jsc();
        streamSpansToStorage(
                streamFactory().create(context),
                new AutoValue_ReadSpans(logInitializer),
                new AutoValue_AdjustAndConsumeSpansSharingTraceId(logInitializer, adjusters(), consumer()));
        context.start();
        return this;
    }

    /** Blocks on the streaming context's termination if {@link #start()} has been called; otherwise returns immediately. */
    public void awaitTermination() {
        if (this.started.get()) {
            jsc().awaitTermination();
        }
    }

    /**
     * Decodes spans from the byte stream, keys each by its lower-hex trace ID, groups spans
     * sharing a key, and registers a per-RDD action built from {@code adjustAndConsumeSpansSharingTraceId}.
     */
    static void streamSpansToStorage(JavaDStream<byte[]> javaDStream, ReadSpans readSpans, AdjustAndConsumeSpansSharingTraceId adjustAndConsumeSpansSharingTraceId) {
        javaDStream
                .flatMap(readSpans)
                .mapToPair(SparkStreamingJob$$Lambda$1.lambdaFactory$())
                .groupByKey()
                .foreachRDD(SparkStreamingJob$$Lambda$2.lambdaFactory$(adjustAndConsumeSpansSharingTraceId));
    }

    /**
     * Closes the streaming context and then the consumer, if the consumer is {@link Closeable}.
     *
     * <p>The consumer is closed even when closing the streaming context throws, so a
     * failure in Spark shutdown does not leak the consumer's resources.
     *
     * @throws IOException if closing the consumer fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            jsc().close();
        } finally {
            Consumer consumer = consumer();
            if (consumer instanceof Closeable) {
                ((Closeable) consumer).close();
            }
        }
    }

    /** Synthetic lambda target: pairs a span with its lower-hex trace ID (high and low 64 bits). */
    public static /* synthetic */ Tuple2<String, Span> lambda$streamSpansToStorage$d5ec7266$1(Span span) throws Exception {
        return new Tuple2<>(Util.toLowerHex(span.traceIdHigh, span.traceId), span);
    }
}
