/*
 * Decompiled with CFR 0.152.
 */
package io.treeverse.gc;

import io.treeverse.clients.APIConfigurations;
import io.treeverse.clients.ApiClient;
import io.treeverse.clients.ApiClient$;
import io.treeverse.clients.ConfigMapper;
import io.treeverse.clients.GarbageCollector$;
import io.treeverse.clients.HadoopUtils$;
import io.treeverse.clients.LakeFSContext$;
import io.treeverse.clients.StorageClientType$;
import io.treeverse.gc.APIUncommittedAddressLister;
import io.treeverse.gc.FailedRunException;
import io.treeverse.gc.NaiveCommittedAddressLister;
import io.treeverse.gc.NaiveDataLister;
import io.treeverse.gc.ParallelDataLister;
import io.treeverse.gc.ParameterValidationException;
import io.treeverse.gc.UncommittedGCRunInfo;
import java.net.URI;
import java.time.Clock;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.functions$;
import org.json4s.Formats;
import org.json4s.JsonAST;
import org.json4s.JsonDSL$;
import org.json4s.native.JsonMethods$;
import org.json4s.package$;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

public final class UncommittedGarbageCollector$ {
    /** Singleton instance; assigned by the private constructor, which the static initializer below invokes. */
    public static UncommittedGarbageCollector$ MODULE$;
    /** Spark session backing {@code spark()}; created on first use by {@code spark$lzycompute()}. */
    private SparkSession spark;
    /** Addresses that {@code listObjects} filters out of the listing made at the namespace root. */
    private final Seq<String> excludeFromOldData;
    /** Becomes {@code true} once {@code spark} has been assigned. */
    private volatile boolean bitmap$0;

    // Constructing the instance once is enough: the constructor stores it in MODULE$.
    static {
        new UncommittedGarbageCollector$();
    }

    /**
     * Returns the fixed source name of this job.
     *
     * @return the constant string {@code "uncommitted_gc"}
     */
    public final String UNCOMMITTED_GC_SOURCE_NAME() {
        final String sourceName = "uncommitted_gc";
        return sourceName;
    }

    /**
     * Returns the fixed prefix used for the data directory.
     *
     * @return the constant string {@code "data/"}
     */
    private final String DATA_PREFIX() {
        final String dataPrefix = "data/";
        return dataPrefix;
    }

    /**
     * Creates the Spark session if it has not been created yet and returns it.
     *
     * <p>The initialization flag is re-checked while holding the monitor of this instance,
     * so the session is built at most once.
     *
     * @return the session stored in {@code spark}
     */
    private SparkSession spark$lzycompute() {
        synchronized (this) {
            if (this.bitmap$0) {
                return this.spark;
            }
            final SparkSession session = SparkSession$.MODULE$.builder().appName("UncommittedGarbageCollector").getOrCreate();
            this.spark = session;
            this.bitmap$0 = true;
            return session;
        }
    }

    /**
     * Returns the Spark session, creating it on first access.
     *
     * @return the stored session when the initialization flag is set, otherwise the result of
     *         {@code spark$lzycompute()}
     */
    public SparkSession spark() {
        if (this.bitmap$0) {
            return this.spark;
        }
        return this.spark$lzycompute();
    }

    /**
     * Accessor for the exclusion list assigned in the constructor.
     *
     * @return the {@code excludeFromOldData} field
     */
    private Seq<String> excludeFromOldData() {
        final Seq<String> excluded = this.excludeFromOldData;
        return excluded;
    }

    /**
     * Lists objects under a storage namespace that were last modified before a cutoff.
     *
     * <p>Two listings are combined: one made with {@code ParallelDataLister} under
     * {@code <storageNamespace>/data/}, whose {@code address} column is {@code "data/"}
     * concatenated with {@code base_address}, and one made with {@code NaiveDataLister} at the
     * namespace root, whose {@code address} equals {@code base_address} and from which the
     * entries of {@code excludeFromOldData()} are removed.
     *
     * @param storageNamespace root location to list
     * @param before cutoff; only rows whose {@code last_modified} is less than
     *               {@code before.getTime()} are kept
     * @return the union of both listings, filtered by the cutoff
     */
    public Dataset<Row> listObjects(String storageNamespace, Date before) {
        final SparkContext sparkContext = this.spark().sparkContext();
        final Path namespaceRoot = new Path(storageNamespace);
        final Path dataRoot = new Path(storageNamespace, "data/");

        // Broadcast the Hadoop configuration values selected with the "fs." and "lakefs." arguments.
        final Broadcast<Tuple2<String, String>[]> hadoopValues = (Broadcast<Tuple2<String, String>[]>)sparkContext.broadcast(
                HadoopUtils$.MODULE$.getHadoopConfigurationValues(
                        sparkContext.hadoopConfiguration(),
                        (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"fs.", "lakefs."})),
                ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Tuple2.class)));
        final ConfigMapper configMapper = new ConfigMapper(hadoopValues);

        // Listing under data/: addresses are reported relative to dataRoot, so re-add the prefix.
        Dataset underDataPrefix = new ParallelDataLister().listData(configMapper, dataRoot);
        final Column prefixedAddress = functions$.MODULE$.concat((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{
                functions$.MODULE$.lit((Object)"data/"),
                functions$.MODULE$.col("base_address")}));
        underDataPrefix = underDataPrefix.withColumn("address", prefixedAddress);

        // Listing at the namespace root, minus the explicitly excluded addresses.
        Dataset atNamespaceRoot = new NaiveDataLister().listData(configMapper, namespaceRoot);
        atNamespaceRoot = atNamespaceRoot.withColumn("address", functions$.MODULE$.col("base_address"));
        final Column notExcluded = functions$.MODULE$.col("address").isin(this.excludeFromOldData()).unary_$bang();
        atNamespaceRoot = atNamespaceRoot.filter(notExcluded);

        final Column olderThanCutoff = functions$.MODULE$.col("last_modified").$less((Object)BoxesRunTime.boxToLong((long)before.getTime()));
        return underDataPrefix.union(atNamespaceRoot).filter(olderThanCutoff);
    }

    /*
     * WARNING - void declaration
     */
    public String getFirstSlice(Dataset<Row> dataDF, String repo) {
        void var3_3;
        block0: {
            String firstSlice = "";
            Dataset slices = dataDF.filter(functions$.MODULE$.col("address").startsWith("data/").$amp$amp((Object)functions$.MODULE$.col("base_address").startsWith(repo).unary_$bang()));
            if (slices.isEmpty()) break block0;
            firstSlice = ((String)((Row)slices.first()).getAs("base_address")).split("/")[0];
        }
        return var3_3;
    }

    /**
     * Checks that the mark/sweep flags and the mark ID form a valid combination.
     *
     * @param shouldMark whether the mark phase is requested
     * @param shouldSweep whether the sweep phase is requested
     * @param markID mark ID supplied by configuration; must be non-empty for sweep-only runs
     *               and empty whenever marking is requested
     * @throws ParameterValidationException if neither phase is requested, if sweep-only mode
     *         lacks a mark ID, or if a mark ID is given together with mark mode
     */
    public void validateRunModeConfigs(boolean shouldMark, boolean shouldSweep, String markID) {
        if (!(shouldMark || shouldSweep)) {
            throw new ParameterValidationException("Nothing to do, must specify at least one of mark, sweep. Exiting...");
        }
        if (shouldMark) {
            // Marking generates its own run ID, so a caller-supplied one is rejected.
            boolean markIdGiven = new StringOps(Predef$.MODULE$.augmentString(markID)).nonEmpty();
            if (markIdGiven) {
                throw new ParameterValidationException("Can't provide mark ID for mark mode. Exiting...");
            }
            return;
        }
        // Sweep-only: the mark ID identifies which earlier run to sweep.
        if (markID.isEmpty()) {
            String message = "Please provide a mark ID (" + LakeFSContext$.MODULE$.LAKEFS_CONF_GC_MARK_ID() + ") for sweep-only mode. Exiting...\n";
            throw new ParameterValidationException(message);
        }
    }

    /**
     * Entry point of the uncommitted garbage collection job.
     *
     * <p>Reads API and run-mode settings from the Spark context's Hadoop configuration,
     * validates them, and then:
     * <ul>
     *   <li>when marking: lists objects older than the cutoff, subtracts the committed and
     *       uncommitted addresses, and keeps the remainder as the addresses to delete;</li>
     *   <li>when sweeping: removes either the addresses just marked or those read back for the
     *       configured mark ID.</li>
     * </ul>
     * Whatever the outcome, reports are written when marking produced a non-empty run ID, and
     * the Spark session is closed.
     *
     * @param args {@code args[0]} is the repository name; the whole array is also handed to
     *             {@code GarbageCollector$.getRegion}
     */
    public void main(String[] args) {
        String markRunId = "";
        String firstSliceName = "";
        boolean succeeded = false;
        Dataset<Row> sweepTargets = this.spark().emptyDataFrame().withColumn("address", functions$.MODULE$.lit((Object)""));
        Dataset toDelete = this.spark().emptyDataFrame().withColumn("address", functions$.MODULE$.lit((Object)""));
        final String repo = args[0];
        final Configuration hadoopConf = this.spark().sparkContext().hadoopConfiguration();
        final LakeFSContext$ keys = LakeFSContext$.MODULE$;

        final String apiURL = hadoopConf.get(keys.LAKEFS_CONF_API_URL_KEY());
        final String accessKey = hadoopConf.get(keys.LAKEFS_CONF_API_ACCESS_KEY_KEY());
        final String secretKey = hadoopConf.get(keys.LAKEFS_CONF_API_SECRET_KEY_KEY());
        final String connectionTimeout = hadoopConf.get(keys.LAKEFS_CONF_API_CONNECTION_TIMEOUT_SEC_KEY());
        final String readTimeout = hadoopConf.get(keys.LAKEFS_CONF_API_READ_TIMEOUT_SEC_KEY());
        final String minAgeSetting = hadoopConf.get(keys.LAKEFS_CONF_DEBUG_GC_UNCOMMITTED_MIN_AGE_SECONDS_KEY());

        // A missing, empty or non-positive setting falls back to the default minimum age.
        int configuredMinAge = 0;
        if (minAgeSetting != null && new StringOps(Predef$.MODULE$.augmentString(minAgeSetting)).nonEmpty()) {
            configuredMinAge = new StringOps(Predef$.MODULE$.augmentString(minAgeSetting)).toInt();
        }
        final int minAgeSeconds = configuredMinAge > 0
                ? configuredMinAge
                : keys.DEFAULT_GC_UNCOMMITTED_MIN_AGE_SECONDS();

        final Date cutoffTime = DateUtils.addSeconds((Date)new Date(), (int)(-minAgeSeconds));
        final Instant startTime = Clock.systemUTC().instant();

        final boolean shouldMark = hadoopConf.getBoolean(keys.LAKEFS_CONF_GC_DO_MARK(), true);
        final boolean shouldSweep = hadoopConf.getBoolean(keys.LAKEFS_CONF_GC_DO_SWEEP(), true);
        final String markID = hadoopConf.get(keys.LAKEFS_CONF_GC_MARK_ID(), "");
        this.validateRunModeConfigs(shouldMark, shouldSweep, markID);

        final APIConfigurations apiConf = new APIConfigurations(apiURL, accessKey, secretKey, connectionTimeout, readTimeout, "uncommitted_gc");
        final ApiClient apiClient = ApiClient$.MODULE$.get(apiConf);
        final String storageType = apiClient.getBlockstoreType();
        final String reportedNamespace = apiClient.getStorageNamespace(repo, StorageClientType$.MODULE$.HadoopFS());
        // Paths below are built by plain concatenation, so the namespace must end with a slash.
        final String storageNamespace = reportedNamespace.endsWith("/") ? reportedNamespace : reportedNamespace + "/";

        try {
            if (shouldMark) {
                Dataset<Row> dataDF = this.listObjects(storageNamespace, cutoffTime);
                firstSliceName = this.getFirstSlice(dataDF, repo);
                UncommittedGCRunInfo runInfo = new APIUncommittedAddressLister(apiClient).listUncommittedAddresses(this.spark(), repo);

                // An empty location means there is no uncommitted listing to read.
                Dataset uncommittedDF;
                if ("".equals(runInfo.uncommittedLocation())) {
                    uncommittedDF = this.spark().emptyDataFrame().withColumn("physical_address", functions$.MODULE$.lit((Object)""));
                } else {
                    String translatedLocation = ApiClient$.MODULE$.translateURI(new URI(runInfo.uncommittedLocation()), storageType).toString();
                    uncommittedDF = this.spark().read().parquet(translatedLocation);
                }
                uncommittedDF = uncommittedDF.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{uncommittedDF.apply("physical_address").as("address")}));
                uncommittedDF = uncommittedDF.repartition((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{uncommittedDF.col("address")}));
                markRunId = runInfo.runID();

                String clientStorageNamespace = apiClient.getStorageNamespace(repo, StorageClientType$.MODULE$.SDKClient());
                Dataset<Row> committedDF = new NaiveCommittedAddressLister().listCommittedAddresses(this.spark(), storageNamespace, clientStorageNamespace);

                // Candidates are listed addresses that are neither committed nor uncommitted.
                toDelete = dataDF
                        .select("address", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0]))
                        .repartition((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{dataDF.col("address")}))
                        .except(committedDF)
                        .except(uncommittedDF);
            }
            if (shouldSweep) {
                final String sweptMarkId;
                if (shouldMark) {
                    sweepTargets = toDelete;
                    sweptMarkId = markRunId;
                } else {
                    sweepTargets = this.readMarkedAddresses(storageNamespace, markID);
                    sweptMarkId = markID;
                }
                Predef$.MODULE$.println((Object)("deleting marked addresses: " + sweptMarkId));

                String storageNSForSdkClient = GarbageCollector$.MODULE$.getStorageNSForSdkClient(apiClient, repo);
                String region = GarbageCollector$.MODULE$.getRegion(args);
                Broadcast hadoopValues = this.spark().sparkContext().broadcast(
                        HadoopUtils$.MODULE$.getHadoopConfigurationValues(hadoopConf, (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"fs.", "lakefs."})),
                        ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Tuple2.class)));
                ConfigMapper configMapper = new ConfigMapper((Broadcast<Tuple2<String, String>[]>)hadoopValues);
                Dataset removed = GarbageCollector$.MODULE$.bulkRemove(configMapper, sweepTargets, storageNSForSdkClient, region, storageType).toDF();
                // collect() is called for its effect of running the removal; its result is unused.
                removed.collect();
            }
            succeeded = true;
        }
        finally {
            boolean haveRunId = new StringOps(Predef$.MODULE$.augmentString(markRunId)).nonEmpty();
            if (haveRunId && shouldMark) {
                this.writeReports(storageNamespace, markRunId, firstSliceName, startTime, cutoffTime.toInstant(), succeeded, (Dataset<Row>)toDelete);
            }
            this.spark().close();
        }
    }

    /**
     * Writes the reports of a run under its run path and prints where they went.
     *
     * <p>The addresses are written twice, as parquet to {@code <run path>/deleted} and as text
     * to {@code <run path>/deleted.text}, followed by the JSON summary produced by
     * {@code writeJsonSummary}.
     *
     * @param storageNamespace namespace used to build the run path
     * @param runID run identifier; the last segment of the run path
     * @param firstSlice value recorded in the summary
     * @param startTime value recorded in the summary
     * @param cutoffTime value recorded in the summary
     * @param success value recorded in the summary
     * @param expiredAddresses addresses to report; also counted for the summary
     */
    public void writeReports(String storageNamespace, String runID, String firstSlice, Instant startTime, Instant cutoffTime, boolean success, Dataset<Row> expiredAddresses) {
        final String reportDst = this.formatRunPath(storageNamespace, runID);
        Predef$.MODULE$.println((Object)("Report for mark_id=" + runID + " path=" + reportDst));

        // Cached because the dataset is consumed three times: two writes and a count.
        final Dataset<Row> cached = expiredAddresses.cache();
        cached.write().parquet(reportDst + "/deleted");
        cached.write().text(reportDst + "/deleted.text");

        final long deletedCount = expiredAddresses.count();
        final String summary = this.writeJsonSummary(reportDst, runID, firstSlice, startTime, cutoffTime, success, deletedCount);
        Predef$.MODULE$.println((Object)("Report summary=" + summary));
    }

    /**
     * Builds the location under which the reports of a run are stored.
     *
     * @param storageNamespace namespace prefix; used as-is, with no separator added
     * @param runID run identifier appended as the last segment
     * @return {@code storageNamespace + "_lakefs/retention/gc/uncommitted/" + runID}
     */
    private String formatRunPath(String storageNamespace, String runID) {
        final String reportsDir = "_lakefs/retention/gc/uncommitted/";
        return storageNamespace + reportsDir + runID;
    }

    /**
     * Loads the addresses recorded by an earlier mark run.
     *
     * <p>Two defects of the previous version are fixed here:
     * <ul>
     *   <li>the parquet data was read from {@code <run path>/summary.json/deleted}, while
     *       {@code writeReports} writes it to {@code <run path>/deleted};</li>
     *   <li>the existence check used {@code FileSystem.get(conf)}, i.e. the default
     *       filesystem, rather than the filesystem of the summary path, unlike
     *       {@code writeJsonSummary}, which resolves it from the path.</li>
     * </ul>
     *
     * @param storageNamespace namespace used to build the run path
     * @param markID identifier of the mark run to read
     * @return the contents of {@code <run path>/deleted}
     * @throws FailedRunException if the run has no {@code summary.json}, or if that summary's
     *         {@code success} field is false
     */
    public Dataset<Row> readMarkedAddresses(String storageNamespace, String markID) {
        String runPath = this.formatRunPath(storageNamespace, markID);
        String reportPath = runPath + "/summary.json";
        Path summaryPath = new Path(reportPath);
        // Resolve the filesystem from the path so non-default schemes are handled.
        FileSystem fs = summaryPath.getFileSystem(this.spark().sparkContext().hadoopConfiguration());
        if (!fs.exists(summaryPath)) {
            throw new FailedRunException(new StringBuilder(25).append("Mark ID (").append(markID).append(") does not exist").toString());
        }
        Dataset<Row> markedRunSummary = this.spark().read().json(reportPath);
        if (!BoxesRunTime.unboxToBoolean((Object)((Row)markedRunSummary.first()).getAs("success"))) {
            throw new FailedRunException(new StringBuilder(35).append("Provided mark (").append(markID).append(") is of a failed run").toString());
        }
        // Same location writeReports uses for the parquet output.
        return this.spark().read().parquet(runPath + "/deleted");
    }

    /**
     * Renders the run summary as compact JSON and writes it to {@code <dst>/summary.json}.
     *
     * <p>The JSON object has the fields {@code run_id}, {@code success}, {@code first_slice},
     * {@code start_time}, {@code cutoff_time} and {@code num_deleted_objects}; both instants
     * are formatted with {@code DateTimeFormatter.ISO_INSTANT}.
     *
     * <p>The text is written as UTF-8 bytes. The previous {@code writeBytes(String)} call keeps
     * only the low byte of each character, which corrupts any non-ASCII character in the
     * summary; for ASCII-only summaries the bytes written are unchanged.
     *
     * @param dst directory-like location that receives {@code summary.json}
     * @param runID value of {@code run_id}
     * @param firstSlice value of {@code first_slice}
     * @param startTime value of {@code start_time}
     * @param cutoffTime value of {@code cutoff_time}
     * @param success value of {@code success}
     * @param numDeletedObjects value of {@code num_deleted_objects}
     * @return the JSON text that was written
     */
    public String writeJsonSummary(String dst, String runID, String firstSlice, Instant startTime, Instant cutoffTime, boolean success, long numDeletedObjects) {
        Path dstPath = new Path(dst + "/summary.json");
        FileSystem dstFS = dstPath.getFileSystem(this.spark().sparkContext().hadoopConfiguration());

        Tuple2[] fields = new Tuple2[]{
            new Tuple2<String, JsonAST.JValue>("run_id", JsonDSL$.MODULE$.string2jvalue(runID)),
            new Tuple2<String, JsonAST.JValue>("success", JsonDSL$.MODULE$.boolean2jvalue(success)),
            new Tuple2<String, JsonAST.JValue>("first_slice", JsonDSL$.MODULE$.string2jvalue(firstSlice)),
            new Tuple2<String, JsonAST.JValue>("start_time", JsonDSL$.MODULE$.string2jvalue(DateTimeFormatter.ISO_INSTANT.format(startTime))),
            new Tuple2<String, JsonAST.JValue>("cutoff_time", JsonDSL$.MODULE$.string2jvalue(DateTimeFormatter.ISO_INSTANT.format(cutoffTime))),
            new Tuple2<String, JsonAST.JValue>("num_deleted_objects", JsonDSL$.MODULE$.long2jvalue(numDeletedObjects))
        };
        JsonAST.JObject jsonSummary = package$.MODULE$.JObject().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])fields));

        Formats formats = JsonMethods$.MODULE$.render$default$2((JsonAST.JValue)jsonSummary);
        String summary = JsonMethods$.MODULE$.compact(JsonMethods$.MODULE$.render((JsonAST.JValue)jsonSummary, formats));

        try (FSDataOutputStream stream = dstFS.create(dstPath)) {
            // Fully qualified to avoid needing a new import for this one use.
            stream.write(summary.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        return summary;
    }

    /**
     * Creates the singleton, publishes it through {@code MODULE$} and initializes the exclusion
     * list to the single entry {@code "dummy"}.
     *
     * <p>The decompiler rendered Scala's list-cons class as {@code .colon.colon}, which is not
     * valid Java; its JVM class name is {@code scala.collection.immutable.$colon$colon}.
     */
    private UncommittedGarbageCollector$() {
        MODULE$ = this;
        // One-element immutable list: "dummy" prepended to the empty list.
        this.excludeFromOldData = (Seq)new $colon$colon((Object)"dummy", (List)Nil$.MODULE$);
    }
}

