package org.gorpipe.spark.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import gorsat.Commands.CommandParseUtilities;
import gorsat.process.FreemarkerReportBuilder;
import gorsat.process.GenericRunnerFactory;
import gorsat.process.GorSessionCacheManager;
import gorsat.process.PipeOptions;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.gorpipe.gor.clients.LocalFileCacheClient;
import org.gorpipe.gor.model.DriverBackedFileReader;
import org.gorpipe.gor.session.GorContext;
import org.gorpipe.gor.session.ProjectContext;
import org.gorpipe.gor.session.SystemContext;
import org.gorpipe.spark.GeneralSparkQueryHandler;
import org.gorpipe.spark.GorQueryRDD;
import org.gorpipe.spark.GorSparkSession;
import org.gorpipe.spark.SparkOperatorRunner;
import org.gorpipe.spark.platform.GorQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.Base64;

/* loaded from: input_file:org/gorpipe/spark/redis/RedisBatchConsumer.class */
public class RedisBatchConsumer implements VoidFunction2<Dataset<Row>, Long>, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(RedisBatchConsumer.class);
    private static final String DEFAULT_CACHE_DIR = "result_cache";
    GorSparkSession gss;
    SystemContext sysctx;
    MonitorThread mont;
    ExecutorService es;

    public RedisBatchConsumer(SparkSession sparkSession, String str) {
        log.info("Starting RedisBatchConsumer on redisUri " + str);
        this.gss = new GorSparkSession("");
        this.gss.setSparkSession(sparkSession);
        this.gss.redisUri_$eq(str);
        this.sysctx = new SystemContext.Builder().setReportBuilder(new FreemarkerReportBuilder(this.gss)).setRunnerFactory(new GenericRunnerFactory()).setServer(true).setStartTime(System.currentTimeMillis()).build();
        this.es = Executors.newWorkStealingPool(4);
        log.info("Starting monitorthread");
        this.mont = new MonitorThread(str);
        this.es.submit(this.mont);
        log.info("Monitorthread submitted");
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        log.info("Closing RedisBatchConsumer");
        this.mont.stopRunning();
        this.es.shutdown();
    }

    public Future<List<String>> runGorJobs(String str, Set<Integer> set, String[] strArr, String[] strArr2, String[] strArr3, String[] strArr4) {
        String[] strArr5 = new String[set.size()];
        String[] strArr6 = new String[set.size()];
        String[] strArr7 = new String[set.size()];
        String[] strArr8 = new String[set.size()];
        int i = 0;
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            strArr5[i] = strArr[intValue];
            strArr6[i] = strArr2[intValue];
            strArr8[i] = strArr3[intValue].substring(strArr3[intValue].lastIndexOf(58) + 1);
            strArr7[i] = strArr4[intValue];
            i++;
        }
        return new GorQueryRDD(this.gss.sparkSession(), strArr5, strArr6, strArr7, str, DEFAULT_CACHE_DIR, this.gss.getProjectContext() != null ? this.gss.getProjectContext().getGorConfigFile() : null, this.gss.getProjectContext() != null ? this.gss.getProjectContext().getGorAliasFile() : null, strArr8, this.gss.redisUri()).toJavaRDD().collectAsync();
    }

    public Future<List<String>> runSparkJob(String str, String[] strArr, String str2, String str3, String str4) {
        log.info("Running spark job " + str3 + ": " + str2);
        String substring = str3.substring(str3.lastIndexOf(58) + 1);
        String property = System.getProperty("gor.project.config.path", "config/gor_config.txt");
        String property2 = System.getProperty("gor.project.alias.path", "config/gor_standard_aliases.txt");
        Path path = Paths.get(str, new String[0]);
        this.gss.init(new ProjectContext.Builder().setRoot(str).setCacheDir(DEFAULT_CACHE_DIR).setFileReader(new DriverBackedFileReader("", str, (Object[]) null)).setConfigFile(path.resolve(property).toAbsolutePath().normalize().toString()).setAliasFile(path.resolve(property2).toAbsolutePath().normalize().toString()).setQueryHandler(new GeneralSparkQueryHandler(this.gss, this.gss.redisUri())).setFileCache(new LocalFileCacheClient(path.resolve(DEFAULT_CACHE_DIR))).build(), this.sysctx, GorSessionCacheManager.getCache(this.gss.getRequestId()));
        GorContext gorContext = new GorContext(this.gss);
        int indexOf = str2.indexOf(32);
        String str5 = str2.substring(0, indexOf + 1) + "-j " + substring + str2.substring(indexOf);
        String replaceAliases = this.gss.replaceAliases(strArr.length > 0 ? String.join(";", strArr) + ";" + str5 : str5);
        new PipeOptions().parseOptions(new String[]{replaceAliases, "-queryhandler", "spark"});
        return this.es.submit(new SparkGorQuery(gorContext, replaceAliases, str4));
    }

    public Map<String, Future<List<String>>> runJobBatch(List<String[]> list) {
        Optional findFirst = list.stream().map(strArr -> {
            return strArr[2];
        }).findFirst();
        HashMap hashMap = new HashMap();
        if (findFirst.isPresent()) {
            String str = (String) findFirst.get();
            String[] strArr2 = (String[]) list.stream().map(strArr3 -> {
                return strArr3[0];
            }).toArray(i -> {
                return new String[i];
            });
            String[] strArr4 = (String[]) list.stream().map(strArr5 -> {
                return strArr5[1];
            }).toArray(i2 -> {
                return new String[i2];
            });
            String[] strArr6 = (String[]) list.stream().map(strArr7 -> {
                return strArr7[5];
            }).toArray(i3 -> {
                return new String[i3];
            });
            String[] strArr8 = (String[]) list.stream().map(strArr9 -> {
                return strArr9[4];
            }).toArray(i4 -> {
                return new String[i4];
            });
            this.mont.setValue(strArr8, "status", SparkOperatorRunner.SPARKAPPLICATION_RUNNING_STATE);
            TreeSet treeSet = new TreeSet();
            for (int i5 = 0; i5 < strArr2.length; i5++) {
                String str2 = strArr2[i5];
                String str3 = strArr8[i5];
                String[] quoteSafeSplit = CommandParseUtilities.quoteSafeSplit(str2, ';');
                String trim = quoteSafeSplit[quoteSafeSplit.length - 1].trim();
                String upperCase = trim.toUpperCase();
                if (upperCase.startsWith("SELECT ") || upperCase.startsWith("SPARK ") || upperCase.startsWith("GORSPARK ") || upperCase.startsWith("NORSPARK ")) {
                    hashMap.put(str3, runSparkJob(str, (String[]) Arrays.copyOfRange(quoteSafeSplit, 0, quoteSafeSplit.length - 1), trim, str3, strArr6[i5]));
                } else {
                    treeSet.add(Integer.valueOf(i5));
                }
            }
            if (treeSet.size() > 0) {
                hashMap.put((String) treeSet.stream().map(num -> {
                    return strArr8[num.intValue()];
                }).collect(Collectors.joining(",")), runGorJobs(str, treeSet, strArr2, strArr4, strArr8, strArr6));
            }
        }
        return hashMap;
    }

    public void call(Dataset<Row> dataset, Long l) {
        List collectAsList = dataset.collectAsList();
        log.info("Received batch of " + collectAsList.size());
        runJobBatch((List) collectAsList.stream().filter(row -> {
            return row.getString(2).equals("payload");
        }).map(row2 -> {
            String str;
            String string = row2.getString(1);
            String string2 = row2.getString(3);
            try {
                Map map = (Map) new ObjectMapper().readValue(string2.substring(1, string2.length() - 1), Map.class);
                String str2 = (String) map.get("query");
                String str3 = (String) map.get("fingerprint");
                String str4 = (String) map.get("projectRoot");
                String str5 = (String) map.get(GorQuery.REQUEST_ID_FIELD);
                String str6 = new String(Base64.decode(str2));
                String str7 = "result_cache/" + str3 + CommandParseUtilities.getExtensionForQuery(str6, false);
                if (map.containsKey("cachefile") && (str = (String) map.get("cachefile")) != null) {
                    str7 = str;
                }
                return new String[]{str6, str3, str4, str5, string, str7};
            } catch (IOException e) {
                log.error("Error when parsing redis json", e);
                return new String[0];
            }
        }).collect(Collectors.toList())).forEach((str, future) -> {
            this.mont.addJob(str, future);
        });
    }

    public static void main(String[] strArr) {
        String str = strArr[0];
        String str2 = strArr[1];
        String str3 = strArr[2];
        String str4 = strArr[3];
        String str5 = strArr[4];
        String str6 = strArr[5];
        String str7 = strArr[6];
        try {
            SparkSession orCreate = new SparkSession.Builder().master("local[*]").getOrCreate();
            try {
                RedisBatchConsumer redisBatchConsumer = new RedisBatchConsumer(orCreate, str);
                try {
                    String[] split = str4.split(";;");
                    String[] split2 = str5.split(";");
                    String[] split3 = str6.split(";");
                    String[] split4 = str7.split(";");
                    Map<String, Future<List<String>>> runJobBatch = redisBatchConsumer.runJobBatch((List) IntStream.range(0, split2.length).mapToObj(i -> {
                        return new String[]{split[i], split2[i], str3, str2, split4[i], split3[i]};
                    }).collect(Collectors.toList()));
                    log.info("Number of batches " + runJobBatch.size());
                    Iterator<Future<List<String>>> it = runJobBatch.values().iterator();
                    while (it.hasNext()) {
                        it.next().get();
                    }
                    log.info("Finised running all batches");
                    redisBatchConsumer.close();
                    if (orCreate != null) {
                        orCreate.close();
                    }
                } catch (Throwable th) {
                    try {
                        redisBatchConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
