/*
 * Decompiled with CFR 0.152.
 */
package org.openstreetmap.atlas.checks.distributed;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.openstreetmap.atlas.checks.base.BaseCheck;
import org.openstreetmap.atlas.checks.base.Check;
import org.openstreetmap.atlas.checks.base.CheckResourceLoader;
import org.openstreetmap.atlas.checks.configuration.ConfigurationResolver;
import org.openstreetmap.atlas.checks.distributed.AtlasDataSource;
import org.openstreetmap.atlas.checks.distributed.RunnableCheck;
import org.openstreetmap.atlas.checks.event.CheckFlagFileProcessor;
import org.openstreetmap.atlas.checks.event.CheckFlagGeoJsonProcessor;
import org.openstreetmap.atlas.checks.event.CheckFlagTippecanoeProcessor;
import org.openstreetmap.atlas.checks.event.EventService;
import org.openstreetmap.atlas.checks.event.MetricFileGenerator;
import org.openstreetmap.atlas.checks.maproulette.MapRouletteClient;
import org.openstreetmap.atlas.checks.maproulette.MapRouletteConfiguration;
import org.openstreetmap.atlas.exception.CoreException;
import org.openstreetmap.atlas.generator.tools.spark.SparkJob;
import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFileHelper;
import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFileOutput;
import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFilePath;
import org.openstreetmap.atlas.geography.Rectangle;
import org.openstreetmap.atlas.geography.atlas.Atlas;
import org.openstreetmap.atlas.geography.atlas.items.AtlasObject;
import org.openstreetmap.atlas.geography.atlas.items.complex.ComplexEntity;
import org.openstreetmap.atlas.streaming.resource.FileSuffix;
import org.openstreetmap.atlas.utilities.collections.Iterables;
import org.openstreetmap.atlas.utilities.collections.MultiIterable;
import org.openstreetmap.atlas.utilities.collections.StringList;
import org.openstreetmap.atlas.utilities.configuration.Configuration;
import org.openstreetmap.atlas.utilities.configuration.MergedConfiguration;
import org.openstreetmap.atlas.utilities.configuration.StandardConfiguration;
import org.openstreetmap.atlas.utilities.conversion.StringConverter;
import org.openstreetmap.atlas.utilities.runtime.Command;
import org.openstreetmap.atlas.utilities.runtime.CommandMap;
import org.openstreetmap.atlas.utilities.scalars.Duration;
import org.openstreetmap.atlas.utilities.threads.Pool;
import org.openstreetmap.atlas.utilities.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class IntegrityCheckSparkJob
extends SparkJob {
    @Deprecated
    protected static final Command.Switch<String> ATLAS_FOLDER = new Command.Switch<String>("inputFolder", "Path of folder which contains Atlas file(s)", StringConverter.IDENTITY, Command.Optionality.OPTIONAL);
    private static final Command.Switch<StringList> CONFIGURATION_FILES = new Command.Switch("configFiles", "Comma-separated list of configuration datasources.", value -> StringList.split(value, ","), Command.Optionality.OPTIONAL);
    private static final Command.Switch<String> CONFIGURATION_JSON = new Command.Switch<String>("configJson", "Json formatted configuration.", StringConverter.IDENTITY, Command.Optionality.OPTIONAL);
    private static final Command.Switch<String> COUNTRIES = new Command.Switch<String>("countries", "Comma-separated list of country ISO3 codes to be processed", StringConverter.IDENTITY, Command.Optionality.REQUIRED);
    private static final Command.Switch<MapRouletteConfiguration> MAP_ROULETTE = new Command.Switch("maproulette", "Map roulette server information, format <Host>:<Port>:<ProjectName>:<ApiKey>, projectName is optional.", MapRouletteConfiguration::parse, Command.Optionality.OPTIONAL);
    private static final Command.Switch<Rectangle> PBF_BOUNDING_BOX = new Command.Switch("pbfBoundingBox", "OSM protobuf data will be loaded only in this bounding box", Rectangle::forString, Command.Optionality.OPTIONAL);
    private static final Command.Switch<Boolean> PBF_SAVE_INTERMEDIATE_ATLAS = new Command.Switch("savePbfAtlas", "Saves intermediate atlas files created when processing OSM protobuf data.", Boolean::valueOf, Command.Optionality.OPTIONAL, "false");
    private static final Command.Switch<Set<OutputFormats>> OUTPUT_FORMATS = new Command.Switch("outputFormats", String.format("Comma-separated list of output formats (flags, metrics, geojson, tippecanoe).", new Object[0]), csv_formats -> Stream.of(csv_formats.split(",")).map(format -> Enum.valueOf(OutputFormats.class, format.toUpperCase())).collect(Collectors.toSet()), Command.Optionality.OPTIONAL, "flags,metrics");
    private static final Command.Switch<List<String>> CHECK_FILTER = new Command.Switch("checkFilter", "Comma-separated list of checks to run", checks -> Arrays.asList(checks.split(",")), Command.Optionality.OPTIONAL);
    private static final String IGNORED_KEY = "Ignored";
    private static final String OUTPUT_FLAG_FOLDER = "flag";
    private static final String OUTPUT_GEOJSON_FOLDER = "geojson";
    private static final String OUTPUT_TIPPECANOE_FOLDER = "tippecanoe";
    private static final String OUTPUT_ATLAS_FOLDER = "atlas";
    private static final String INTERMEDIATE_ATLAS_EXTENSION = FileSuffix.ATLAS.toString() + FileSuffix.GZIP.toString();
    private static final String OUTPUT_METRIC_FOLDER = "metric";
    private static final String METRICS_FILENAME = "check-run-time.csv";
    private static final Logger logger = LoggerFactory.getLogger(IntegrityCheckSparkJob.class);
    private static final long serialVersionUID = 2990087219645942330L;
    private static final Duration POOL_DURATION_BEFORE_KILL = Duration.minutes(300.0);

    public static void main(String[] args) {
        new IntegrityCheckSparkJob().run(args);
    }

    private static void executeChecks(String country, Atlas atlas, Set<BaseCheck> checksToRun, MapRouletteConfiguration configuration) {
        Pool checkExecutionPool = new Pool(checksToRun.size(), "Check execution pool", POOL_DURATION_BEFORE_KILL);
        checksToRun.stream().filter(check -> check.validCheckForCountry(country)).forEach(check -> checkExecutionPool.queue(new RunnableCheck(country, (Check)check, (Iterable<AtlasObject>)new MultiIterable<AtlasObject>(atlas.items(), atlas.relations(), IntegrityCheckSparkJob.findComplexEntities(check, atlas)), MapRouletteClient.instance(configuration))));
        checkExecutionPool.close();
    }

    private static Iterable<ComplexEntity> findComplexEntities(BaseCheck check, Atlas atlas) {
        if (check.finder().isPresent()) {
            return Iterables.stream(check.finder().get().find(atlas));
        }
        return Collections.emptyList();
    }

    private static SparkFilePath initializeOutput(String output, TaskContext context, String country, String temporaryOutputFolder, String targetOutputFolder) {
        String workerOutputFolder = SparkFileHelper.combine(temporaryOutputFolder, String.format("p%s_a%s", context.partitionId(), context.taskAttemptId()));
        String temporaryFilePath = SparkFileHelper.combine(workerOutputFolder, output, country);
        String targetFilePath = SparkFileHelper.combine(targetOutputFolder, output, country);
        return new SparkFilePath(temporaryFilePath, targetFilePath);
    }

    private static void writeAtlas(Atlas atlas, String country, SparkFilePath output, SparkFileHelper fileHelper) {
        String fileName = String.format("%s_%s", country, atlas.getName());
        SparkFileOutput file = SparkFileOutput.from(atlas::save, output, fileName, INTERMEDIATE_ATLAS_EXTENSION, "Intermediate Atlas");
        fileHelper.save(file);
    }

    @Override
    public String getName() {
        return "Integrity Check Spark Job";
    }

    @Override
    public void start(CommandMap commandMap) {
        String atlasDirectory = (String)commandMap.get(ATLAS_FOLDER);
        String input = Optional.ofNullable(this.input(commandMap)).orElse(atlasDirectory);
        String output = this.output(commandMap);
        Set outputFormats = (Set)commandMap.get(OUTPUT_FORMATS);
        StringList countries = StringList.split((String)commandMap.get(COUNTRIES), ",");
        MapRouletteConfiguration mapRouletteConfiguration = (MapRouletteConfiguration)commandMap.get(MAP_ROULETTE);
        Optional<?> checkFilter = commandMap.getOption(CHECK_FILTER);
        MergedConfiguration checksConfiguration = new MergedConfiguration(Stream.concat(Stream.of(ConfigurationResolver.loadConfiguration(commandMap, CONFIGURATION_FILES, CONFIGURATION_JSON)), Stream.of(checkFilter.map(whitelist -> new StandardConfiguration("WhiteListConfiguration", Collections.singletonMap("CheckResourceLoader.checks.whitelist", whitelist))).orElse(ConfigurationResolver.emptyConfiguration()))).collect(Collectors.toList()));
        boolean saveIntermediateAtlas = (Boolean)commandMap.get(PBF_SAVE_INTERMEDIATE_ATLAS);
        Rectangle pbfBoundary = commandMap.getOption(PBF_BOUNDING_BOX).orElse(Rectangle.MAXIMUM);
        boolean compressOutput = Boolean.valueOf((String)commandMap.get(SparkJob.COMPRESS_OUTPUT));
        Map<String, String> sparkContext = this.configurationMap();
        CheckResourceLoader checkLoader = new CheckResourceLoader(checksConfiguration);
        Set<BaseCheck> preOverriddenChecks = checkLoader.loadChecks();
        if (!this.isValidInput(countries, preOverriddenChecks)) {
            logger.error("No countries supplied or checks enabled, exiting!");
            return;
        }
        List priorityCountries = (List)checksConfiguration.get("priority.countries", Collections.EMPTY_LIST).value();
        ArrayList countryCheckTuples = new ArrayList();
        countries.stream().filter(priorityCountries::contains).forEach(country -> countryCheckTuples.add(new Tuple2(country, checkLoader.loadChecksForCountry((String)country))));
        countries.stream().filter(country -> !priorityCountries.contains(country)).forEach(country -> countryCheckTuples.add(new Tuple2(country, checkLoader.loadChecksForCountry((String)country))));
        logger.info("Initialized countries: {}", (Object)countryCheckTuples.stream().map(tuple -> (String)tuple._1).collect(Collectors.joining(",")));
        logger.info("Initialized checks: {}", (Object)preOverriddenChecks.stream().map(BaseCheck::getCheckName).collect(Collectors.joining(",")));
        JavaPairRDD countryCheckRDD = this.getContext().parallelizePairs(countryCheckTuples, countryCheckTuples.size());
        String targetOutputFolder = SparkFileHelper.parentPath(output);
        String temporaryOutputFolder = SparkFileHelper.combine(targetOutputFolder, "_temp");
        SparkFileHelper fileHelper = new SparkFileHelper(sparkContext);
        AtlasDataSource atlasLoader = this.getAtlasDataSource(sparkContext, checksConfiguration, pbfBoundary);
        fileHelper.mkdir(SparkFileHelper.combine(targetOutputFolder, OUTPUT_FLAG_FOLDER));
        fileHelper.mkdir(SparkFileHelper.combine(targetOutputFolder, OUTPUT_GEOJSON_FOLDER));
        fileHelper.mkdir(SparkFileHelper.combine(targetOutputFolder, OUTPUT_METRIC_FOLDER));
        JavaPairRDD resultRDD = countryCheckRDD.mapToPair((PairFunction & Serializable)tuple -> {
            Consumer<Atlas> intermediateAtlasHandler;
            SparkFilePath tippecanoeOutput;
            SparkFilePath metricOutput;
            SparkFilePath geoJsonOutput;
            SparkFilePath flagOutput;
            Time timer = Time.now();
            String country = (String)tuple._1();
            Set checks = (Set)tuple._2();
            logger.info("Initialized checks for {}: {}", (Object)country, (Object)checks.stream().map(BaseCheck::getCheckName).collect(Collectors.joining(",")));
            HashSet resultingFiles = new HashSet();
            if (outputFormats.contains((Object)OutputFormats.FLAGS)) {
                flagOutput = IntegrityCheckSparkJob.initializeOutput(OUTPUT_FLAG_FOLDER, TaskContext.get(), country, temporaryOutputFolder, targetOutputFolder);
                EventService.get(country).register(new CheckFlagFileProcessor(fileHelper, flagOutput.getTemporaryPath()).withCompression(compressOutput));
            } else {
                flagOutput = null;
            }
            if (outputFormats.contains((Object)OutputFormats.GEOJSON)) {
                geoJsonOutput = IntegrityCheckSparkJob.initializeOutput(OUTPUT_GEOJSON_FOLDER, TaskContext.get(), country, temporaryOutputFolder, targetOutputFolder);
                EventService.get(country).register(new CheckFlagGeoJsonProcessor(fileHelper, geoJsonOutput.getTemporaryPath()).withCompression(compressOutput));
            } else {
                geoJsonOutput = null;
            }
            if (outputFormats.contains((Object)OutputFormats.METRICS)) {
                metricOutput = IntegrityCheckSparkJob.initializeOutput(OUTPUT_METRIC_FOLDER, TaskContext.get(), country, temporaryOutputFolder, targetOutputFolder);
                EventService.get(country).register(new MetricFileGenerator(METRICS_FILENAME, fileHelper, metricOutput.getTemporaryPath()));
            } else {
                metricOutput = null;
            }
            if (outputFormats.contains((Object)OutputFormats.TIPPECANOE)) {
                tippecanoeOutput = IntegrityCheckSparkJob.initializeOutput(OUTPUT_TIPPECANOE_FOLDER, TaskContext.get(), country, temporaryOutputFolder, targetOutputFolder);
                EventService.get(country).register(new CheckFlagTippecanoeProcessor(fileHelper, tippecanoeOutput.getTemporaryPath()).withCompression(compressOutput));
            } else {
                tippecanoeOutput = null;
            }
            if (saveIntermediateAtlas) {
                SparkFilePath atlasOutput = IntegrityCheckSparkJob.initializeOutput(OUTPUT_ATLAS_FOLDER, TaskContext.get(), country, temporaryOutputFolder, targetOutputFolder);
                intermediateAtlasHandler = atlas -> {
                    IntegrityCheckSparkJob.writeAtlas(atlas, country, atlasOutput, fileHelper);
                    resultingFiles.add(atlasOutput);
                };
            } else {
                intermediateAtlasHandler = atlas -> {};
            }
            try {
                Atlas atlas2 = atlasLoader.load(input, country, intermediateAtlasHandler);
                if (atlas2 == null) {
                    logger.error("Could not find {} Atlas files. Skipping country!", (Object)country);
                } else {
                    IntegrityCheckSparkJob.executeChecks(country, atlas2, checks, mapRouletteConfiguration);
                    Stream.of(flagOutput, metricOutput, geoJsonOutput, tippecanoeOutput).filter(Objects::nonNull).forEach(resultingFiles::add);
                }
                EventService.get(country).complete();
                Tuple2 tuple2 = new Tuple2((Object)country, resultingFiles);
                return tuple2;
            }
            catch (CoreException e) {
                logger.error("Exception running integrity checks on {}", (Object)country, (Object)e);
            }
            finally {
                logger.info("Integrity checks finished in {} to execute for {}.", (Object)timer.elapsedSince(), (Object)country);
            }
            return new Tuple2((Object)IGNORED_KEY, null);
        }).filter((Function & Serializable)tuple -> !((String)tuple._1()).equals(IGNORED_KEY));
        resultRDD.foreach((VoidFunction & Serializable)countryPathPair -> {
            String country = (String)countryPathPair._1();
            Set paths = (Set)countryPathPair._2();
            logger.info("[{}] Committing outputs: {}", (Object)country, (Object)paths);
            paths.forEach(fileHelper::commitByCopy);
        });
        try {
            logger.info("Deleting {}.", (Object)temporaryOutputFolder);
            fileHelper.deleteDirectory(temporaryOutputFolder);
            atlasLoader.close();
        }
        catch (Exception e) {
            logger.warn("Clean up failed!", (Throwable)e);
        }
    }

    protected AtlasDataSource getAtlasDataSource(Map<String, String> sparkContext, Configuration checksConfiguration, Rectangle pbfBoundary) {
        return new AtlasDataSource(sparkContext, checksConfiguration, pbfBoundary);
    }

    @Override
    protected List<String> outputToClean(CommandMap command) {
        String output = this.output(command);
        List<String> staticPaths = super.outputToClean(command);
        staticPaths.add(this.getAlternateSubFolderOutput(output, OUTPUT_FLAG_FOLDER));
        staticPaths.add(this.getAlternateSubFolderOutput(output, OUTPUT_GEOJSON_FOLDER));
        staticPaths.add(this.getAlternateSubFolderOutput(output, OUTPUT_ATLAS_FOLDER));
        return staticPaths;
    }

    @Override
    protected Command.SwitchList switches() {
        return super.switches().with(ATLAS_FOLDER, MAP_ROULETTE, COUNTRIES, CONFIGURATION_FILES, CONFIGURATION_JSON, PBF_BOUNDING_BOX, PBF_SAVE_INTERMEDIATE_ATLAS, OUTPUT_FORMATS, CHECK_FILTER);
    }

    private boolean isValidInput(StringList countries, Set<BaseCheck> checksToExecute) {
        return countries.size() != 0 && checksToExecute.size() != 0;
    }

    private static enum OutputFormats {
        FLAGS,
        GEOJSON,
        METRICS,
        TIPPECANOE;

    }
}

