package ai.starlake.job.sink.kafka;

import ai.starlake.utils.SparkJobResult;
import ai.starlake.utils.kafka.KafkaClient;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.MatchError;
import scala.None$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.immutable.List;
import scala.collection.mutable.StringBuilder;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaJob.scala */
/* loaded from: input_file:ai/starlake/job/sink/kafka/KafkaJob$$anonfun$offload$1$$anonfun$apply$2.class */
public final class KafkaJob$$anonfun$offload$1$$anonfun$apply$2 extends AbstractFunction1<KafkaClient, SparkJobResult> implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ KafkaJob$$anonfun$offload$1 $outer;

    public final SparkJobResult apply(KafkaClient kafkaClient) {
        Dataset<Row> repartition;
        Boolean bool;
        Boolean bool2;
        Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch = kafkaClient.consumeTopicBatch(this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().kafkaJobConfig().topicConfigName(), this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().session(), this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().ai$starlake$job$sink$kafka$KafkaJob$$topicConfig());
        if (consumeTopicBatch == null) {
            throw new MatchError(consumeTopicBatch);
        }
        Tuple2 tuple2 = new Tuple2((Dataset) consumeTopicBatch._1(), (List) consumeTopicBatch._2());
        Dataset<Row> dataset = (Dataset) tuple2._1();
        List<Tuple2<Object, Object>> list = (List) tuple2._2();
        Dataset<Row> ai$starlake$job$sink$kafka$KafkaJob$$transfom = this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().ai$starlake$job$sink$kafka$KafkaJob$$transfom(dataset);
        Some coalesce = this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().kafkaJobConfig().coalesce();
        if (None$.MODULE$.equals(coalesce)) {
            repartition = ai$starlake$job$sink$kafka$KafkaJob$$transfom;
        } else {
            if (!(coalesce instanceof Some)) {
                throw new MatchError(coalesce);
            }
            repartition = ai$starlake$job$sink$kafka$KafkaJob$$transfom.repartition(BoxesRunTime.unboxToInt(coalesce.x()));
        }
        Dataset<Row> dataset2 = repartition;
        if (this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().logger().underlying().isInfoEnabled()) {
            this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().logger().underlying().info("Saving to {}", new Object[]{this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().kafkaJobConfig()});
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        dataset2.write().mode(this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().kafkaJobConfig().mode()).format(this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().kafkaJobConfig().format()).options(this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().kafkaJobConfig().writeOptions()).save(this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().ai$starlake$job$sink$kafka$KafkaJob$$finalPath());
        if (this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().logger().underlying().isInfoEnabled()) {
            this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().logger().underlying().info("Kafka saved messages to offload -> {}", new Object[]{this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().ai$starlake$job$sink$kafka$KafkaJob$$finalPath()});
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        Some coalesce2 = this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().kafkaJobConfig().coalesce();
        if ((coalesce2 instanceof Some) && 1 == BoxesRunTime.unboxToInt(coalesce2.x())) {
            this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().kafkaJobConfig().format();
            Path path = new Path(this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().ai$starlake$job$sink$kafka$KafkaJob$$finalPath());
            Path path2 = (Path) ((IterableLike) this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().settings().storageHandler().list(path, this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().settings().storageHandler().list$default$2(), this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().settings().storageHandler().list$default$3(), false, this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().settings().storageHandler().list$default$5()).filter(new KafkaJob$$anonfun$offload$1$$anonfun$apply$2$$anonfun$3(this))).head();
            Path path3 = new Path(new StringBuilder().append(path.toString()).append(".tmp").toString());
            if (this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().settings().storageHandler().move(path2, path3)) {
                this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().settings().storageHandler().delete(path);
                bool2 = BoxesRunTime.boxToBoolean(this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().settings().storageHandler().move(path3, path));
            } else {
                bool2 = BoxedUnit.UNIT;
            }
            bool = bool2;
        } else {
            bool = BoxedUnit.UNIT;
        }
        kafkaClient.topicSaveOffsets(this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().kafkaJobConfig().topicConfigName(), this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().ai$starlake$job$sink$kafka$KafkaJob$$topicConfig().allAccessOptions(this.$outer.ai$starlake$job$sink$kafka$KafkaJob$$anonfun$$$outer().settings().comet().kafka().sparkServerOptions()), list);
        return new SparkJobResult(new Some(ai$starlake$job$sink$kafka$KafkaJob$$transfom));
    }

    public KafkaJob$$anonfun$offload$1$$anonfun$apply$2(KafkaJob$$anonfun$offload$1 kafkaJob$$anonfun$offload$1) {
        if (kafkaJob$$anonfun$offload$1 == null) {
            throw null;
        }
        this.$outer = kafkaJob$$anonfun$offload$1;
    }
}
