package zipkin.sparkstreaming;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.Util;
import zipkin.sparkstreaming.C$AutoValue_SparkStreamingJob;

/* loaded from: input_file:zipkin/sparkstreaming/SparkStreamingJob.class */
public abstract class SparkStreamingJob implements Closeable {
    final AtomicBoolean started = new AtomicBoolean(false);

    /* loaded from: input_file:zipkin/sparkstreaming/SparkStreamingJob$Builder.class */
    public interface Builder {
        Builder sparkMaster(String str);

        Builder sparkJars(List<String> list);

        Builder sparkProperties(Map<String, String> map);

        Builder batchDuration(long j);

        Builder streamFactory(StreamFactory streamFactory);

        Builder adjusters(List<Adjuster> list);

        Builder consumer(Consumer consumer);

        SparkStreamingJob build();
    }

    public static Builder newBuilder() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("spark.ui.enabled", "false");
        linkedHashMap.put("spark.akka.logLifecycleEvents", "true");
        return new C$AutoValue_SparkStreamingJob.Builder().sparkMaster("local[*]").sparkJars(Collections.emptyList()).sparkProperties(linkedHashMap).adjusters(Collections.emptyList()).batchDuration(10000L);
    }

    public abstract String sparkMaster();

    public abstract List<String> sparkJars();

    public abstract Map<String, String> sparkProperties();

    public abstract long batchDuration();

    public abstract StreamFactory streamFactory();

    public abstract List<Adjuster> adjusters();

    public abstract Consumer consumer();

    public JavaStreamingContext jsc() {
        SparkConf appName = new SparkConf(true).setMaster(sparkMaster()).setAppName(getClass().getName());
        if (!sparkJars().isEmpty()) {
            appName.setJars((String[]) sparkJars().toArray(new String[0]));
        }
        for (Map.Entry<String, String> entry : sparkProperties().entrySet()) {
            appName.set(entry.getKey(), entry.getValue());
        }
        return new JavaStreamingContext(appName, new Duration(batchDuration()));
    }

    public SparkStreamingJob start() {
        if (!this.started.compareAndSet(false, true)) {
            return this;
        }
        streamSpansToStorage(streamFactory().create(jsc()), adjusters(), consumer());
        jsc().start();
        return this;
    }

    public void awaitTermination() {
        if (this.started.get()) {
            jsc().awaitTermination();
        }
    }

    static void streamSpansToStorage(JavaDStream<byte[]> javaDStream, List<Adjuster> list, Consumer consumer) {
        Function<byte[], Boolean> function;
        FlatMapFunction<byte[], U> flatMapFunction;
        PairFunction pairFunction;
        function = SparkStreamingJob$$Lambda$1.instance;
        JavaDStream<byte[]> filter = javaDStream.filter(function);
        flatMapFunction = SparkStreamingJob$$Lambda$4.instance;
        JavaDStreamLike flatMap = filter.flatMap(flatMapFunction);
        pairFunction = SparkStreamingJob$$Lambda$5.instance;
        flatMap.mapToPair(pairFunction).groupByKey().foreachRDD(SparkStreamingJob$$Lambda$6.lambdaFactory$(list, consumer));
    }

    static List<Span> readSpans(byte[] bArr) {
        return bArr[0] == 91 ? Codec.JSON.readSpans(bArr) : bArr[0] == 12 ? Codec.THRIFT.readSpans(bArr) : Collections.singletonList(Codec.THRIFT.readSpan(bArr));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        jsc().close();
        if (consumer() instanceof Closeable) {
            ((Closeable) consumer()).close();
        }
    }

    public static /* synthetic */ void lambda$null$a9376653$1(List list, Consumer consumer, Iterator it) throws Exception {
        while (it.hasNext()) {
            Iterable<Span> iterable = (Iterable) it.next();
            Iterator it2 = list.iterator();
            while (it2.hasNext()) {
                iterable = ((Adjuster) it2.next()).adjust(iterable);
            }
            consumer.accept(iterable);
        }
    }

    public static /* synthetic */ Tuple2 lambda$streamSpansToStorage$97bf1693$1(Span span) throws Exception {
        return new Tuple2(Util.toLowerHex(span.traceIdHigh, span.traceId), span);
    }

    public static /* synthetic */ Iterable lambda$streamSpansToStorage$c727e0fd$1(byte[] bArr) throws Exception {
        try {
            return readSpans(bArr);
        } catch (RuntimeException e) {
            return Collections.emptyList();
        }
    }
}
