package ai.starlake.job.sink.es;

import ai.starlake.config.Settings;
import ai.starlake.config.SparkEnv;
import ai.starlake.schema.handlers.SchemaHandler;
import ai.starlake.schema.handlers.StorageHandler;
import ai.starlake.schema.model.Domain;
import ai.starlake.schema.model.Schema;
import ai.starlake.schema.model.SinkType;
import ai.starlake.schema.model.Views;
import ai.starlake.utils.JobBase;
import ai.starlake.utils.JobResult;
import ai.starlake.utils.SparkJob;
import ai.starlake.utils.SparkJobResult;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import org.apache.hadoop.fs.Path;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
import scala.util.Success;
import scala.util.Try;

/* compiled from: ESLoadJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005mc\u0001B\u0001\u0003\u00015\u0011\u0011\"R*M_\u0006$'j\u001c2\u000b\u0005\r!\u0011AA3t\u0015\t)a!\u0001\u0003tS:\\'BA\u0004\t\u0003\rQwN\u0019\u0006\u0003\u0013)\t\u0001b\u001d;be2\f7.\u001a\u0006\u0002\u0017\u0005\u0011\u0011-[\u0002\u0001'\r\u0001a\u0002\u0006\t\u0003\u001fIi\u0011\u0001\u0005\u0006\u0002#\u0005)1oY1mC&\u00111\u0003\u0005\u0002\u0007\u0003:L(+\u001a4\u0011\u0005UAR\"\u0001\f\u000b\u0005]A\u0011!B;uS2\u001c\u0018BA\r\u0017\u0005!\u0019\u0006/\u0019:l\u0015>\u0014\u0007\u0002C\u000e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000f\u0002\u0013\rd\u0017nQ8oM&<\u0007CA\u000f\u001f\u001b\u0005\u0011\u0011BA\u0010\u0003\u00051)5\u000bT8bI\u000e{gNZ5h\u0011!\t\u0003A!A!\u0002\u0013\u0011\u0013AD:u_J\fw-\u001a%b]\u0012dWM\u001d\t\u0003G!j\u0011\u0001\n\u0006\u0003K\u0019\n\u0001\u0002[1oI2,'o\u001d\u0006\u0003O!\taa]2iK6\f\u0017BA\u0015%\u00059\u0019Fo\u001c:bO\u0016D\u0015M\u001c3mKJD\u0001b\u000b\u0001\u0003\u0002\u0003\u0006I\u0001L\u0001\u000eg\u000eDW-\\1IC:$G.\u001a:\u0011\u0005\rj\u0013B\u0001\u0018%\u00055\u00196\r[3nC\"\u000bg\u000e\u001a7fe\"A\u0001\u0007\u0001BC\u0002\u0013\r\u0011'\u0001\u0005tKR$\u0018N\\4t+\u0005\u0011\u0004CA\u001a7\u001b\u0005!$BA\u001b\t\u0003\u0019\u0019wN\u001c4jO&\u0011q\u0007\u000e\u0002\t'\u0016$H/\u001b8hg\"A\u0011\b\u0001B\u0001B\u0003%!'A\u0005tKR$\u0018N\\4tA!)1\b\u0001C\u0001y\u00051A(\u001b8jiz\"B!\u0010!B\u0005R\u0011ah\u0010\t\u0003;\u0001AQ\u0001\r\u001eA\u0004IBQa\u0007\u001eA\u0002qAQ!\t\u001eA\u0002\tBQa\u000b\u001eA\u00021Bq\u0001\u0012\u0001C\u0002\u0013\u0005Q)\u0001\u0003qCRDW#\u0001$\u0011\t\u001d{%K\u0018\b\u0003\u00116s!!\u0013'\u000e\u0003)S!a\u0013\u0007\u0002\rq\u0012xn\u001c;?\u0013\u0005\t\u0012B\u0001(\u0011\u0003\u001d\u0001\u0018mY6bO\u0016L!\u0001U)\u0003\r\u0015KG\u000f[3s\u0015\tq\u0005\u0003\u0005\u0002T96\tAK\u0003\u0002V-\u0006\u0011am\u001d\u0006\u0003/b\u000ba\u0001[1e_>\u0004(BA-[\u0003\u0019\t\u0007/Y2iK*\t1,A\u0002pe\u001eL!!\u0018+\u0003\tA\u000bG\u000f\u001b\t\u0003?6t!\u0001Y6\u000f\u0005\u0005DgB\u00012g\u001d\t\u0019WM\u0004\u0002JI&\t1,\u0003\u0002Z5&\u0011q\rW\u0001\u0006gB\f'o[\u0005\u0003S*\f1a]9m\u0015\t9\u0007,\u0003\u0002OY*\u0011\u0011N[\u0005\u0003]>\u0014\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\u00059c\u0007BB9\u0001A\u0003%a)A\u0003qCRD\u0007\u0005C\u0004t\u0001\t\u0007I\u0011\u0001;\u0002\r\u0019|'/\\1u+\u0005)\bC\u0001<z\u001d\tyq/\u0003\u0002y!\u00051\u0001K]3eK\u001aL!A_>\u0003\rM#(/\u001b8h\u0015\tA\b\u0003\u0003\u0004~\u0001\u0001\u0006I!^\u0001\bM>\u0014X.\u0019;!\u0011!y\bA1A\u0005\u0002\u0005\u0005\u0011a\u00023bi\u0006\u001cX\r^\u000b\u0003\u0003\u0007\u0001BaDA\u0003\r&\u0019\u0011q\u0001\t\u0003\r=\u0003H/[8o\u0011!\tY\u0001\u0001Q\u0001\n\u0005\r\u0011\u0001\u00033bi\u0006\u001cX\r\u001e\u0011\t\u0013\u0005=\u0001A1A\u0005\u0002\u0005E\u0011A\u00023p[\u0006Lg.\u0006\u0002\u0002\u0014A)q\"!\u0002\u0002\u0016A!\u0011qCA\u000f\u001b\t\tIBC\u0002\u0002\u001c\u0019\nQ!\\8eK2LA!a\b\u0002\u001a\t1Ai\\7bS:D\u0001\"a\t\u0001A\u0003%\u00111C\u0001\bI>l\u0017-\u001b8!\u0011!9\u0003A1A\u0005\u0002\u0005\u001dRCAA\u0015!\u0015y\u0011QAA\u0016!\u0011\t9\"!\f\n\t\u0005=\u0012\u0011\u0004\u0002\u0007'\u000eDW-\\1\t\u0011\u0005M\u0002\u0001)A\u0005\u0003S\tqa]2iK6\f\u0007\u0005\u0003\u0004\u00028\u0001!\t\u0005^\u0001\u0005]\u0006lW\rC\u0004\u0002<\u0001!\t!!\u0010\u0002\u0019\u001d,G/\u00138eKbt\u0015-\\3\u0015\u0003UDq!!\u0011\u0001\t\u0003\ti$A\u0006hKR\u0014Vm]8ve\u000e,\u0007bBA#\u0001\u0011\u0005\u0013qI\u0001\u0004eVtGCAA%!\u0019\tY%!\u0015\u0002V5\u0011\u0011Q\n\u0006\u0004\u0003\u001f\u0002\u0012\u0001B;uS2LA!a\u0015\u0002N\t\u0019AK]=\u0011\u0007U\t9&C\u0002\u0002ZY\u0011\u0011BS8c%\u0016\u001cX\u000f\u001c;")
/* loaded from: input_file:ai/starlake/job/sink/es/ESLoadJob.class */
public class ESLoadJob implements SparkJob {
    public final ESLoadConfig ai$starlake$job$sink$es$ESLoadJob$$cliConfig;
    public final StorageHandler ai$starlake$job$sink$es$ESLoadJob$$storageHandler;
    public final SchemaHandler ai$starlake$job$sink$es$ESLoadJob$$schemaHandler;
    private final Settings settings;
    private final Either<Path, Dataset<Row>> path;
    private final String format;
    private final Option<Either<Path, Dataset<Row>>> dataset;
    private final Option<Domain> domain;
    private final Option<Schema> schema;
    private final SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
    private final SparkSession session;
    private final Logger logger;
    private volatile byte bitmap$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.ai$starlake$utils$SparkJob$$sparkEnv = SparkJob.Cclass.ai$starlake$utils$SparkJob$$sparkEnv(this);
                this.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.ai$starlake$utils$SparkJob$$sparkEnv;
        }
    }

    @Override // ai.starlake.utils.SparkJob
    public SparkEnv ai$starlake$utils$SparkJob$$sparkEnv() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? ai$starlake$utils$SparkJob$$sparkEnv$lzycompute() : this.ai$starlake$utils$SparkJob$$sparkEnv;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private SparkSession session$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.session = SparkJob.Cclass.session(this);
                this.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.session;
        }
    }

    @Override // ai.starlake.utils.SparkJob
    public SparkSession session() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? session$lzycompute() : this.session;
    }

    @Override // ai.starlake.utils.SparkJob
    public SparkConf withExtraSparkConf(SparkConf sparkConf) {
        return SparkJob.Cclass.withExtraSparkConf(this, sparkConf);
    }

    @Override // ai.starlake.utils.SparkJob
    public void registerUdf(String str) {
        SparkJob.Cclass.registerUdf(this, str);
    }

    @Override // ai.starlake.utils.SparkJob
    public DataFrameWriter<Row> partitionedDatasetWriter(Dataset<Row> dataset, List<String> list) {
        return SparkJob.Cclass.partitionedDatasetWriter(this, dataset, list);
    }

    @Override // ai.starlake.utils.SparkJob
    public Dataset<Row> partitionDataset(Dataset<Row> dataset, List<String> list) {
        return SparkJob.Cclass.partitionDataset(this, dataset, list);
    }

    @Override // ai.starlake.utils.SparkJob
    public Object analyze(String str) {
        return SparkJob.Cclass.analyze(this, str);
    }

    @Override // ai.starlake.utils.SparkJob
    public void createSparkViews(Views views, Map<String, String> map, Map<String, String> map2) {
        SparkJob.Cclass.createSparkViews(this, views, map, map2);
    }

    @Override // ai.starlake.utils.SparkJob
    public Dataset<Row> createSparkView(SinkType sinkType, Option<String> option, String str) {
        return SparkJob.Cclass.createSparkView(this, sinkType, option, str);
    }

    @Override // ai.starlake.utils.JobBase
    public Tuple3<SinkType, Option<String>, String> parseViewDefinition(String str) {
        return JobBase.Cclass.parseViewDefinition(this, str);
    }

    @Override // org.apache.spark.sql.DatasetLogging
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> dataset) {
        return DatasetLogging.Cclass.DatasetHelper(this, dataset);
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    @Override // ai.starlake.utils.JobBase
    public Settings settings() {
        return this.settings;
    }

    public Either<Path, Dataset<Row>> path() {
        return this.path;
    }

    public String format() {
        return this.format;
    }

    public Option<Either<Path, Dataset<Row>>> dataset() {
        return this.dataset;
    }

    public Option<Domain> domain() {
        return this.domain;
    }

    public Option<Schema> schema() {
        return this.schema;
    }

    @Override // ai.starlake.utils.JobBase
    public String name() {
        return new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Index ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{path()}));
    }

    public String getIndexName() {
        String s;
        Tuple2 tuple2 = new Tuple2(domain(), schema());
        if (tuple2 != null) {
            Some some = (Option) tuple2._1();
            Some some2 = (Option) tuple2._2();
            if (some instanceof Some) {
                Domain domain = (Domain) some.x();
                if (some2 instanceof Some) {
                    s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ".", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{domain.getFinalName().toLowerCase(), ((Schema) some2.x()).getFinalName().toLowerCase()}));
                    return s;
                }
            }
        }
        s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ".", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig.domain().toLowerCase(), this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig.schema().toLowerCase()}));
        return s;
    }

    public String getResource() {
        return (String) this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig.timestamp().map(new ESLoadJob$$anonfun$getResource$1(this)).getOrElse(new ESLoadJob$$anonfun$getResource$2(this));
    }

    @Override // ai.starlake.utils.JobBase
    public Try<JobResult> run() {
        Dataset dataset;
        Dataset load;
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Indexing resource {} with {}", new Object[]{getResource(), this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig});
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        Left path = path();
        if (path instanceof Left) {
            Path path2 = (Path) path.a();
            String format = format();
            if ("json".equals(format)) {
                load = session().read().option("multiline", true).json(path2.toString());
            } else if ("json-array".equals(format)) {
                load = session().read().json(session().read().textFile(path2.toString()));
            } else if ("parquet".equals(format)) {
                load = session().read().format("parquet").load(path2.toString());
            } else {
                if (!"delta".equals(format)) {
                    throw new MatchError(format);
                }
                load = session().read().format("delta").load(path2.toString());
            }
            dataset = load;
        } else {
            if (!(path instanceof Right)) {
                throw new MatchError(path);
            }
            dataset = (Dataset) ((Right) path).b();
        }
        Dataset dataset2 = dataset;
        Dataset dataset3 = (Dataset) this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig.getTimestampCol().map(new ESLoadJob$$anonfun$2(this, dataset2)).getOrElse(new ESLoadJob$$anonfun$3(this, dataset2));
        String str = (String) this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig.mapping().map(new ESLoadJob$$anonfun$4(this)).getOrElse(new ESLoadJob$$anonfun$5(this, dataset3));
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Registering template {}_{} -> {}", new Object[]{this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig.domain().toLowerCase(), this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig.schema().toLowerCase(), str});
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        Map<String, String> options = settings().comet().elasticsearch().options();
        String str2 = (String) options.getOrElse("es.nodes", new ESLoadJob$$anonfun$6(this));
        int i = new StringOps(Predef$.MODULE$.augmentString((String) options.getOrElse("es.port", new ESLoadJob$$anonfun$7(this)))).toInt();
        String str3 = new StringOps(Predef$.MODULE$.augmentString((String) options.getOrElse("es.net.ssl", new ESLoadJob$$anonfun$8(this)))).toBoolean() ? "https" : "http";
        Option option = options.get("net.http.auth.user");
        Option option2 = options.get("net.http.auth.password");
        CloseableHttpClient createDefault = HttpClients.createDefault();
        Option flatMap = option.flatMap(new ESLoadJob$$anonfun$9(this, option2));
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "://", ":", "/_template/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str3, str2, BoxesRunTime.boxToInteger(i), getIndexName()}));
        HttpDelete httpDelete = new HttpDelete(s);
        httpDelete.setHeader("Content-Type", "application/json");
        flatMap.foreach(new ESLoadJob$$anonfun$run$1(this, httpDelete));
        createDefault.execute(httpDelete);
        HttpPut httpPut = new HttpPut(s);
        httpPut.setEntity(new StringEntity(str, ContentType.APPLICATION_JSON));
        flatMap.foreach(new ESLoadJob$$anonfun$run$2(this, httpDelete));
        if (!RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(200), 299).contains(createDefault.execute(httpPut).getStatusLine().getStatusCode())) {
            throw new Exception("Failed to create template");
        }
        List list = (List) options.toList().$plus$plus(this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig.options().$plus$plus(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Option[]{new Some(new Tuple2("es.resource.write", new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{getResource()})))), this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig.id().map(new ESLoadJob$$anonfun$10(this))})).flatten(new ESLoadJob$$anonfun$11(this)).toMap(Predef$.MODULE$.$conforms())).toList(), List$.MODULE$.canBuildFrom());
        if (!logger().underlying().isDebugEnabled()) {
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        } else if (logger().underlying().isDebugEnabled()) {
            logger().underlying().debug("sending {} documents to Elasticsearch using {}", new Object[]{BoxesRunTime.boxToLong(dataset3.count()), list});
            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
        }
        dataset3.write().options(list.toMap(Predef$.MODULE$.$conforms())).format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).save(getResource());
        return new Success(new SparkJobResult(None$.MODULE$));
    }

    public ESLoadJob(ESLoadConfig eSLoadConfig, StorageHandler storageHandler, SchemaHandler schemaHandler, Settings settings) {
        this.ai$starlake$job$sink$es$ESLoadJob$$cliConfig = eSLoadConfig;
        this.ai$starlake$job$sink$es$ESLoadJob$$storageHandler = storageHandler;
        this.ai$starlake$job$sink$es$ESLoadJob$$schemaHandler = schemaHandler;
        this.settings = settings;
        StrictLogging.class.$init$(this);
        DatasetLogging.Cclass.$init$(this);
        JobBase.Cclass.$init$(this);
        SparkJob.Cclass.$init$(this);
        this.path = eSLoadConfig.getDataset(settings);
        this.format = eSLoadConfig.format();
        this.dataset = eSLoadConfig.dataset();
        this.domain = schemaHandler.getDomain(eSLoadConfig.domain());
        this.schema = domain().flatMap(new ESLoadJob$$anonfun$1(this));
    }
}
