/*
 * Decompiled with CFR 0.152.
 */
package io.treeverse.clients;

import io.treeverse.clients.ApiClient;
import io.treeverse.clients.ExportStatus;
import io.treeverse.clients.Exporter$;
import io.treeverse.clients.Handler;
import io.treeverse.clients.KeyFilter;
import io.treeverse.clients.LakeFSContext$;
import io.treeverse.clients.LakeFSJobParams$;
import io.treeverse.clients.SparkFilter;
import io.treeverse.clients.StorageClientType$;
import io.treeverse.clients.URLResolver$;
import java.io.Serializable;
import java.net.URI;
import java.time.Clock;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.spark.SerializableWritable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.immutable.Stream;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.NonLocalReturnControl;
import scala.runtime.RichChar$;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;
import scala.util.Random$;

@ScalaSignature(bytes="\u0006\u0001\u0005ed\u0001B\u000f\u001f\u0001\u0015B\u0001\u0002\f\u0001\u0003\u0002\u0003\u0006I!\f\u0005\tq\u0001\u0011\t\u0011)A\u0005s!AQ\b\u0001B\u0001B\u0003%a\b\u0003\u0005B\u0001\t\u0005\t\u0015!\u0003C\u0011!i\u0005A!A!\u0002\u0013\u0011\u0005\u0002\u0003(\u0001\u0005\u0003\u0005\u000b\u0011B(\t\u000bI\u0003A\u0011A*\t\u000bI\u0003A\u0011A.\t\u000b\u0001\u0004A\u0011A1\t\u000b\u001d\u0004A\u0011\u00015\t\u000f-\u0004!\u0019!C\u0007Y\"1q\u000e\u0001Q\u0001\u000e5DQ\u0001\u001d\u0001\u0005\nEDq!a\u0007\u0001\t\u0003\ti\u0002C\u0005\u0002&\u0001\u0011\r\u0011\"\u0004\u0002(!A\u0011q\u0006\u0001!\u0002\u001b\tI\u0003C\u0004\u00022\u0001!I!a\r\t\u000f\u0005}\u0002\u0001\"\u0003\u0002B!I\u0011Q\n\u0001C\u0002\u00135\u0011q\n\u0005\t\u0003+\u0002\u0001\u0015!\u0004\u0002R!9\u0011q\u000b\u0001\u0005\n\u0005esaBA.=!\u0005\u0011Q\f\u0004\u0007;yA\t!a\u0018\t\rI;B\u0011AA1\u0011%\t\u0019g\u0006b\u0001\n\u000b\t)\u0007\u0003\u0005\u0002l]\u0001\u000bQBA4\u0011%\tig\u0006b\u0001\n\u000b\ty\u0007\u0003\u0005\u0002x]\u0001\u000bQBA9\u0005!)\u0005\u0010]8si\u0016\u0014(BA\u0010!\u0003\u001d\u0019G.[3oiNT!!\t\u0012\u0002\u0013Q\u0014X-\u001a<feN,'\"A\u0012\u0002\u0005%|7\u0001A\n\u0003\u0001\u0019\u0002\"a\n\u0016\u000e\u0003!R\u0011!K\u0001\u0006g\u000e\fG.Y\u0005\u0003W!\u0012a!\u00118z%\u00164\u0017!B:qCJ\\\u0007C\u0001\u00187\u001b\u0005y#B\u0001\u00192\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003YIR!a\r\u001b\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005)\u0014aA8sO&\u0011qg\f\u0002\r'B\f'o[*fgNLwN\\\u0001\nCBL7\t\\5f]R\u0004\"AO\u001e\u000e\u0003yI!\u0001\u0010\u0010\u0003\u0013\u0005\u0003\u0018n\u00117jK:$\u0018A\u00024jYR,'\u000f\u0005\u0002;\u007f%\u0011\u0001I\b\u0002\n\u0017\u0016Lh)\u001b7uKJ\f\u0001B]3q_:\u000bW.\u001a\t\u0003\u0007*s!\u0001\u0012%\u0011\u0005\u0015CS\"\u0001$\u000b\u0005\u001d#\u0013A\u0002\u001fs_>$h(\u0003\u0002JQ\u00051\u0001K]3eK\u001aL!a\u0013'\u0003\rM#(/\u001b8h\u0015\tI\u0005&A\u0004egR\u0014vn\u001c;\u0002\u001
7A\f'/\u00197mK2L7/\u001c\t\u0003OAK!!\u0015\u0015\u0003\u0007%sG/\u0001\u0004=S:LGO\u0010\u000b\b)V3v\u000bW-[!\tQ\u0004\u0001C\u0003-\u000f\u0001\u0007Q\u0006C\u00039\u000f\u0001\u0007\u0011\bC\u0003>\u000f\u0001\u0007a\bC\u0003B\u000f\u0001\u0007!\tC\u0003N\u000f\u0001\u0007!\tC\u0003O\u000f\u0001\u0007q\nF\u0003U9vsv\fC\u0003-\u0011\u0001\u0007Q\u0006C\u00039\u0011\u0001\u0007\u0011\bC\u0003B\u0011\u0001\u0007!\tC\u0003N\u0011\u0001\u0007!)A\nfqB|'\u000f^!mY\u001a\u0013x.\u001c\"sC:\u001c\u0007\u000e\u0006\u0002cKB\u0011qeY\u0005\u0003I\"\u0012A!\u00168ji\")a-\u0003a\u0001\u0005\u00061!M]1oG\"\f1#\u001a=q_J$\u0018\t\u001c7Ge>l7i\\7nSR$\"AY5\t\u000b)T\u0001\u0019\u0001\"\u0002\u0011\r|W.\\5u\u0013\u0012\u000bq\"\\1y\u0019><w-\u001a3FeJ|'o]\u000b\u0002[>\ta.\b\u0002(!\u0005\u0001R.\u0019=M_\u001e<W\rZ#se>\u00148\u000fI\u0001\u0007Kb\u0004xN\u001d;\u0015\rI,x/_>}!\t93/\u0003\u0002uQ\t9!i\\8mK\u0006t\u0007\"\u0002<\u000e\u0001\u0004y\u0015!\u0002:pk:$\u0007\"\u0002=\u000e\u0001\u0004\u0011\u0015A\u00018t\u0011\u0015QX\u00021\u0001C\u0003\r\u0011X\r\u001c\u0005\u0006U6\u0001\rA\u0011\u0005\u0006{6\u0001\rA`\u0001\nC\u000e$\u0018n\u001c8t\t\u001a\u00032a`A\u000b\u001d\u0011\t\t!!\u0005\u000f\t\u0005\r\u0011q\u0002\b\u0005\u0003\u000b\tiA\u0004\u0003\u0002\b\u0005-abA#\u0002\n%\tQ'\u0003\u00024i%\u0011AFM\u0005\u0003aEJ1!a\u00050\u0003\u001d\u0001\u0018mY6bO\u0016LA!a\u0006\u0002\u001a\tIA)\u0019;b\rJ\fW.\u001a\u0006\u0004\u0003'y\u0013AC3ya>\u0014HO\u0012:p[R)!-a\b\u0002\"!)aM\u0004a\u0001\u0005\"1\u00111\u0005\bA\u0002\t\u000bA\u0002\u001d:fm\u000e{W.\\5u\u0013\u0012\u000b!b];dG\u0016\u001c8/T:h+\t\tIc\u0004\u0002\u0002,\u0005\u0012\u0011QF\u0001\u001f\u000bb\u0004xN\u001d;!G>l\u0007\u000f\\3uK\u0012\u00043/^2dKN\u001ch-\u001e7ms\u0006\n1b];dG\u0016\u001c8/T:hA\u0005a\u0011m\u0019;P]\u0006\u001bG/[8ogRI!-!\u000e\u00028\u0005m\u0012Q\b\u0005\u0006qF\u0001\rA\u0011\u0005\u0007\u0003s\t\u0002\u0019\u0001\"\u0002\u0007\u0011\u001cH\u000fC\u0003k#\u0001\u0007!\tC\u0003~#\
u0001\u0007a0\u0001\txe&$XmU;n[\u0006\u0014\u0018PR5mKR9!-a\u0011\u0002H\u0005%\u0003BBA#%\u0001\u0007!/A\u0004tk\u000e\u001cWm]:\t\u000b)\u0014\u0002\u0019\u0001\"\t\r\u0005-#\u00031\u0001C\u0003\u001d\u0019wN\u001c;f]R\f\u0011\u0002\u001d:fM&DH*\u001a8\u0016\u0005\u0005EsBAA*;\u0005A\u0011A\u00039sK\u001aL\u0007\u0010T3oA\u0005Q!/\u00198e!J,g-\u001b=\u0015\u0003\t\u000b\u0001\"\u0012=q_J$XM\u001d\t\u0003u]\u0019\"a\u0006\u0014\u0015\u0005\u0005u\u0013A\u00053fM\u0006,H\u000e\u001e)be\u0006dG.\u001a7jg6,\"!a\u001a\u0010\u0005\u0005%T$\u0001\u0006\u0002'\u0011,g-Y;miB\u000b'/\u00197mK2L7/\u001c\u0011\u0002)\u0015C\u0006k\u0014*U\u000bJ{6kT+S\u0007\u0016{f*Q'F+\t\t\th\u0004\u0002\u0002t\u0005\u0012\u0011QO\u0001\tKb\u0004xN\u001d;fe\u0006)R\t\u0017)P%R+%kX*P+J\u001bUi\u0018(B\u001b\u0016\u0003\u0003")
public class Exporter {
    // Spark session used to build DataFrames, run SQL, and obtain the Hadoop configuration.
    private final SparkSession spark;
    // API client queried for a branch's HEAD commit and the repository's storage namespace.
    private final ApiClient apiClient;
    // Key filter handed to every Handler; its rounds() value is the number of export rounds.
    private final KeyFilter filter;
    // Repository name passed to the API client and to the job parameters.
    private final String repoName;
    // Destination root: passed to each Handler and used to resolve the summary file location.
    private final String dstRoot;
    // Number of threads in the fixed pool created for each partition during an export round.
    private final int parallelism;

    /**
     * Static forwarder to the {@code EXPORTER_SOURCE_NAME} member of the Scala companion object.
     *
     * @return the value returned by {@code Exporter$.MODULE$.EXPORTER_SOURCE_NAME()}
     */
    public static String EXPORTER_SOURCE_NAME() {
        final Exporter$ companion = Exporter$.MODULE$;
        return companion.EXPORTER_SOURCE_NAME();
    }

    /**
     * Static forwarder to the {@code defaultParallelism} member of the Scala companion object.
     *
     * @return the value returned by {@code Exporter$.MODULE$.defaultParallelism()}
     */
    public static int defaultParallelism() {
        final Exporter$ companion = Exporter$.MODULE$;
        return companion.defaultParallelism();
    }

    /**
     * Looks up the HEAD commit of {@code branch} in this exporter's repository and
     * delegates to {@link #exportAllFromCommit(String)} with that commit ID.
     *
     * @param branch name of the branch whose HEAD commit is exported
     */
    public void exportAllFromBranch(String branch) {
        this.exportAllFromCommit(this.apiClient.getBranchHEADCommit(this.repoName, branch));
    }

    /**
     * Exports every row of the given commit's DataFrame as a {@code 'copy'} action.
     *
     * <p>The commit's DataFrame is registered as a temp view under a random name, and a
     * query tagging each row with {@code 'copy'} is passed to {@code actOnActions}.
     *
     * @param commitID ID of the commit to export
     */
    public void exportAllFromCommit(String commitID) {
        String storageNamespace = this.apiClient.getStorageNamespace(this.repoName, StorageClientType$.MODULE$.HadoopFS());
        Dataset<Row> commitDF = LakeFSContext$.MODULE$.newDF(this.spark, LakeFSJobParams$.MODULE$.forCommit(this.repoName, commitID, "exporter"));

        // Random view name so this export does not reuse a fixed temp-view name.
        String viewName = this.randPrefix() + "_commit";
        commitDF.createOrReplaceTempView(viewName);

        Dataset<Row> copyActions = this.spark.sql("SELECT 'copy' as action, * FROM " + viewName);
        this.actOnActions(storageNamespace, this.dstRoot, commitID, copyActions);
    }

    /**
     * Constant accessor; the same value (10000) is the numerator of the error-sampling
     * fraction computed in {@code export}.
     *
     * @return 10000
     */
    private final int maxLoggedErrors() {
        final int limit = 10_000;
        return limit;
    }

    /**
     * Runs one export round over {@code actionsDF} and reports whether every action succeeded.
     *
     * <p>For each partition a fixed thread pool of {@code parallelism} threads is created, one
     * {@link Handler} per row is submitted to it, and the resulting {@link ExportStatus} values
     * are collected. Statuses whose {@code success()} is false are kept as errors. If any exist,
     * a sample of their messages, joined by newlines, is written to a FAILURE summary file.
     *
     * @param round     round number forwarded to each Handler
     * @param ns        storage namespace forwarded to each Handler
     * @param rel       destination root forwarded to each Handler
     * @param commitID  commit ID used in the summary file name
     * @param actionsDF rows describing the actions to perform
     * @return {@code true} if no action failed, {@code false} otherwise
     */
    private boolean export(int round, String ns, String rel, String commitID, Dataset<Row> actionsDF) {
        // Compiler-generated TypeCreator that supplies the ExportStatus type to the product encoder.
        public final class Io_treeverse_clients_Exporter$$typecreator4$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $m.staticClass("io.treeverse.clients.ExportStatus").asType().toTypeConstructor();
            }

            public Io_treeverse_clients_Exporter$$typecreator4$1(Exporter $outer) {
            }
        }
        Configuration hadoopConf = this.spark.sparkContext().hadoopConfiguration();
        // Wrapped so the Hadoop configuration can be shipped inside the partition closure.
        SerializableWritable serializedConf = new SerializableWritable((Writable)hadoopConf);
        // Copied to locals; presumably so the closure below does not capture `this`
        // (Exporter itself does not implement Serializable).
        KeyFilter f = this.filter;
        int par = this.parallelism;
        JavaUniverse $u = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(Exporter.class.getClassLoader());
        Dataset errs = actionsDF.mapPartitions((Function1 & Serializable & scala.Serializable)part -> {
            ExecutorService pool = Executors.newFixedThreadPool(par);
            try {
                // One Handler task per row; invokeAll waits for all of them, then each future is unwrapped.
                return ((IterableLike)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(pool.invokeAll((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)part.map((Function1 & Serializable & scala.Serializable)row -> new Handler(f, round, ns, rel, (SerializableWritable<Configuration>)serializedConf, (Row)row)).toList()).asJava())).asScala()).map((Function1 & Serializable & scala.Serializable)fut -> (ExportStatus)fut.get(), Buffer$.MODULE$.canBuildFrom())).iterator();
            } finally {
                // Always release the pool's threads, including when a task or invokeAll throws.
                pool.shutdown();
            }
        }, this.spark.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Io_treeverse_clients_Exporter$$typecreator4$1(null)))).filter((Function1 & Serializable & scala.Serializable)status -> BoxesRunTime.boxToBoolean((boolean)Exporter.$anonfun$export$4(status)));
        // NOTE(review): errs is not cached, so isEmpty(), count() and the reduce below presumably
        // each re-evaluate the partition closure — confirm that re-running the handlers is acceptable.
        if (errs.isEmpty()) {
            return true;
        }
        long count = errs.count();
        // Sample so that roughly maxLoggedErrors() messages at most end up in the summary file.
        double sampleFraction = scala.math.package$.MODULE$.min(1.0, (double)this.maxLoggedErrors() / (double)count);
        this.writeSummaryFile(false, commitID, (String)errs.sample(sampleFraction).map((Function1 & Serializable & scala.Serializable)s -> s.msg(), this.spark.implicits().newStringEncoder()).reduce((Function2 & Serializable & scala.Serializable)(x$1, x$2) -> new StringBuilder(1).append((String)x$1).append("\n").append((String)x$2).toString()));
        return false;
    }

    /**
     * Exports the difference between a previous commit and the current HEAD of a branch.
     *
     * <p>Both commits are registered as temp views and joined with a FULL OUTER JOIN on
     * {@code key}. Rows are kept when the etags differ or either side is missing. A row
     * whose key is absent from the new commit becomes a {@code 'delete'} action; every
     * other row becomes a {@code 'copy'} action.
     *
     * @param branch       branch whose HEAD commit is the export target
     * @param prevCommitID ID of the commit to diff against
     */
    public void exportFrom(String branch, String prevCommitID) {
        String headCommitID = this.apiClient.getBranchHEADCommit(this.repoName, branch);
        String storageNamespace = this.apiClient.getStorageNamespace(this.repoName, StorageClientType$.MODULE$.HadoopFS());

        Dataset<Row> headDF = LakeFSContext$.MODULE$.newDF(this.spark, LakeFSJobParams$.MODULE$.forCommit(this.repoName, headCommitID, "exporter"));
        Dataset<Row> previousDF = LakeFSContext$.MODULE$.newDF(this.spark, LakeFSJobParams$.MODULE$.forCommit(this.repoName, prevCommitID, "exporter"));

        // Each view gets its own random prefix.
        String newView = this.randPrefix() + "_new_commit";
        String prevView = this.randPrefix() + "_prev_commit";
        headDF.createOrReplaceTempView(newView);
        previousDF.createOrReplaceTempView(prevView);

        String diffQuery = "\n"
            + "    SELECT\n"
            + "      CASE WHEN nkey is null THEN 'delete' ELSE 'copy' END as action,\n"
            + "      CASE WHEN nkey is null THEN pkey ELSE nkey END as key,\n"
            + "      CASE WHEN naddress is null THEN paddress ELSE naddress END as address,\n"
            + "      CASE WHEN netag is null THEN petag ELSE netag END as etag\n"
            + "    FROM\n"
            + "    (SELECT n.key as nkey, n.address as naddress, n.etag as netag,\n"
            + "      p.key as pkey, p.address as paddress, p.etag as petag\n"
            + "      FROM " + newView + " n\n"
            + "      FULL OUTER JOIN " + prevView + " p\n"
            + "      ON n.key = p.key\n"
            + "      WHERE n.etag <> p.etag OR n.etag is null or p.etag is null)\n"
            + "    ";
        Dataset<Row> diffActions = this.spark.sql(diffQuery);
        this.actOnActions(storageNamespace, this.dstRoot, headCommitID, diffActions);
    }

    /**
     * Constant accessor for the text of the summary written after a fully successful export.
     *
     * @return the literal success message
     */
    private final String successMsg() {
        final String message = "Export completed successfully!";
        return message;
    }

    /**
     * Runs export rounds {@code 1..filter.rounds()} over {@code actionsDF}, stopping at the
     * first round that reports failure.
     *
     * <p>A SUCCESS summary file is written only when every round succeeds. A failing round
     * has already written its own FAILURE summary inside {@code export}, so this method
     * just returns. The previous form signalled the early exit with a
     * {@code NonLocalReturnControl} that was always rethrown, leaking a control-flow
     * exception to callers.
     *
     * @param ns        storage namespace forwarded to {@code export}
     * @param dst       destination root forwarded to {@code export}
     * @param commitID  commit ID being exported; used in the summary file name
     * @param actionsDF rows describing the actions to perform
     */
    private void actOnActions(String ns, String dst, String commitID, Dataset<Row> actionsDF) {
        // Read once, matching the single evaluation of the original range bound.
        int rounds = this.filter.rounds();
        // Zero-based counter keeps the loop overflow-safe; round numbers passed on are 1-based.
        for (int completed = 0; completed < rounds; completed++) {
            int round = completed + 1;
            if (!this.export(round, ns, dst, commitID, actionsDF)) {
                return;
            }
        }
        this.writeSummaryFile(true, commitID, this.successMsg());
    }

    /**
     * Writes a summary file named {@code EXPORT_<commitID>_<ISO-8601 UTC instant>_<SUCCESS|FAILURE>}
     * under the destination root.
     *
     * <p>The stream is opened in try-with-resources so it is closed even when the write
     * throws; previously a failed write left it open.
     *
     * @param success  selects the {@code SUCCESS} or {@code FAILURE} file-name suffix
     * @param commitID commit ID embedded in the file name
     * @param content  text written to the file
     */
    private void writeSummaryFile(boolean success, String commitID, String content) {
        String suffix = success ? "SUCCESS" : "FAILURE";
        String time = DateTimeFormatter.ISO_INSTANT.format(Clock.systemUTC().instant());
        String fileName = "EXPORT_" + commitID + "_" + time + "_" + suffix;
        Path dstPath = URLResolver$.MODULE$.resolveURL(new URI(this.dstRoot), fileName);
        FileSystem dstFS = dstPath.getFileSystem(this.spark.sparkContext().hadoopConfiguration());
        try (FSDataOutputStream stream = dstFS.create(dstPath)) {
            // NOTE(review): writeChars emits two bytes per char (DataOutputStream semantics),
            // not UTF-8. Kept as-is so the file format does not change — confirm this is intended.
            stream.writeChars(content);
        }
    }

    /**
     * Constant accessor; the same value (8) is the number of characters taken in
     * {@code randPrefix}.
     *
     * @return 8
     */
    private final int prefixLen() {
        final int length = 8;
        return length;
    }

    /**
     * Builds a random 8-character alphanumeric string whose first character is not a digit.
     *
     * <p>Only leading digits are skipped ({@code dropWhile}); later characters may be digits.
     *
     * @return the random prefix used for temp-view names
     */
    private String randPrefix() {
        Function1 isDigit = (Function1 & Serializable & scala.Serializable)ch -> BoxesRunTime.boxToBoolean((boolean)RichChar$.MODULE$.isDigit$extension(Predef$.MODULE$.charWrapper(BoxesRunTime.unboxToChar((Object)ch))));
        Stream fromFirstNonDigit = Random$.MODULE$.alphanumeric().dropWhile(isDigit);
        return fromFirstNonDigit.take(this.prefixLen()).mkString("");
    }

    /**
     * Synthetic predicate behind the error filter in {@code export}: selects failed statuses.
     *
     * @param status status to test
     * @return {@code true} when {@code status.success()} is false
     */
    public static final /* synthetic */ boolean $anonfun$export$4(ExportStatus status) {
        boolean succeeded = status.success();
        return !succeeded;
    }

    /**
     * Creates an exporter with an explicit key filter and per-partition parallelism.
     *
     * @param spark       Spark session used for DataFrames, SQL and the Hadoop configuration
     * @param apiClient   API client used to resolve commits and the storage namespace
     * @param filter      key filter passed to each Handler; its {@code rounds()} sets the round count
     * @param repoName    repository name
     * @param dstRoot     destination root
     * @param parallelism thread-pool size used for each partition
     */
    public Exporter(SparkSession spark, ApiClient apiClient, KeyFilter filter, String repoName, String dstRoot, int parallelism) {
        // Export target and tuning.
        this.repoName = repoName;
        this.dstRoot = dstRoot;
        this.parallelism = parallelism;
        // Collaborators.
        this.spark = spark;
        this.apiClient = apiClient;
        this.filter = filter;
    }

    /**
     * Convenience constructor: uses a new {@link SparkFilter} and a parallelism of 10.
     *
     * @param spark     Spark session used for DataFrames, SQL and the Hadoop configuration
     * @param apiClient API client used to resolve commits and the storage namespace
     * @param repoName  repository name
     * @param dstRoot   destination root
     */
    public Exporter(SparkSession spark, ApiClient apiClient, String repoName, String dstRoot) {
        this(
            spark,
            apiClient,
            new SparkFilter(),
            repoName,
            dstRoot,
            10
        );
    }
}

