/*
 * Decompiled with CFR 0.152.
 */
package io.treeverse.gc;

import io.treeverse.clients.APIConfigurations;
import io.treeverse.clients.ApiClient;
import io.treeverse.clients.ApiClient$;
import io.treeverse.clients.BulkRemover;
import io.treeverse.clients.BulkRemoverFactory$;
import io.treeverse.clients.ConfigMapper;
import io.treeverse.clients.GarbageCollector$;
import io.treeverse.clients.HadoopUtils$;
import io.treeverse.clients.LakeFSContext$;
import io.treeverse.clients.StorageClient;
import io.treeverse.clients.StorageClientType$;
import io.treeverse.clients.StorageClients$;
import io.treeverse.gc.APIUncommittedAddressLister;
import io.treeverse.gc.ActiveCommitsAddressLister;
import io.treeverse.gc.CommittedAddressLister;
import io.treeverse.gc.FailedRunException;
import io.treeverse.gc.NaiveCommittedAddressLister;
import io.treeverse.gc.NaiveDataLister;
import io.treeverse.gc.ParallelDataLister;
import io.treeverse.gc.ParameterValidationException;
import io.treeverse.gc.UncommittedGCRunInfo;
import java.io.Serializable;
import java.net.URI;
import java.time.Clock;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.functions$;
import org.json4s.Formats;
import org.json4s.JsonAST;
import org.json4s.JsonDSL$;
import org.json4s.native.JsonMethods$;
import org.json4s.package$;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

public final class GarbageCollection$ {
    public static GarbageCollection$ MODULE$;
    private SparkSession spark;
    private final Seq<String> excludeFromOldData;
    private volatile boolean bitmap$0;

    static {
        new GarbageCollection$();
    }

    public final String UNIFIED_GC_SOURCE_NAME() {
        return "unified_gc";
    }

    private final String DATA_PREFIX() {
        return "data/";
    }

    private SparkSession spark$lzycompute() {
        GarbageCollection$ garbageCollection$ = this;
        synchronized (garbageCollection$) {
            if (!this.bitmap$0) {
                this.spark = SparkSession$.MODULE$.builder().appName("GarbageCollection").getOrCreate();
                this.bitmap$0 = true;
            }
        }
        return this.spark;
    }

    public SparkSession spark() {
        return !this.bitmap$0 ? this.spark$lzycompute() : this.spark;
    }

    private Seq<String> excludeFromOldData() {
        return this.excludeFromOldData;
    }

    public Dataset<Row> listObjects(String storageNamespace, Date before) {
        SparkContext sc = this.spark().sparkContext();
        Path oldDataPath = new Path(storageNamespace);
        Path dataPath = new Path(storageNamespace, "data/");
        ConfigMapper configMapper = new ConfigMapper((Broadcast<Tuple2<String, String>[]>)sc.broadcast(HadoopUtils$.MODULE$.getHadoopConfigurationValues(sc.hadoopConfiguration(), (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"fs.", "lakefs."})), ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Tuple2.class))));
        Dataset dataDF = new ParallelDataLister().listData(configMapper, dataPath);
        dataDF = dataDF.withColumn("address", functions$.MODULE$.concat((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.lit((Object)"data/"), functions$.MODULE$.col("base_address")})));
        Dataset oldDataDF = new NaiveDataLister().listData(configMapper, oldDataPath).withColumn("address", functions$.MODULE$.col("base_address")).filter(functions$.MODULE$.col("address").isin(this.excludeFromOldData()).unary_$bang());
        dataDF = dataDF.union(oldDataDF).filter(functions$.MODULE$.col("last_modified").$less((Object)BoxesRunTime.boxToLong((long)before.getTime())));
        return dataDF;
    }

    /*
     * WARNING - void declaration
     */
    public String getFirstSlice(Dataset<Row> dataDF, String repo) {
        void var3_3;
        block0: {
            String firstSlice = "";
            Dataset slices = dataDF.filter(functions$.MODULE$.col("address").startsWith("data/").$amp$amp((Object)functions$.MODULE$.col("base_address").startsWith(repo).unary_$bang()));
            if (slices.isEmpty()) break block0;
            firstSlice = ((String)((Row)slices.first()).getAs("base_address")).split("/")[0];
        }
        return var3_3;
    }

    public void validateRunModeConfigs(boolean shouldMark, boolean shouldSweep, String markID) {
        if (!shouldMark && !shouldSweep) {
            throw new ParameterValidationException("Nothing to do, must specify at least one of mark, sweep. Exiting...");
        }
        if (!shouldMark && markID.isEmpty()) {
            throw new ParameterValidationException(new StringBuilder(60).append("Please provide a mark ID (").append(LakeFSContext$.MODULE$.LAKEFS_CONF_GC_MARK_ID()).append(") for sweep-only mode. Exiting...\n").toString());
        }
        if (shouldMark && new StringOps(Predef$.MODULE$.augmentString(markID)).nonEmpty()) {
            throw new ParameterValidationException("Can't provide mark ID for mark mode. Exiting...");
        }
    }

    public void main(String[] args) {
        Configuration hc = this.spark().sparkContext().hadoopConfiguration();
        String region = args.length == 2 ? args[1] : "";
        String repo = args[0];
        this.run(region, repo, this.run$default$3(), this.run$default$4(), this.run$default$5());
    }

    public void run(String region, String repo, boolean uncommittedOnly, String sourceName, String outputPrefix) {
        String runID = "";
        String firstSlice = "";
        boolean success = false;
        Dataset addressesToDelete = this.spark().emptyDataFrame().withColumn("address", functions$.MODULE$.lit((Object)""));
        Configuration hc = this.spark().sparkContext().hadoopConfiguration();
        String apiURL = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_URL_KEY());
        String accessKey = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_ACCESS_KEY_KEY());
        String secretKey = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_SECRET_KEY_KEY());
        String connectionTimeout = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_CONNECTION_TIMEOUT_SEC_KEY());
        String readTimeout = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_API_READ_TIMEOUT_SEC_KEY());
        String minAgeStr = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_DEBUG_GC_UNCOMMITTED_MIN_AGE_SECONDS_KEY());
        int minAgeSeconds = minAgeStr != null && new StringOps(Predef$.MODULE$.augmentString(minAgeStr)).nonEmpty() && new StringOps(Predef$.MODULE$.augmentString(minAgeStr)).toInt() > 0 ? new StringOps(Predef$.MODULE$.augmentString(minAgeStr)).toInt() : LakeFSContext$.MODULE$.DEFAULT_GC_UNCOMMITTED_MIN_AGE_SECONDS();
        Date cutoffTime = DateUtils.addSeconds((Date)new Date(), (int)(-minAgeSeconds));
        Instant startTime = Clock.systemUTC().instant();
        boolean shouldMark = hc.getBoolean(LakeFSContext$.MODULE$.LAKEFS_CONF_GC_DO_MARK(), true);
        boolean shouldSweep = hc.getBoolean(LakeFSContext$.MODULE$.LAKEFS_CONF_GC_DO_SWEEP(), true);
        String markID = hc.get(LakeFSContext$.MODULE$.LAKEFS_CONF_GC_MARK_ID(), "");
        this.validateRunModeConfigs(shouldMark, shouldSweep, markID);
        APIConfigurations apiConf = new APIConfigurations(apiURL, accessKey, secretKey, connectionTimeout, readTimeout, sourceName);
        ApiClient apiClient = ApiClient$.MODULE$.get(apiConf);
        String storageType = apiClient.getBlockstoreType();
        String storageNamespace = apiClient.getStorageNamespace(repo, StorageClientType$.MODULE$.HadoopFS());
        if (!storageNamespace.endsWith("/")) {
            storageNamespace = new StringBuilder(1).append(storageNamespace).append("/").toString();
        }
        try {
            BoxedUnit boxedUnit;
            if (shouldMark) {
                URI uncommittedLocation;
                Path uncommittedPath;
                FileSystem fs;
                Dataset<Row> dataDF = this.listObjects(storageNamespace, cutoffTime);
                firstSlice = this.getFirstSlice(dataDF, repo);
                UncommittedGCRunInfo uncommittedGCRunInfo = new APIUncommittedAddressLister(apiClient).listUncommittedAddresses(this.spark(), repo);
                Dataset uncommittedDF = this.spark().emptyDataFrame().withColumn("physical_address", functions$.MODULE$.lit((Object)""));
                String string = uncommittedGCRunInfo.uncommittedLocation();
                String string2 = "";
                if ((string == null ? string2 != null : !string.equals(string2)) && (fs = (uncommittedPath = new Path(uncommittedLocation = ApiClient$.MODULE$.translateURI(new URI(uncommittedGCRunInfo.uncommittedLocation()), storageType))).getFileSystem(hc)).exists(uncommittedPath)) {
                    uncommittedDF = this.spark().read().parquet(uncommittedLocation.toString());
                }
                uncommittedDF = uncommittedDF.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{uncommittedDF.apply("physical_address").as("address")}));
                uncommittedDF = uncommittedDF.repartition((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{uncommittedDF.col("address")}));
                runID = uncommittedGCRunInfo.runID();
                String clientStorageNamespace = apiClient.getStorageNamespace(repo, StorageClientType$.MODULE$.SDKClient());
                CommittedAddressLister committedLister = uncommittedOnly ? new NaiveCommittedAddressLister() : new ActiveCommitsAddressLister(apiClient, repo, storageType);
                Dataset<Row> committedDF = committedLister.listCommittedAddresses(this.spark(), storageNamespace, clientStorageNamespace);
                addressesToDelete = dataDF.select("address", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).repartition((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{dataDF.col("address")})).except(committedDF).except(uncommittedDF).cache();
                committedDF.unpersist();
                boxedUnit = uncommittedDF.unpersist();
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            if (shouldSweep) {
                Dataset dataset;
                if (shouldMark) {
                    Predef$.MODULE$.println((Object)new StringBuilder(39).append("deleting marked addresses from run ID: ").append(runID).toString());
                    dataset = addressesToDelete;
                } else {
                    Predef$.MODULE$.println((Object)new StringBuilder(40).append("deleting marked addresses from mark ID: ").append(markID).toString());
                    dataset = this.readMarkedAddresses(storageNamespace, markID, outputPrefix);
                }
                Dataset markedAddresses = dataset;
                String storageNSForSdkClient = GarbageCollector$.MODULE$.getStorageNSForSdkClient(apiClient, repo);
                Broadcast hcValues = this.spark().sparkContext().broadcast(HadoopUtils$.MODULE$.getHadoopConfigurationValues(hc, (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"fs.", "lakefs."})), ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Tuple2.class)));
                ConfigMapper configMapper = new ConfigMapper((Broadcast<Tuple2<String, String>[]>)hcValues);
                this.bulkRemove(configMapper, markedAddresses, storageNSForSdkClient, region, storageType);
            }
            success = true;
        }
        catch (Throwable throwable) {
            if (new StringOps(Predef$.MODULE$.augmentString(runID)).nonEmpty() && shouldMark) {
                this.writeReports(storageNamespace, runID, firstSlice, startTime, cutoffTime.toInstant(), success, (Dataset<Row>)addressesToDelete, outputPrefix);
            }
            this.spark().close();
            throw throwable;
        }
        if (new StringOps(Predef$.MODULE$.augmentString(runID)).nonEmpty() && shouldMark) {
            this.writeReports(storageNamespace, runID, firstSlice, startTime, cutoffTime.toInstant(), success, addressesToDelete, outputPrefix);
        }
        this.spark().close();
    }

    public boolean run$default$3() {
        return false;
    }

    public String run$default$4() {
        return "unified_gc";
    }

    public String run$default$5() {
        return "unified";
    }

    public void bulkRemove(ConfigMapper configMapper, Dataset<Row> readKeysDF, String storageNamespace, String region, String storageType) {
        java.util.Iterator it = readKeysDF.select("address", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.getString(0), this.spark().implicits().newStringEncoder()).toLocalIterator();
        while (it.hasNext()) {
            StorageClient storageClient = StorageClients$.MODULE$.apply(storageType, configMapper, storageNamespace, region);
            BulkRemover bulkRemover = BulkRemoverFactory$.MODULE$.apply(storageClient, storageNamespace);
            int chunkSize = bulkRemover.getMaxBulkSize();
            Seq chunk = ((Iterator)JavaConverters$.MODULE$.asScalaIteratorConverter(it).asScala()).take(chunkSize).toSeq();
            bulkRemover.deleteObjects((Seq<String>)chunk, storageNamespace);
        }
    }

    public void writeReports(String storageNamespace, String runID, String firstSlice, Instant startTime, Instant cutoffTime, boolean success, Dataset<Row> expiredAddresses, String outputPrefix) {
        String reportDst = this.formatRunPath(storageNamespace, runID, outputPrefix);
        Predef$.MODULE$.println((Object)new StringBuilder(25).append("Report for mark_id=").append(runID).append(" path=").append(reportDst).toString());
        expiredAddresses.write().parquet(new StringBuilder(8).append(reportDst).append("/deleted").toString());
        expiredAddresses.write().text(new StringBuilder(13).append(reportDst).append("/deleted.text").toString());
        String summary = this.writeJsonSummary(reportDst, runID, firstSlice, startTime, cutoffTime, success, expiredAddresses.count());
        Predef$.MODULE$.println((Object)new StringBuilder(15).append("Report summary=").append(summary).toString());
    }

    public String writeReports$default$8() {
        return "unified";
    }

    private String formatRunPath(String storageNamespace, String runID, String outputPrefix) {
        return new StringBuilder(22).append(storageNamespace).append("_lakefs/retention/gc/").append(outputPrefix).append("/").append(runID).toString();
    }

    public Dataset<Row> readMarkedAddresses(String storageNamespace, String markID, String outputPrefix) {
        Dataset dataset;
        Path reportPath = new Path(new StringBuilder(13).append(this.formatRunPath(storageNamespace, markID, outputPrefix)).append("/summary.json").toString());
        FileSystem fs = reportPath.getFileSystem(this.spark().sparkContext().hadoopConfiguration());
        if (!fs.exists(reportPath)) {
            throw new FailedRunException(new StringBuilder(25).append("Mark ID (").append(markID).append(") does not exist").toString());
        }
        Dataset markedRunSummary = this.spark().read().json(reportPath.toString());
        if (!BoxesRunTime.unboxToBoolean((Object)((Row)markedRunSummary.first()).getAs("success"))) {
            throw new FailedRunException(new StringBuilder(35).append("Provided mark (").append(markID).append(") is of a failed run").toString());
        }
        Path deletedPath = new Path(new StringBuilder(8).append(this.formatRunPath(storageNamespace, markID, outputPrefix)).append("/deleted").toString());
        if (!fs.exists(deletedPath)) {
            Predef$.MODULE$.println((Object)new StringBuilder(41).append("Mark ID (").append(markID).append(") does not contain deleted files").toString());
            dataset = this.spark().emptyDataFrame().withColumn("address", functions$.MODULE$.lit((Object)""));
        } else {
            dataset = this.spark().read().parquet(deletedPath.toString());
        }
        return dataset;
    }

    public String readMarkedAddresses$default$3() {
        return "unified";
    }

    public String writeJsonSummary(String dst, String runID, String firstSlice, Instant startTime, Instant cutoffTime, boolean success, long numDeletedObjects) {
        JsonAST.JObject jsonSummary;
        Path dstPath = new Path(new StringBuilder(13).append(dst).append("/summary.json").toString());
        FileSystem dstFS = dstPath.getFileSystem(this.spark().sparkContext().hadoopConfiguration());
        JsonAST.JObject x$1 = jsonSummary = package$.MODULE$.JObject().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"run_id"), (Object)JsonDSL$.MODULE$.string2jvalue(runID)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"success"), (Object)JsonDSL$.MODULE$.boolean2jvalue(success)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"first_slice"), (Object)JsonDSL$.MODULE$.string2jvalue(firstSlice)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"start_time"), (Object)JsonDSL$.MODULE$.string2jvalue(DateTimeFormatter.ISO_INSTANT.format(startTime))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cutoff_time"), (Object)JsonDSL$.MODULE$.string2jvalue(DateTimeFormatter.ISO_INSTANT.format(cutoffTime))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"num_deleted_objects"), (Object)JsonDSL$.MODULE$.long2jvalue(numDeletedObjects))}));
        Formats x$2 = JsonMethods$.MODULE$.render$default$2((JsonAST.JValue)x$1);
        String summary = JsonMethods$.MODULE$.compact(JsonMethods$.MODULE$.render((JsonAST.JValue)x$1, x$2));
        try (FSDataOutputStream stream = dstFS.create(dstPath);){
            stream.writeBytes(summary);
        }
        return summary;
    }

    private GarbageCollection$() {
        MODULE$ = this;
        this.excludeFromOldData = (Seq)new .colon.colon((Object)"dummy", (List)Nil$.MODULE$);
    }
}

