package ai.tripl.arc.load;

import ai.tripl.arc.api.API;
import ai.tripl.arc.util.log.logger.Logger;
import java.util.HashMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.util.LongAccumulator;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.runtime.BoxesRunTime;

/* compiled from: AzureEventHubsLoad.scala */
/* loaded from: input_file:ai/tripl/arc/load/AzureEventHubsLoad$.class */
public final class AzureEventHubsLoad$ {
    public static final AzureEventHubsLoad$ MODULE$ = null;

    static {
        new AzureEventHubsLoad$();
    }

    public Option<Dataset<Row>> load(API.AzureEventHubsLoad azureEventHubsLoad, SparkSession sparkSession, Logger logger) {
        Dataset dataset;
        long currentTimeMillis = System.currentTimeMillis();
        HashMap hashMap = new HashMap();
        hashMap.put("type", azureEventHubsLoad.getType());
        hashMap.put("name", azureEventHubsLoad.name());
        azureEventHubsLoad.description().foreach(new AzureEventHubsLoad$$anonfun$load$1(hashMap));
        hashMap.put("inputView", azureEventHubsLoad.inputView());
        hashMap.put("namespaceName", azureEventHubsLoad.namespaceName());
        hashMap.put("eventHubName", azureEventHubsLoad.eventHubName());
        hashMap.put("sharedAccessSignatureKeyName", azureEventHubsLoad.sharedAccessSignatureKeyName());
        hashMap.put("retryMinBackoff", Long.valueOf(azureEventHubsLoad.retryMinBackoff()));
        hashMap.put("retryMaxBackoff", Long.valueOf(azureEventHubsLoad.retryMaxBackoff()));
        hashMap.put("retryCount", Integer.valueOf(azureEventHubsLoad.retryCount()));
        logger.info().field("event", "enter").map("stage", hashMap).log();
        Dataset table = sparkSession.table(azureEventHubsLoad.inputView());
        if (table.schema().length() == 1) {
            DataType dataType = table.schema().apply(0).dataType();
            StringType$ stringType$ = StringType$.MODULE$;
            if (dataType != null ? dataType.equals(stringType$) : stringType$ == null) {
                Some numPartitions = azureEventHubsLoad.numPartitions();
                if (numPartitions instanceof Some) {
                    int unboxToInt = BoxesRunTime.unboxToInt(numPartitions.x());
                    hashMap.put("numPartitions", Integer.valueOf(unboxToInt));
                    dataset = table.repartition(unboxToInt);
                } else {
                    if (!None$.MODULE$.equals(numPartitions)) {
                        throw new MatchError(numPartitions);
                    }
                    hashMap.put("numPartitions", Integer.valueOf(table.rdd().getNumPartitions()));
                    dataset = table;
                }
                Dataset dataset2 = dataset;
                LongAccumulator longAccumulator = sparkSession.sparkContext().longAccumulator();
                LongAccumulator longAccumulator2 = sparkSession.sparkContext().longAccumulator();
                LongAccumulator longAccumulator3 = sparkSession.sparkContext().longAccumulator();
                HashMap hashMap2 = new HashMap();
                try {
                    dataset2.foreachPartition(new AzureEventHubsLoad$$anonfun$load$2(azureEventHubsLoad, longAccumulator, longAccumulator2, longAccumulator3));
                    hashMap2.put("recordsWritten", Long.valueOf(Predef$.MODULE$.Long2long(longAccumulator.value())));
                    hashMap2.put("bytesWritten", Long.valueOf(Predef$.MODULE$.Long2long(longAccumulator2.value())));
                    hashMap2.put("batchesWritten", Long.valueOf(Predef$.MODULE$.Long2long(longAccumulator3.value())));
                    hashMap.put("outputMetrics", hashMap2);
                    logger.info().field("event", "exit").field("duration", BoxesRunTime.boxToLong(System.currentTimeMillis() - currentTimeMillis)).map("stage", hashMap).log();
                    return Option$.MODULE$.apply(dataset2);
                } catch (Exception e) {
                    throw new AzureEventHubsLoad$$anon$2(hashMap, longAccumulator, longAccumulator2, longAccumulator3, hashMap2, e);
                }
            }
        }
        throw new AzureEventHubsLoad$$anon$1(azureEventHubsLoad, hashMap, "AzureEventHubsLoad requires inputView to be dataset with [value: string] signature.", table);
    }

    private AzureEventHubsLoad$() {
        MODULE$ = this;
    }
}
