/*
 * Decompiled with CFR 0.152.
 */
package io.treeverse.gc;

import io.treeverse.clients.APIConfigurations;
import io.treeverse.clients.ApiClient;
import io.treeverse.clients.ApiClient$;
import io.treeverse.clients.ConfigMapper;
import io.treeverse.clients.HadoopUtils$;
import io.treeverse.clients.LakeFSContext$;
import io.treeverse.clients.StorageClientType$;
import io.treeverse.gc.APIUncommittedAddressLister;
import io.treeverse.gc.NaiveCommittedAddressLister;
import io.treeverse.gc.NaiveDataLister;
import io.treeverse.gc.ParallelDataLister;
import io.treeverse.gc.UncommittedGCRunInfo;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.functions$;
import org.json4s.Formats;
import org.json4s.JsonAST;
import org.json4s.JsonDSL$;
import org.json4s.native.JsonMethods$;
import org.json4s.package$;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/**
 * Singleton that drives the "uncommitted" garbage-collection run for one repository.
 *
 * <p>The static initializer constructs the only instance and the private constructor
 * publishes it through {@link #MODULE$}. A run ({@link #main(String[])}) lists the objects
 * found under the repository's storage namespace, subtracts the committed and uncommitted
 * addresses, and writes a JSON summary plus a parquet listing of the remaining addresses
 * under {@code _lakefs/retention/gc/uncommitted/<runID>}.
 */
public final class UncommittedGarbageCollector$ {
    public static UncommittedGarbageCollector$ MODULE$;
    /** Lazily created session; only valid once {@link #bitmap$0} is {@code true}. */
    private SparkSession spark;
    /** Set to {@code true} after {@link #spark} has been initialized. */
    private volatile boolean bitmap$0;

    static {
        new UncommittedGarbageCollector$();
    }

    /**
     * @return the source name passed to the API client configuration for this job
     */
    public final String UNCOMMITTED_GC_SOURCE_NAME() {
        return "uncommitted_gc";
    }

    /**
     * Slow path of {@link #spark()}: creates the session under the instance lock unless
     * another thread has already done so.
     */
    private SparkSession spark$lzycompute() {
        UncommittedGarbageCollector$ uncommittedGarbageCollector$ = this;
        synchronized (uncommittedGarbageCollector$) {
            if (!this.bitmap$0) {
                this.spark = SparkSession$.MODULE$.builder().appName("UncommittedGarbageCollector").getOrCreate();
                this.bitmap$0 = true;
            }
        }
        return this.spark;
    }

    /**
     * @return the shared {@link SparkSession}, created on first use with the application
     *         name {@code "UncommittedGarbageCollector"}
     */
    public SparkSession spark() {
        return !this.bitmap$0 ? this.spark$lzycompute() : this.spark;
    }

    /**
     * Lists the addresses of objects in a storage namespace that were last modified
     * before a cutoff.
     *
     * <p>Objects under {@code <storageNamespace>/data} are listed with
     * {@link ParallelDataLister} and their {@code base_address} is prefixed with
     * {@code "data/"}; objects under the namespace root are listed with
     * {@link NaiveDataLister}. Both listings are unioned and filtered on
     * {@code last_modified < before.getTime()}.
     *
     * @param storageNamespace root location of the repository's storage
     * @param before           cutoff; compared through {@link Date#getTime()}, so
     *                         {@code last_modified} is presumably epoch milliseconds
     *                         (NOTE(review): confirm against the listers)
     * @return a dataset with the single column {@code address}
     */
    public Dataset<Row> listObjects(String storageNamespace, Date before) {
        SparkContext sc = this.spark().sparkContext();
        Path oldDataPath = new Path(storageNamespace);
        String dataPrefix = "data";
        Path dataPath = new Path(storageNamespace, dataPrefix);

        // Broadcast the "fs." and "lakefs." Hadoop settings so the listers can read them.
        ConfigMapper configMapper = new ConfigMapper((Broadcast<Tuple2<String, String>[]>)sc.broadcast(HadoopUtils$.MODULE$.getHadoopConfigurationValues(sc.hadoopConfiguration(), (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"fs.", "lakefs."})), ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Tuple2.class))));

        Dataset dataDF = new ParallelDataLister().listData(configMapper, dataPath);
        // Rebuild the address relative to the namespace root: "data" + "/" + base_address.
        Column prefixedAddress = functions$.MODULE$.concat((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.lit((Object)dataPrefix), functions$.MODULE$.lit((Object)"/"), functions$.MODULE$.col("base_address")}));
        dataDF = dataDF.withColumn("address", prefixedAddress).select("address", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"last_modified"}));

        Dataset<Row> oldDataDF = new NaiveDataLister().listData(configMapper, oldDataPath);
        Column olderThanCutoff = functions$.MODULE$.col("last_modified").$less((Object)BoxesRunTime.boxToLong((long)before.getTime()));
        dataDF = dataDF.union(oldDataDF).filter(olderThanCutoff).select("address", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0]));
        return dataDF;
    }

    /**
     * Runs one uncommitted-GC pass for a repository and writes its reports.
     *
     * <p>Connection settings and the minimum object age are read from the Spark
     * context's Hadoop configuration. Whether the computation succeeds or fails, the
     * reports are written and the Spark session is closed; a failure is rethrown after
     * being recorded as {@code success = false}.
     *
     * @param args {@code args[0]} is the repository name
     * @throws IllegalArgumentException if no repository name is supplied
     * @throws NumberFormatException    if the configured minimum age is not an integer
     */
    public void main(String[] args) {
        // Fail fast, before a Spark session is created, when the repository is missing.
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException("Usage: UncommittedGarbageCollector <repository>");
        }
        String repo = args[0];

        String runID = "";
        String firstSlice = "";
        boolean success = true;
        Dataset addressesToDelete = this.spark().emptyDataFrame().withColumn("address", functions$.MODULE$.lit((Object)""));

        Configuration hc = this.spark().sparkContext().hadoopConfiguration();
        String apiURL = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_URL_KEY());
        String accessKey = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_ACCESS_KEY_KEY());
        String secretKey = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_SECRET_KEY_KEY());
        String connectionTimeout = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_CONNECTION_TIMEOUT_SEC_KEY());
        String readTimeout = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_READ_TIMEOUT_SEC_KEY());
        int minAgeSeconds = this.resolveMinAgeSeconds(hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_DEBUG_GC_UNCOMMITTED_MIN_AGE_SECONDS_KEY()));

        // Only objects older than "now - minAgeSeconds" are considered.
        Date cutoffTime = DateUtils.addSeconds((Date)new Date(), (int)(-minAgeSeconds));
        Instant startTime = Clock.systemUTC().instant();

        APIConfigurations apiConf = new APIConfigurations(apiURL, accessKey, secretKey, connectionTimeout, readTimeout, this.UNCOMMITTED_GC_SOURCE_NAME());
        ApiClient apiClient = ApiClient$.MODULE$.get(apiConf);
        String storageNamespace = apiClient.getStorageNamespace(repo, StorageClientType$.MODULE$.HadoopFS());
        if (!storageNamespace.endsWith("/")) {
            storageNamespace = storageNamespace + "/";
        }

        try {
            try {
                Dataset<Row> dataDF = this.listObjects(storageNamespace, cutoffTime);
                UncommittedGCRunInfo uncommittedGCRunInfo = new APIUncommittedAddressLister(apiClient).listUncommittedAddresses(this.spark(), repo);

                // Only an empty location means "nothing uncommitted". A null location still
                // goes to the parquet read (and fails there) rather than being treated as
                // empty, which could wrongly mark uncommitted objects for deletion.
                boolean hasUncommittedLocation = !"".equals(uncommittedGCRunInfo.uncommittedLocation());
                Dataset uncommittedDF = hasUncommittedLocation
                        ? this.spark().read().parquet(uncommittedGCRunInfo.uncommittedLocation())
                        : this.spark().emptyDataFrame().withColumn("physical_address", functions$.MODULE$.lit((Object)""));
                uncommittedDF = uncommittedDF.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{uncommittedDF.apply("physical_address").as("address")}));
                runID = uncommittedGCRunInfo.runID();

                Dataset<Row> committedDF = new NaiveCommittedAddressLister().listCommittedAddresses(this.spark(), storageNamespace);
                addressesToDelete = dataDF.except(committedDF).except(uncommittedDF);

                // Take at most one row so an empty listing does not fail the run, and read
                // the column value itself: Row.toString() would wrap it in "[...]".
                List<Row> firstRows = dataDF.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("address")})).takeAsList(1);
                if (!firstRows.isEmpty()) {
                    String firstFile = firstRows.get(0).getString(0);
                    int lastSlash = firstFile == null ? -1 : firstFile.lastIndexOf('/');
                    if (lastSlash >= 0) {
                        firstSlice = firstFile.substring(0, lastSlash);
                    }
                }
            }
            catch (Throwable e) {
                success = false;
                throw e;
            }
        }
        finally {
            try {
                this.writeReports(storageNamespace, runID, firstSlice, startTime, success, (Dataset<Row>)addressesToDelete);
            }
            finally {
                // Close the session even when writing the reports fails.
                this.spark().close();
            }
        }
    }

    /**
     * Resolves the minimum object age from its configuration string.
     *
     * @param minAgeStr configured value; may be {@code null} or empty
     * @return the parsed value when it is a positive integer, otherwise
     *         {@code DEFAULT_GC_UNCOMMITTED_MIN_AGE_SECONDS}
     * @throws NumberFormatException if the value is non-empty and not an integer
     */
    private int resolveMinAgeSeconds(String minAgeStr) {
        if (minAgeStr != null && !minAgeStr.isEmpty()) {
            int configured = Integer.parseInt(minAgeStr);
            if (configured > 0) {
                return configured;
            }
        }
        return LakeFSContext$.MODULE$.DEFAULT_GC_UNCOMMITTED_MIN_AGE_SECONDS();
    }

    /**
     * Writes the run reports under
     * {@code <storageNamespace>_lakefs/retention/gc/uncommitted/<runID>}: a
     * {@code summary.json} file and a {@code deleted} parquet dataset.
     *
     * @param storageNamespace namespace root; concatenated as-is, so it is expected to
     *                         end with {@code "/"}
     * @param runID            run identifier used as the report directory name
     * @param firstSlice       value stored in the summary's {@code first_slice} field
     * @param startTime        run start time stored in the summary
     * @param success          whether the run completed without an exception
     * @param removed          addresses selected for deletion
     */
    public void writeReports(String storageNamespace, String runID, String firstSlice, Instant startTime, boolean success, Dataset<Row> removed) {
        String reportDst = storageNamespace + "_lakefs/retention/gc/uncommitted/" + runID;
        // NOTE(review): count() and the parquet write each evaluate `removed`.
        this.writeJsonSummary(reportDst, runID, firstSlice, startTime, success, removed.count());
        removed.write().parquet(reportDst + "/deleted");
    }

    /**
     * Writes {@code <dst>/summary.json} with the fields {@code run_id}, {@code success},
     * {@code first_slice}, {@code start_time} (ISO-8601 instant) and
     * {@code num_deleted_objects}.
     */
    private void writeJsonSummary(String dst, String runID, String firstSlice, Instant startTime, boolean success, long numDeletedObjects) {
        Path dstPath = new Path(dst + "/summary.json");
        FileSystem dstFS = dstPath.getFileSystem(this.spark().sparkContext().hadoopConfiguration());
        JsonAST.JObject jsonSummary = package$.MODULE$.JObject().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"run_id"), (Object)JsonDSL$.MODULE$.string2jvalue(runID)),
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"success"), (Object)JsonDSL$.MODULE$.boolean2jvalue(success)),
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"first_slice"), (Object)JsonDSL$.MODULE$.string2jvalue(firstSlice)),
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"start_time"), (Object)JsonDSL$.MODULE$.string2jvalue(DateTimeFormatter.ISO_INSTANT.format(startTime))),
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"num_deleted_objects"), (Object)JsonDSL$.MODULE$.long2jvalue(numDeletedObjects))}));
        try (FSDataOutputStream stream = dstFS.create(dstPath);){
            JsonAST.JObject x$1 = jsonSummary;
            Formats x$2 = JsonMethods$.MODULE$.render$default$2((JsonAST.JValue)x$1);
            String json = JsonMethods$.MODULE$.compact(JsonMethods$.MODULE$.render((JsonAST.JValue)x$1, x$2));
            // writeBytes(String) keeps only the low byte of each char; encode as UTF-8
            // so non-ASCII characters survive.
            stream.write(json.getBytes(StandardCharsets.UTF_8));
        }
    }

    private UncommittedGarbageCollector$() {
        MODULE$ = this;
    }
}

