package ai.tripl.arc.execute;

import ai.tripl.arc.api.API;
import ai.tripl.arc.util.log.logger.Logger;
import java.util.HashMap;
import java.util.Properties;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaCommitExecute.scala */
/* loaded from: input_file:ai/tripl/arc/execute/KafkaCommitExecute$.class */
public final class KafkaCommitExecute$ {
    public static final KafkaCommitExecute$ MODULE$ = null;

    static {
        new KafkaCommitExecute$();
    }

    public Option<Dataset<Row>> execute(API.KafkaCommitExecute kafkaCommitExecute, SparkSession sparkSession, Logger logger) {
        long currentTimeMillis = System.currentTimeMillis();
        HashMap hashMap = new HashMap();
        hashMap.put("type", kafkaCommitExecute.getType());
        hashMap.put("name", kafkaCommitExecute.name());
        kafkaCommitExecute.description().foreach(new KafkaCommitExecute$$anonfun$execute$1(hashMap));
        hashMap.put("inputView", kafkaCommitExecute.inputView());
        hashMap.put("bootstrapServers", kafkaCommitExecute.bootstrapServers());
        logger.info().field("event", "enter").map("stage", hashMap).log();
        Dataset table = sparkSession.table(kafkaCommitExecute.inputView());
        HashMap hashMap2 = new HashMap();
        try {
            Dataset limit = table.groupBy(Predef$.MODULE$.wrapRefArray(new Column[]{table.apply("topic"), table.apply("partition")})).agg(functions$.MODULE$.max(table.apply("offset")), Predef$.MODULE$.wrapRefArray(new Column[0])).orderBy(Predef$.MODULE$.wrapRefArray(new Column[]{table.apply("topic"), table.apply("partition")})).limit(10000);
            Properties properties = new Properties();
            properties.put("bootstrap.servers", kafkaCommitExecute.bootstrapServers());
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("enable.auto.commit", "false");
            Predef$.MODULE$.refArrayOps((Object[]) limit.collect()).foreach(new KafkaCommitExecute$$anonfun$execute$2(kafkaCommitExecute, hashMap, hashMap2, properties));
            logger.info().field("event", "exit").field("duration", BoxesRunTime.boxToLong(System.currentTimeMillis() - currentTimeMillis)).map("stage", hashMap).log();
            return None$.MODULE$;
        } catch (Exception e) {
            throw new KafkaCommitExecute$$anon$1(hashMap, e);
        }
    }

    private KafkaCommitExecute$() {
        MODULE$ = this;
    }
}
