package ai.tripl.arc.extract;

import ai.tripl.arc.api.API;
import ai.tripl.arc.util.log.logger.Logger;
import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$implicits$;
import org.apache.spark.storage.StorageLevel$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaExtract.scala */
/* loaded from: input_file:ai/tripl/arc/extract/KafkaExtract$.class */
public final class KafkaExtract$ {
    public static final KafkaExtract$ MODULE$ = null;

    static {
        new KafkaExtract$();
    }

    public Option<Dataset<Row>> extract(API.KafkaExtract kafkaExtract, SparkSession sparkSession, Logger logger, API.ARCContext aRCContext) {
        Dataset df;
        Dataset repartition;
        Dataset dataset;
        Dataset dataset2;
        long currentTimeMillis = System.currentTimeMillis();
        HashMap hashMap = new HashMap();
        hashMap.put("type", kafkaExtract.getType());
        hashMap.put("name", kafkaExtract.name());
        kafkaExtract.description().foreach(new KafkaExtract$$anonfun$extract$1(hashMap));
        hashMap.put("outputView", kafkaExtract.outputView());
        hashMap.put("bootstrapServers", kafkaExtract.bootstrapServers());
        hashMap.put("groupID", kafkaExtract.groupID());
        hashMap.put("topic", kafkaExtract.topic());
        hashMap.put("maxPollRecords", Integer.valueOf(kafkaExtract.maxPollRecords()));
        hashMap.put("timeout", Long.valueOf(kafkaExtract.timeout()));
        hashMap.put("autoCommit", Boolean.valueOf(kafkaExtract.autoCommit()));
        hashMap.put("persist", Boolean.valueOf(kafkaExtract.persist()));
        logger.info().field("event", "enter").map("stage", hashMap).log();
        if (aRCContext.isStreaming()) {
            df = sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers", kafkaExtract.bootstrapServers()).option("subscribe", kafkaExtract.topic()).load();
        } else {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", kafkaExtract.bootstrapServers());
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put("enable.auto.commit", "false");
            properties.put("auto.offset.reset", "earliest");
            properties.put("request.timeout.ms", BoxesRunTime.boxToLong(kafkaExtract.timeout()).toString());
            properties.put("session.timeout.ms", BoxesRunTime.boxToLong(Math.min(10000L, kafkaExtract.timeout() - 1)).toString());
            properties.put("fetch.max.wait.ms", BoxesRunTime.boxToLong(Math.min(500L, kafkaExtract.timeout() - 1)).toString());
            properties.put("heartbeat.interval.ms", BoxesRunTime.boxToLong(Math.min(3000L, kafkaExtract.timeout() - 2)).toString());
            Properties properties2 = new Properties();
            properties2.putAll(properties);
            properties2.put("group.id", kafkaExtract.groupID());
            try {
                Dataset repartition2 = sparkSession.sqlContext().emptyDataFrame().repartition(liftedTree1$1(kafkaExtract, hashMap, properties2));
                KafkaExtract$$anonfun$1 kafkaExtract$$anonfun$1 = new KafkaExtract$$anonfun$1(kafkaExtract, properties);
                SparkSession$implicits$ implicits = sparkSession.implicits();
                TypeTags universe = package$.MODULE$.universe();
                df = repartition2.mapPartitions(kafkaExtract$$anonfun$1, implicits.newProductEncoder(universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: ai.tripl.arc.extract.KafkaExtract$$typecreator4$1
                    public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                        mirror.universe();
                        return mirror.staticClass("ai.tripl.arc.extract.KafkaRecord").asType().toTypeConstructor();
                    }
                }))).toDF();
            } catch (Exception e) {
                throw new KafkaExtract$$anon$2(hashMap, e);
            }
        }
        Dataset dataset3 = df;
        List<String> partitionBy = kafkaExtract.partitionBy();
        if (Nil$.MODULE$.equals(partitionBy)) {
            Some numPartitions = kafkaExtract.numPartitions();
            if (numPartitions instanceof Some) {
                dataset2 = dataset3.repartition(BoxesRunTime.unboxToInt(numPartitions.x()));
            } else {
                if (!None$.MODULE$.equals(numPartitions)) {
                    throw new MatchError(numPartitions);
                }
                dataset2 = dataset3;
            }
            dataset = dataset2;
        } else {
            List list = (List) partitionBy.map(new KafkaExtract$$anonfun$2(dataset3), List$.MODULE$.canBuildFrom());
            Some numPartitions2 = kafkaExtract.numPartitions();
            if (numPartitions2 instanceof Some) {
                repartition = dataset3.repartition(BoxesRunTime.unboxToInt(numPartitions2.x()), list);
            } else {
                if (!None$.MODULE$.equals(numPartitions2)) {
                    throw new MatchError(numPartitions2);
                }
                repartition = dataset3.repartition(list);
            }
            dataset = repartition;
        }
        Dataset dataset4 = dataset;
        dataset4.createOrReplaceTempView(kafkaExtract.outputView());
        if (dataset4.isStreaming()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            hashMap.put("outputColumns", Integer.valueOf(dataset4.schema().length()));
            hashMap.put("numPartitions", Integer.valueOf(dataset4.rdd().partitions().length));
        }
        if ((kafkaExtract.persist() || !kafkaExtract.autoCommit()) && !dataset4.isStreaming()) {
            dataset4.persist(StorageLevel$.MODULE$.MEMORY_AND_DISK_SER());
            hashMap.put("records", Long.valueOf(dataset4.count()));
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        logger.info().field("event", "exit").field("duration", BoxesRunTime.boxToLong(System.currentTimeMillis() - currentTimeMillis)).map("stage", hashMap).log();
        return Option$.MODULE$.apply(dataset4);
    }

    private final int liftedTree1$1(API.KafkaExtract kafkaExtract, HashMap hashMap, Properties properties) {
        try {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            try {
                return kafkaConsumer.partitionsFor(kafkaExtract.topic()).size();
            } finally {
                kafkaConsumer.close();
            }
        } catch (Exception e) {
            throw new KafkaExtract$$anon$1(hashMap, e);
        }
    }

    private KafkaExtract$() {
        MODULE$ = this;
    }
}
