/*
 * Decompiled with CFR 0.152.
 */
package org.openstreetmap.atlas.generator.tools.spark;

import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.serializer.JavaSerializer;
import org.openstreetmap.atlas.exception.CoreException;
import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemCreator;
import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemPerformanceHelper;
import org.openstreetmap.atlas.generator.tools.spark.context.DefaultSparkContextProvider;
import org.openstreetmap.atlas.generator.tools.spark.context.SparkContextProvider;
import org.openstreetmap.atlas.generator.tools.spark.context.SparkContextProviderFinder;
import org.openstreetmap.atlas.generator.tools.spark.converters.SparkOptionsStringConverter;
import org.openstreetmap.atlas.streaming.Streams;
import org.openstreetmap.atlas.streaming.compression.Decompressor;
import org.openstreetmap.atlas.streaming.resource.FileSuffix;
import org.openstreetmap.atlas.streaming.resource.InputStreamResource;
import org.openstreetmap.atlas.streaming.resource.Resource;
import org.openstreetmap.atlas.utilities.conversion.StringConverter;
import org.openstreetmap.atlas.utilities.runtime.Command;
import org.openstreetmap.atlas.utilities.runtime.CommandMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Base class for command-line driven Spark jobs.
 * <p>
 * {@link #onRun(CommandMap)} builds a {@link SparkConf} from the command line switches, obtains a
 * {@link JavaSparkContext} from the configured {@link SparkContextProvider}, writes a
 * {@value #STARTED_FILE} marker, delegates the actual work to {@link #start(CommandMap)}, and
 * finally writes either a {@value #SUCCESS_FILE} or a {@value #FAILED_FILE} marker in the output
 * folder. The Spark context is always stopped when the run ends.
 */
public abstract class SparkJob extends Command implements Serializable {
    private static final long serialVersionUID = -3267868312907886517L;
    private static final Logger logger = LoggerFactory.getLogger(SparkJob.class);

    /** Optional input path of the job. */
    public static final Command.Switch<String> INPUT = new Command.Switch<String>("input", "Input path of the Spark Job", StringConverter.IDENTITY, Command.Optionality.OPTIONAL);
    /** Required output path of the job; status markers for success/failure are written here. */
    public static final Command.Switch<String> OUTPUT = new Command.Switch<String>("output", "Output path of the Spark Job", StringConverter.IDENTITY, Command.Optionality.REQUIRED);
    /** Required folder in which the {@value #STARTED_FILE} marker is written while the job runs. */
    public static final Command.Switch<String> STARTED_FOLDER = new Command.Switch<String>("startedFolder", "Folder where the spark job will write the \"_STARTED\" file.", StringConverter.IDENTITY, Command.Optionality.REQUIRED);
    /** Optional Spark master URL; when absent, no master is set on the {@link SparkConf}. */
    public static final Command.Switch<String> MASTER = new Command.Switch<String>("master", "The spark master URL", StringConverter.IDENTITY, Command.Optionality.OPTIONAL);
    /** Spark options forced onto both the {@link SparkConf} and the Hadoop configuration. */
    public static final Command.Switch<Map<String, String>> SPARK_OPTIONS = new Command.Switch<Map<String, String>>("sparkOptions", "Comma separated list of Spark options, i.e. key1->value1,key2->value2", new SparkOptionsStringConverter(), Command.Optionality.OPTIONAL, "");
    /** Extra Spark options; on key collision these override {@link #SPARK_OPTIONS}. */
    public static final Command.Switch<Map<String, String>> ADDITIONAL_SPARK_OPTIONS = new Command.Switch<Map<String, String>>("additionalSparkOptions", "Comma separated list of additional Spark options, i.e. key1->value1,key2->value2", new SparkOptionsStringConverter(), Command.Optionality.OPTIONAL, "");
    /** Value copied verbatim into {@code mapreduce.output.fileoutputformat.compress}. */
    public static final Command.Switch<String> COMPRESS_OUTPUT = new Command.Switch<String>("compressOutput", "Whether or not compress the output of spark job", StringConverter.IDENTITY, Command.Optionality.OPTIONAL, "true");
    /** Provider used to turn the assembled {@link SparkConf} into a {@link JavaSparkContext}. */
    public static final Command.Switch<SparkContextProvider> SPARK_CONTEXT_PROVIDER = new Command.Switch<SparkContextProvider>("sparkContextProvider", "The class name of the Spark Context Provider", new SparkContextProviderFinder(), Command.Optionality.OPTIONAL, DefaultSparkContextProvider.class.getCanonicalName());
    /**
     * Pattern of configuration keys whose values are masked in the logs. The default
     * {@code ".*"} matches every key, so all values are masked unless overridden.
     */
    public static final Command.Switch<Pattern> SENSITIVE_CONFIGURATION_PATTERN = new Command.Switch<Pattern>("sensitiveConfiguration", "Regular expression pattern of spark configuration keys to avoid logging.", Pattern::compile, Command.Optionality.OPTIONAL, ".*");

    public static final String SUCCESS_FILE = "_SUCCESS";
    public static final String STARTED_FILE = "_STARTED";
    public static final String EXITED_FILE = "_EXITED";
    public static final String FAILED_FILE = "_FAILED";
    public static final String SAVING_SEPARATOR = "-";

    /** Spark context of the current run; transient because the job itself is {@link Serializable}. */
    private transient JavaSparkContext context;

    /**
     * Open a path as a {@link Resource}, using a file system built from the given configuration.
     *
     * @param path
     *            The path to open
     * @param configurationMap
     *            The configuration handed to {@link FileSystemCreator}
     * @return The resource (with a GZIP decompressor attached when the path carries the GZIP
     *         suffix), or {@code null} when the path does not exist
     * @throws CoreException
     *             when the file system cannot be created or queried
     */
    public static Resource resource(String path, Map<String, String> configurationMap) {
        try {
            final FileSystem fileSystem = new FileSystemCreator().get(path, configurationMap);
            if (!fileSystem.exists(new Path(path))) {
                return null;
            }
            // The stream is opened lazily, each time the resource asks its supplier for one.
            final InputStreamResource resource = new InputStreamResource(() -> {
                try {
                    return fileSystem.open(new Path(path));
                }
                catch (IOException | IllegalArgumentException e) {
                    throw new CoreException("Unable to open {}", path, e);
                }
            });
            if (path.endsWith(FileSuffix.GZIP.toString())) {
                resource.setDecompressor(Decompressor.GZIP);
            }
            resource.setName(path);
            return resource;
        }
        catch (Exception e) {
            throw new CoreException("Could not open resource {}", path, e);
        }
    }

    /**
     * @return The name of the job, used as the Spark application name
     */
    public abstract String getName();

    /**
     * Configure Spark from the command line, run {@link #start(CommandMap)} and maintain the
     * status marker files around it.
     *
     * @param command
     *            The parsed command line
     * @return 0 when the job completed; any failure is rethrown after the
     *         {@value #FAILED_FILE} marker has been attempted
     */
    @Override
    public int onRun(CommandMap command) {
        final String sparkMaster = (String) command.get(MASTER);
        @SuppressWarnings("unchecked")
        final Map<String, String> options = (Map<String, String>) command.get(SPARK_OPTIONS);
        @SuppressWarnings("unchecked")
        final Map<String, String> additionalOptions = (Map<String, String>) command.get(ADDITIONAL_SPARK_OPTIONS);
        final Pattern sensitiveConfiguration = (Pattern) command.get(SENSITIVE_CONFIGURATION_PATTERN);

        // Additional options are merged last so they win on key collisions.
        options.putAll(additionalOptions);

        final SparkConf configuration = new SparkConf().setAppName(this.getName());
        for (final Map.Entry<String, String> option : options.entrySet()) {
            final String key = option.getKey();
            final String value = option.getValue();
            if (sensitiveConfiguration.matcher(key).matches()) {
                // Keys matching the sensitive pattern never have their value logged.
                logger.info("Forcing configuration from -{}: key: \"{}\", value: \"**********\"", SPARK_OPTIONS.getName(), key);
            } else {
                logger.info("Forcing configuration from -{}: key: \"{}\", value: \"{}\"", SPARK_OPTIONS.getName(), key, value);
            }
            configuration.set(key, value);
        }
        if (sparkMaster != null) {
            configuration.setMaster(sparkMaster);
        }
        // These two are set after the user options, hence they cannot be overridden by them.
        configuration.set("spark.serializer", JavaSerializer.class.getCanonicalName());
        configuration.set("mapreduce.output.fileoutputformat.compress", (String) command.get(COMPRESS_OUTPUT));

        logger.info("SparkConf ===============================================================");
        for (final Tuple2<String, String> entry : configuration.getAll()) {
            if (sensitiveConfiguration.matcher(entry._1()).matches()) {
                logger.info("SparkConf: key: \"{}\", value: \"**********\"", entry._1());
            } else {
                logger.info("SparkConf: key: \"{}\", value: \"{}\"", entry._1(), entry._2());
            }
        }
        logger.info("SparkConf ===============================================================");

        this.context = (JavaSparkContext) ((SparkContextProvider) command.get(SPARK_CONTEXT_PROVIDER)).apply(configuration);
        // Mirror the forced options into the Hadoop configuration of the context.
        for (final Map.Entry<String, String> option : options.entrySet()) {
            this.context.hadoopConfiguration().set(option.getKey(), option.getValue());
        }

        final String output = this.output(command);
        final String started = (String) command.get(STARTED_FOLDER);
        try {
            this.checkFileDoesNotExists(started, STARTED_FILE);
            this.writeStatus(started, STARTED_FILE, "Started!");
            FileSystemPerformanceHelper.openRenamePool();
            this.start(command);
            FileSystemPerformanceHelper.waitForAndCloseRenamePool();
            this.deleteStatus(started, STARTED_FILE);
            this.writeStatus(output, SUCCESS_FILE, "Success!");
        }
        catch (final Exception exception) {
            logger.error("Unable to prepare output directories.", exception);
            // NOTE(review): when the failure comes from checkFileDoesNotExists (the marker was
            // already there), the cleanup below still deletes that pre-existing marker.
            try {
                this.writeStatus(output, FAILED_FILE, "Failed!");
                this.deleteStatus(started, STARTED_FILE);
            }
            catch (final Exception cleanupFailure) {
                logger.error("Unable to cleanup output directories after error.", cleanupFailure);
                exception.addSuppressed(cleanupFailure);
                try {
                    this.writeStatus(output, FAILED_FILE, "Failed!");
                }
                catch (final Exception retryFailure) {
                    // Never let a failing status write hide the exception that failed the job.
                    logger.error("Unable to write {} in {} after error.", FAILED_FILE, output, retryFailure);
                    exception.addSuppressed(retryFailure);
                }
            }
            throw exception;
        }
        finally {
            this.context.stop();
            this.context.close();
        }
        return 0;
    }

    /**
     * Run the job. Called by {@link #onRun(CommandMap)} once the Spark context is available
     * through {@link #getContext()}.
     *
     * @param var1
     *            The parsed command line
     */
    public abstract void start(CommandMap var1);

    /**
     * @return A new Hadoop {@link Configuration} populated with every entry of the current
     *         Spark configuration
     */
    protected Configuration configuration() {
        final Configuration result = new Configuration();
        for (final Tuple2<String, String> entry : this.getContext().getConf().getAll()) {
            result.set(entry._1(), entry._2());
        }
        return result;
    }

    /**
     * @return A new mutable map holding every entry of the current Spark configuration
     */
    protected Map<String, String> configurationMap() {
        final Map<String, String> result = new HashMap<>();
        for (final Tuple2<String, String> entry : this.getContext().getConf().getAll()) {
            result.put(entry._1(), entry._2());
        }
        return result;
    }

    /**
     * Drop everything from the last "/" of the output onwards, then append
     * {@value #SAVING_SEPARATOR} and the name. For example {@code ("/a/b/out", "x")} yields
     * {@code "/a/b-x"}.
     *
     * @param output
     *            The output path; must contain at least one "/"
     * @param name
     *            The name to append
     * @return The alternate output path
     * @throws StringIndexOutOfBoundsException
     *             when output contains no "/"
     */
    protected String getAlternateParallelFolderOutput(String output, String name) {
        final String parent = output.substring(0, output.lastIndexOf("/"));
        return parent + SAVING_SEPARATOR + name;
    }

    /**
     * Replace the last path segment of the output with the name. For example
     * {@code ("/a/b/out", "x")} yields {@code "/a/b/x"}.
     *
     * @param output
     *            The output path; must contain at least one "/"
     * @param name
     *            The name of the replacement segment
     * @return The alternate output path
     * @throws StringIndexOutOfBoundsException
     *             when output contains no "/"
     */
    protected String getAlternateSubFolderOutput(String output, String name) {
        final String parent = output.substring(0, output.lastIndexOf("/"));
        return parent + "/" + name;
    }

    /**
     * @return The Spark context of the current run; {@code null} until
     *         {@link #onRun(CommandMap)} has created it
     */
    protected JavaSparkContext getContext() {
        return this.context;
    }

    /**
     * @param command
     *            The parsed command line
     * @return The value of the {@link #INPUT} switch
     */
    protected String input(CommandMap command) {
        return (String) command.get(INPUT);
    }

    /**
     * @param command
     *            The parsed command line
     * @return The value of the {@link #OUTPUT} switch
     */
    protected String output(CommandMap command) {
        return (String) command.get(OUTPUT);
    }

    /**
     * @param command
     *            The parsed command line
     * @return A new mutable list containing only the job output path
     */
    protected List<String> outputToClean(CommandMap command) {
        final List<String> result = new ArrayList<>();
        result.add(this.output(command));
        return result;
    }

    /**
     * Open a path as a {@link Resource} using the current Spark configuration.
     *
     * @param path
     *            The path to open
     * @return The resource, or {@code null} when the path does not exist
     */
    protected Resource resource(String path) {
        return SparkJob.resource(path, this.configurationMap());
    }

    /**
     * Save a pair RDD into one Hadoop output per distinct reduced key. The distinct reduced keys
     * are collected first; then, for each of them, the matching records are filtered out of the
     * input and saved under {@code path + "-" + reducedKey}.
     *
     * @param input
     *            The RDD to save; it is filtered once per distinct reduced key
     * @param path
     *            The prefix of every output path
     * @param valueClass
     *            The class of the RDD values
     * @param formatterClass
     *            The output format used for saving
     * @param keyReducer
     *            Maps each RDD key to the name of the split it belongs to. It is captured by the
     *            functions handed to Spark. NOTE(review): presumably it therefore has to be
     *            serializable at runtime; confirm against callers.
     * @param <T>
     *            The type of the RDD values
     */
    protected <T> void splitAndSaveAsHadoopFile(JavaPairRDD<String, T> input, String path, Class<T> valueClass, Class<? extends MultipleOutputFormat<String, T>> formatterClass, java.util.function.Function<String, String> keyReducer) {
        final List<String> splitNames = input.keys().map(keyReducer::apply).distinct().collect();
        for (final String splitName : splitNames) {
            input.filter(tuple -> splitName.equals(keyReducer.apply(tuple._1())))
                    .saveAsHadoopFile(path + SAVING_SEPARATOR + splitName, String.class, valueClass, formatterClass);
        }
    }

    @Override
    protected Command.SwitchList switches() {
        return new Command.SwitchList().with(INPUT, OUTPUT, STARTED_FOLDER, MASTER, SPARK_OPTIONS, ADDITIONAL_SPARK_OPTIONS, COMPRESS_OUTPUT, SPARK_CONTEXT_PROVIDER, SENSITIVE_CONFIGURATION_PATTERN);
    }

    /**
     * Fail when {@code path/name} already exists.
     *
     * @param path
     *            The folder to look into
     * @param name
     *            The name of the file that must be absent
     * @throws CoreException
     *             when the file exists, or when its existence cannot be checked
     */
    private void checkFileDoesNotExists(String path, String name) {
        final boolean fileIsThere;
        try {
            fileIsThere = this.getFileSystem(path).exists(new Path(path + "/" + name));
        }
        catch (Exception e) {
            throw new CoreException("Could not check {}/{} file.", path, name, e);
        }
        if (fileIsThere) {
            throw new CoreException("File {}/{} exists, even though it should not.", path, name);
        }
    }

    /**
     * Delete a single path, non recursively.
     *
     * @param path
     *            The path to delete
     * @throws CoreException
     *             when the deletion throws
     */
    private void deleteStatus(String path) {
        try {
            final FileSystem fileSystem = this.getFileSystem(path);
            if (!fileSystem.delete(new Path(path), false)) {
                // Best effort: a delete that reports false is logged, it does not fail the job.
                logger.warn("{} was not deleted.", path);
            }
        }
        catch (Exception e) {
            throw new CoreException("Could not delete {}", path, e);
        }
    }

    /**
     * Delete {@code path/name}, non recursively.
     *
     * @param path
     *            The folder containing the file
     * @param name
     *            The name of the file to delete
     */
    private void deleteStatus(String path, String name) {
        this.deleteStatus(path + "/" + name);
    }

    /**
     * @param path
     *            The path to resolve a file system for
     * @return The file system of the path, resolved with {@link #configuration()}
     * @throws IllegalArgumentException
     *             propagated from the Hadoop path / file system lookup
     * @throws IOException
     *             propagated from the Hadoop file system lookup
     */
    private FileSystem getFileSystem(String path) throws IllegalArgumentException, IOException {
        return new Path(path).getFileSystem(this.configuration());
    }

    /**
     * Write a small text file at {@code path/name}.
     *
     * @param path
     *            The folder to write into
     * @param name
     *            The name of the file
     * @param contents
     *            The text to write
     * @throws CoreException
     *             when the file cannot be created, written or closed
     */
    private void writeStatus(String path, String name, String contents) {
        try {
            final FileSystem fileSystem = this.getFileSystem(path);
            // try-with-resources: the writer is closed even when the write itself fails.
            try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter((OutputStream) fileSystem.create(new Path(path + "/" + name))))) {
                out.write(contents);
            }
        }
        catch (Exception e) {
            throw new CoreException("Could not write file {}/{}", path, name, e);
        }
    }
}

