/*
 * Decompiled with CFR 0.152.
 */
package org.openstreetmap.atlas.mutator;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.openstreetmap.atlas.exception.CoreException;
import org.openstreetmap.atlas.generator.persistence.MultipleAtlasOutputFormat;
import org.openstreetmap.atlas.generator.tools.caching.HadoopAtlasFileCache;
import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemCreator;
import org.openstreetmap.atlas.generator.tools.spark.SparkJob;
import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFileHelper;
import org.openstreetmap.atlas.geography.atlas.Atlas;
import org.openstreetmap.atlas.geography.atlas.change.Change;
import org.openstreetmap.atlas.geography.atlas.change.FeatureChange;
import org.openstreetmap.atlas.geography.atlas.packed.PackedAtlas;
import org.openstreetmap.atlas.geography.sharding.CountryShard;
import org.openstreetmap.atlas.geography.sharding.Sharding;
import org.openstreetmap.atlas.mutator.AtlasMutatorDriver;
import org.openstreetmap.atlas.mutator.AtlasMutatorHelper;
import org.openstreetmap.atlas.mutator.AtlasMutatorParameters;
import org.openstreetmap.atlas.mutator.configuration.AtlasMutationLevel;
import org.openstreetmap.atlas.mutator.configuration.AtlasMutatorConfiguration;
import org.openstreetmap.atlas.mutator.configuration.InputDependency;
import org.openstreetmap.atlas.mutator.persistence.FeatureChangeOutputFormat;
import org.openstreetmap.atlas.mutator.persistence.MultipleFeatureChangeOutputFormat;
import org.openstreetmap.atlas.streaming.Streams;
import org.openstreetmap.atlas.streaming.compression.Compressor;
import org.openstreetmap.atlas.streaming.resource.FileSuffix;
import org.openstreetmap.atlas.streaming.resource.OutputStreamWritableResource;
import org.openstreetmap.atlas.utilities.collections.Iterables;
import org.openstreetmap.atlas.utilities.collections.Maps;
import org.openstreetmap.atlas.utilities.collections.StringList;
import org.openstreetmap.atlas.utilities.maps.MultiMap;
import org.openstreetmap.atlas.utilities.runtime.Command;
import org.openstreetmap.atlas.utilities.runtime.CommandMap;
import org.openstreetmap.atlas.utilities.runtime.Retry;
import org.openstreetmap.atlas.utilities.scalars.Duration;
import org.openstreetmap.atlas.utilities.threads.Pool;
import org.openstreetmap.atlas.utilities.threads.Result;
import org.openstreetmap.atlas.utilities.time.Time;
import org.openstreetmap.atlas.utilities.tuples.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class AtlasMutator
extends SparkJob {
    /** Name of the file, written under the output folder, that holds the configuration details string. */
    public static final String COUNTRY_AND_LEVELS = "countryAndLevels.txt";
    /** Sub-path of the output folder where the per-country lists of potential log files are written. */
    public static final String POTENTIAL_LOG_FILES_PATH = "potentialLogFiles";
    /** Suffix (text + gzip extensions) appended to a country name to form its potential-log-files file name. */
    public static final String POTENTIAL_LOG_FILES_NAME = FileSuffix.TEXT.toString() + FileSuffix.GZIP.toString();
    // NOTE(review): the two MUTATOR_META_DATA_* constants are not referenced in this class;
    // presumably they are used by other classes when tagging atlas meta-data -- confirm.
    public static final String MUTATOR_META_DATA_KEY = "mutator";
    public static final String MUTATOR_META_DATA_SPLIT = ":";
    /** Folder name, relative to the output, under which feature change logs are saved. */
    public static final String LOG_FOLDER = "logs";
    /** Log type label for feature changes as they come out of the generation step. */
    public static final String LOG_GENERATED = "generated";
    /** Log type label for feature changes after they have been assigned to shards. */
    public static final String LOG_ASSIGNED = "assigned";
    /** Log type label for feature changes that were part of the Change applied to a shard. */
    public static final String LOG_APPLIED = "applied";
    /** Banner template used to log the start and end of a level; "{}" is replaced before logging. */
    protected static final String LEVEL_MESSAGE = "\n\n********** {} **********\n";
    private static final long serialVersionUID = -7622283643442835682L;
    private static final Logger logger = LoggerFactory.getLogger(AtlasMutator.class);
    // Same value as the size of the "shards-builder" pool created in buildShards().
    private static final int SHARD_BUILDER_THREADS = 10;
    // Same value as the number of attempts given to the Retry in saveTextToFile().
    private static final int WRITE_RETRIES = 5;
    /** Storage level used for most of the RDDs persisted while running a level. */
    private static final StorageLevel STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK_SER();
    // All the state below is transient: it is (re)populated by start(CommandMap).
    private transient AtlasMutatorConfiguration atlasMutatorConfiguration;
    /** Country group name to the ordered list of mutation levels to run for it. */
    private transient Map<String, List<AtlasMutationLevel>> countryGroupMutations;
    /** Country group name to the shards to process; filled in by buildShards(). */
    private transient Map<String, List<CountryShard>> countryGroupShards;
    /** Controls which feature change logs (none / applied only / all) are saved. */
    private transient AtlasMutatorParameters.DebugFeatureChangeOutput debugFeatureChangeOutput;
    /** Index of the first level to run. */
    private transient int startLevel = 0;
    /** Index of the last level to run (inclusive); capped to the number of levels available. */
    private transient int stopLevel = Integer.MAX_VALUE;
    /** Shard names to restrict the run to; when empty, shards are discovered from the first level. */
    private final transient Set<String> debugShards = new HashSet<String>();
    private transient Sharding sharding;
    private transient String outputFolder;
    /** When true, the applied feature changes RDD is persisted serialized (MEMORY_AND_DISK_SER). */
    private transient boolean appliedRDDPersistUseSer;

    /**
     * Command line entry point: invalidates a {@link HadoopAtlasFileCache} built with an empty
     * parent path and an empty configuration, then runs the mutator job with the given arguments
     * through {@code runWithoutQuitting}.
     *
     * @param args
     *            The command line arguments, forwarded as-is to the job
     */
    public static void main(String[] args) {
        HadoopAtlasFileCache cache = new HadoopAtlasFileCache("", Maps.hashMap((Object[])new String[0]));
        cache.invalidate();
        AtlasMutator mutator = new AtlasMutator();
        mutator.runWithoutQuitting(args);
    }

    /**
     * @return The display name of this job
     */
    @Override
    public String getName() {
        final String jobName = "Atlas Mutator";
        return jobName;
    }

    /**
     * Forces {@code spark.hadoop.validateOutputSpecs=false} into the additional Spark options
     * before delegating to the parent job.
     * <p>
     * The options already present in the command (if any) are copied into a fresh map rather than
     * modified in place, so an unmodifiable or shared map supplied by the caller is never mutated,
     * and a key that is present but mapped to {@code null} is treated as "no options".
     *
     * @param command
     *            The parsed command; its additional Spark options entry is replaced
     * @return The value returned by the parent {@code onRun}
     */
    @Override
    public int onRun(CommandMap command) {
        final String optionsKey = SparkJob.ADDITIONAL_SPARK_OPTIONS.getName();
        final Map<String, String> sparkOptions = new HashMap<String, String>();
        @SuppressWarnings("unchecked")
        final Map<String, String> existingOptions = (Map<String, String>)command.get(optionsKey);
        if (existingOptions != null) {
            sparkOptions.putAll(existingOptions);
        }
        sparkOptions.put("spark.hadoop.validateOutputSpecs", "false");
        command.put(optionsKey, sparkOptions);
        return super.onRun(command);
    }

    /**
     * Reads the configuration and parameters from the command, applies the debug settings to all
     * the levels, then saves the country/level details, creates the broadcast variables, runs all
     * the country groups and finally copies to the output.
     *
     * @param command
     *            The parsed command
     * @throws CoreException
     *             If there is no country group to run
     */
    @Override
    public void start(CommandMap command) {
        this.atlasMutatorConfiguration = AtlasMutatorParameters.atlasMutatorConfiguration(command, this.configurationMap());
        this.outputFolder = AtlasMutatorParameters.output(command);

        // The two maps are only created when they have not been provided already.
        if (this.countryGroupMutations == null) {
            this.countryGroupMutations = this.atlasMutatorConfiguration.getCountryToMutationLevels();
        }
        if (this.countryGroupMutations.isEmpty()) {
            throw new CoreException("There was nothing to run! This is usually a configuration issue.");
        }
        if (this.countryGroupShards == null) {
            this.countryGroupShards = new ConcurrentHashMap<String, List<CountryShard>>();
        }

        this.debugFeatureChangeOutput = AtlasMutatorParameters.debugFeatureChangeOutput(command);
        this.startLevel = AtlasMutatorParameters.startLevel(command);
        this.stopLevel = AtlasMutatorParameters.stopLevel(command);
        this.sharding = AtlasMutatorParameters.sharding(command, this.configurationMap());
        this.appliedRDDPersistUseSer = AtlasMutatorParameters.isAppliedRDDPersistUseSer(command);

        // Debug shards are remembered here and also pushed to every level.
        AtlasMutatorParameters.debugShards(command).forEach(debugShard -> {
            this.debugShards.add((String)debugShard);
            this.amendLevels(level -> level.addDebugWhiteListedShard((String)debugShard));
        });
        // Debug mutations are only pushed to the levels.
        AtlasMutatorParameters.debugMutations(command).forEach(debugMutation -> this.amendLevels(level -> level.addDebugWhiteListedMutator((String)debugMutation)));
        final boolean tagMutations = AtlasMutatorParameters.isAddMutationTags(command);
        this.amendLevels(level -> level.setAddMutationTags(tagMutations));

        this.saveCountryAndLevels();
        this.createBroadcastVariables();
        this.execute(command);
        this.copyToOutput(command, AtlasMutatorParameters.input(command), AtlasMutatorParameters.output(command));
        logger.info("Done!");
    }

    /**
     * @return The switches of the parent job, extended with the ones declared by
     *         {@link AtlasMutatorParameters}
     */
    @Override
    public Command.SwitchList switches() {
        Command.SwitchList allSwitches = super.switches();
        Collection mutatorSwitches = (Collection)AtlasMutatorParameters.switches();
        allSwitches.addAll(mutatorSwitches);
        return allSwitches;
    }

    /**
     * Applies an action to every mutation level of every country group.
     *
     * @param levelConsumer
     *            The action to run on each level
     */
    private void amendLevels(Consumer<AtlasMutationLevel> levelConsumer) {
        for (List<AtlasMutationLevel> levelsOfGroup : this.countryGroupMutations.values()) {
            for (AtlasMutationLevel level : levelsOfGroup) {
                levelConsumer.accept(level);
            }
        }
    }

    /**
     * Populates {@code countryGroupShards} for every country group and saves the list of
     * potential log files for each of them.
     * <p>
     * Without debug shards, the shards are discovered from the first level of each country group,
     * asynchronously, in the returned pool. With debug shards, the shard list is computed right
     * away on the calling thread as the combination of every debug shard with every country of
     * the group's first level.
     *
     * @return The pool in which the discovery tasks were queued; the caller is responsible for
     *         closing it
     */
    private Pool buildShards() {
        // Use the declared constant instead of repeating its value as a magic number.
        Pool pool = new Pool(SHARD_BUILDER_THREADS, "shards-builder");
        for (Map.Entry<String, List<AtlasMutationLevel>> entry : this.countryGroupMutations.entrySet()) {
            String countryGroup = entry.getKey();
            List<AtlasMutationLevel> levels = entry.getValue();
            if (this.debugShards.isEmpty()) {
                logger.info("Submitting {} to initial shards discovery step.", countryGroup);
                pool.queue(() -> {
                    List<CountryShard> countryShards = Iterables.asList(levels.get(0).shards());
                    this.countryGroupShards.put(countryGroup, countryShards);
                    this.savePotentialLogFiles(levels, countryShards);
                });
                continue;
            }
            // Debug mode: every debug shard is paired with every country of the first level.
            List<CountryShard> countryShards = this.debugShards.stream()
                    .map(shardName -> this.sharding.shardForName(shardName))
                    .flatMap(shard -> levels.iterator().next().getCountries().stream().map(country -> new CountryShard(country, shard)))
                    .collect(Collectors.toList());
            this.countryGroupShards.put(countryGroup, countryShards);
            this.savePotentialLogFiles(levels, countryShards);
        }
        return pool;
    }

    /**
     * Creates the broadcast variables needed by the mutators and hands all of them to every level.
     * <p>
     * Each name returned by a mutator's {@code getBroadcastVariablesNeeded()} is broadcast at most
     * once: the first mutator that declares a name triggers the read and the broadcast, and later
     * declarations of the same name are skipped.
     */
    private void createBroadcastVariables() {
        // Broadcast variable name -> broadcast handle returned by the Spark context.
        // NOTE(review): raw type left by the decompiler; the value type is presumably Spark's
        // Broadcast, which is not imported in this file -- confirm before adding generics.
        HashMap broadcastVariables = new HashMap();
        this.countryGroupMutations.values().forEach(levelList -> levelList.forEach(level -> level.getMutators().forEach(mutator -> mutator.getBroadcastVariablesNeeded().forEach((name, value) -> {
            // Only read and broadcast a value the first time its name is seen.
            if (!broadcastVariables.containsKey(name)) {
                broadcastVariables.put(name, this.getContext().broadcast(value.read(this.configurationMap())));
            }
        }))));
        // Every level receives every broadcast variable, not only the ones its own mutators asked for.
        this.amendLevels(level -> broadcastVariables.forEach(level::addBroadcastVariable));
    }

    /**
     * Runs the mutation levels of all the country groups.
     * <p>
     * Shard discovery is started first. The calling thread then polls {@code countryGroupShards}
     * once per second and, as soon as the shards of a country group are known, queues that
     * group's levels in the "country-dag" pool (optionally sleeping between submissions). Once
     * every group has been submitted, each result is awaited in the "await-results" pool.
     * <p>
     * NOTE(review): the polling loop only ends when every country group has shown up in
     * {@code countryGroupShards}; if a discovery task fails before registering its group, this
     * loop presumably never terminates -- confirm how Pool reports task failures.
     *
     * @param command
     *            The parsed command, from which concurrency and staggering are read
     */
    private void execute(CommandMap command) {
        final int totalCountryGroups = this.countryGroupMutations.size();
        // Country groups already queued for processing.
        final Map<String, Boolean> submittedGroups = new ConcurrentHashMap<String, Boolean>();
        // Country groups whose shards are known but which are not queued yet.
        final Map<String, Boolean> readyGroups = new ConcurrentHashMap<String, Boolean>();
        final int countryConcurrency = AtlasMutatorParameters.countryConcurrency(command);
        final Duration submissionStaggering = AtlasMutatorParameters.submissionStaggering(command);
        try (Pool shardsBuilderPool = this.buildShards();
             Pool countryPool = new Pool(countryConcurrency, "country-dag");
             Pool geoJsonPool = new Pool(countryConcurrency, "save-ldgeojson");
             Pool awaitResultsPool = new Pool(countryConcurrency, "await-results");){
            final Map<String, Result<Boolean>> results = new HashMap<String, Result<Boolean>>();
            while (submittedGroups.size() < totalCountryGroups) {
                if (!readyGroups.isEmpty()) {
                    for (String countryGroup : readyGroups.keySet()) {
                        if (submissionStaggering.isMoreThan(Duration.ZERO)) {
                            logger.info("Staggering country {} submission to the driver by {}.", countryGroup, submissionStaggering);
                            submissionStaggering.sleep();
                        }
                        Callable<Boolean> countryGroupJob = this.mutateCountryGroup(this.countryGroupMutations.get(countryGroup), this.countryGroupShards.get(countryGroup), geoJsonPool);
                        results.put(countryGroup, countryPool.queue(countryGroupJob));
                    }
                    submittedGroups.putAll(readyGroups);
                    readyGroups.clear();
                }
                // Pick up the country groups whose shards have been discovered since the last pass.
                for (String countryGroup : this.countryGroupShards.keySet()) {
                    if (submittedGroups.containsKey(countryGroup)) {
                        continue;
                    }
                    logger.info("Found new country shards: {}", countryGroup);
                    readyGroups.put(countryGroup, true);
                }
                Duration.ONE_SECOND.sleep();
            }
            for (Map.Entry<String, Result<Boolean>> submitted : results.entrySet()) {
                final String countryGroup = submitted.getKey();
                final Result<Boolean> result = submitted.getValue();
                awaitResultsPool.queue(() -> {
                    try {
                        result.get();
                        logger.info("Finished Mutator DAG for {}", countryGroup);
                    }
                    catch (Exception e) {
                        throw new CoreException("Unable to process country {}", countryGroup, e);
                    }
                });
            }
        }
    }

    /**
     * Builds the suffix used to name the log files of a level.
     *
     * @param level
     *            The level, from which the level index is taken
     * @param type
     *            The log type
     * @return "-", the level index, "-" and the type, concatenated
     */
    private String getLogFileName(AtlasMutationLevel level, String type) {
        StringBuilder name = new StringBuilder();
        name.append("-").append(level.getLevelIndex());
        name.append("-").append(type);
        return name.toString();
    }

    /**
     * Creates the task that runs, in order, the levels of one country group between the start
     * level and the stop level (both inclusive, the latter capped to the last level available).
     * The Atlas RDD returned by a level is handed to the next one.
     *
     * @param levels
     *            The levels of the country group
     * @param shards
     *            The shards of the country group
     * @param geoJsonPool
     *            The pool used to save the feature change logs
     * @return A task returning {@code true} once all the levels have run; it throws a
     *         {@link CoreException} when the start/stop levels are inconsistent
     */
    private Callable<Boolean> mutateCountryGroup(List<AtlasMutationLevel> levels, List<CountryShard> shards, Pool geoJsonPool) {
        return () -> {
            final Time jobStart = Time.now();
            final String countryGroup = levels.get(0).getCountryGroup();
            logger.info("Starting Spark Job for country {}", countryGroup);
            final boolean startLevelOutOfRange = this.startLevel > 0 && this.startLevel >= levels.size();
            if (startLevelOutOfRange) {
                throw new CoreException("Debug Skip Level specified to {} cannot be applied to {} which has only {} levels.", this.startLevel, countryGroup, levels.size());
            }
            if (this.stopLevel < this.startLevel) {
                throw new CoreException("Debug Stop Level specified to {} cannot be smaller than the Debug Skip Level {}.", this.stopLevel, this.startLevel);
            }
            final int lastLevelIndex = Math.min(levels.size() - 1, this.stopLevel);
            JavaPairRDD<CountryShard, PackedAtlas> atlasRDD = null;
            int levelIndex = this.startLevel;
            while (levelIndex <= lastLevelIndex) {
                final Time levelStart = Time.now();
                final AtlasMutationLevel level = levels.get(levelIndex);
                if (logger.isInfoEnabled()) {
                    logger.info(LEVEL_MESSAGE.replace("{}", "Starting Level {}"), level);
                }
                // A null result means the next level has to read its input by itself.
                atlasRDD = this.runMutationLevel(level, shards, atlasRDD, geoJsonPool);
                if (logger.isInfoEnabled()) {
                    logger.info(LEVEL_MESSAGE.replace("{}", "Finished Level {} in {}"), level, levelStart.elapsedSince());
                }
                levelIndex++;
            }
            logger.info("Finished Spark Job for country {} in {}", countryGroup, jobStart.elapsedSince());
            return true;
        };
    }

    /**
     * Runs one mutation level for one country group: generates the feature changes, assigns them
     * to shards, applies them to build the changed atlases, backfills the shards that did not
     * change, and saves (or hands over) the result.
     *
     * @param level
     *            The level to run
     * @param shards
     *            The shards of the country group
     * @param parentAtlasRDD
     *            The Atlas RDD returned by the previous level, or {@code null} when there is none
     * @param geoJsonPool
     *            The pool in which the feature change log saves are queued
     * @return The resulting Atlas RDD when {@code level.isChildNeedsRDDInput()} is true, an empty
     *         RDD when there are no shards, {@code null} otherwise
     */
    private JavaPairRDD<CountryShard, PackedAtlas> runMutationLevel(AtlasMutationLevel level, List<CountryShard> shards, JavaPairRDD<CountryShard, PackedAtlas> parentAtlasRDD, Pool geoJsonPool) {
        JavaPairRDD unChangedAtlasRDD;
        JavaPairRDD changedAtlasRDD;
        JavaPairRDD shardFeatureChangesRDD;
        if (shards.isEmpty()) {
            logger.warn("{}: No shards to process!", (Object)level);
            return this.getContext().parallelizePairs(new ArrayList(), 0);
        }
        // Step 1: choose the input. Either the atlases are available as an RDD (from the
        // previous level, or preloaded here), or only the shards are distributed and the
        // helper functions are left to obtain the atlas data from the shard alone.
        JavaRDD shardsRDD = null;
        int size = shards.size();
        JavaPairRDD sourceAtlasRDD = null;
        if (parentAtlasRDD != null) {
            logger.info("{} using AtlasRDD from previous level directly.", (Object)level);
            sourceAtlasRDD = parentAtlasRDD;
        } else if (level.canPreloadAtlasRDD()) {
            sourceAtlasRDD = this.getContext().parallelize(shards, size).flatMapToPair(AtlasMutatorHelper.shardToAtlas(level));
            sourceAtlasRDD.setName(level + ": sourceAtlasRDD");
            sourceAtlasRDD.persist(STORAGE_LEVEL);
        } else {
            shardsRDD = this.getContext().parallelize(shards, size);
            shardsRDD.setName(level + ": shardsRDD");
            shardsRDD.persist(STORAGE_LEVEL);
        }
        // Step 2: generate the feature changes, from the atlas groups or from the shards.
        if (sourceAtlasRDD != null) {
            JavaPairRDD<CountryShard, Map<CountryShard, PackedAtlas>> atlasGroupsRDDGeneration = AtlasMutatorDriver.getAtlasGroupsRDD(level.toString() + ": Generate", size, (JavaPairRDD<CountryShard, PackedAtlas>)sourceAtlasRDD, level.getGenerationShardExplorer(), level.getGenerationInputDependencyToRequest());
            atlasGroupsRDDGeneration.setName(level + ": atlasGroupsRDDGeneration");
            shardFeatureChangesRDD = atlasGroupsRDDGeneration.flatMapToPair(AtlasMutatorHelper.shardToAtlasMapToFeatureChanges(level));
        } else {
            shardFeatureChangesRDD = shardsRDD.flatMapToPair(AtlasMutatorHelper.shardToFeatureChanges(level));
        }
        shardFeatureChangesRDD.setName(level + ": shardFeatureChangesRDD");
        // Only persisted when it is going to be read a second time to save the "generated" logs.
        if (this.shouldSaveGenerated()) {
            shardFeatureChangesRDD.persist(STORAGE_LEVEL);
        }
        // Step 3: assign the feature changes to shards and combine them per shard.
        JavaPairRDD shardAssignedFeatureChangesRDD = shardFeatureChangesRDD.flatMapToPair(AtlasMutatorHelper.shardFeatureChangesToAssignedShardFeatureChanges(level)).reduceByKey(AtlasMutatorHelper.assignedToConcatenatedFeatureChanges(level));
        shardAssignedFeatureChangesRDD.setName(level + ": shardAssignedFeatureChangesRDD");
        // Same as above, for the "assigned" logs.
        if (this.shouldSaveAssigned()) {
            shardAssignedFeatureChangesRDD.persist(STORAGE_LEVEL);
        }
        JavaPairRDD shardAppliedFeatureChangesRDD = shardAssignedFeatureChangesRDD.mapToPair(AtlasMutatorHelper.assignedToShardAppliedFeatureChanges(level));
        shardAppliedFeatureChangesRDD.setName(level + ": shardAppliedFeatureChangesRDD");
        // Always persisted: it is read by shardsFrom() below and again when building the changes.
        if (this.appliedRDDPersistUseSer) {
            shardAppliedFeatureChangesRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        } else {
            shardAppliedFeatureChangesRDD.persist(StorageLevel.MEMORY_AND_DISK());
        }
        // The job group is set right before shardsFrom(), which collects and so runs a Spark job.
        String descriptionMerging = level + ": FC Generate, Assign, Merge";
        this.getContext().setJobGroup(level.toString(), descriptionMerging);
        Set<CountryShard> shardsForApplication = this.shardsFrom(shardAppliedFeatureChangesRDD);
        logger.info("{}: Found {} shards with feature changes to apply to.", (Object)level, (Object)shardsForApplication.size());
        // Step 4: repartition by shard, then build one Change per shard.
        JavaPairRDD shardAppliedRepartitionedFeatureChangesRDD = shardAppliedFeatureChangesRDD.repartitionAndSortWithinPartitions(AtlasMutatorParameters.shardPartitioner(shardsForApplication), AtlasMutatorParameters.shardComparator());
        shardAppliedRepartitionedFeatureChangesRDD.setName(level + ": shardAppliedRepartitionedFeatureChangesRDD");
        int partitionSize = shardsForApplication.size();
        // coalesce is skipped when there is no shard to apply to, so it is never called with 0.
        JavaPairRDD shardAppliedCoalescedFeatureChangesRDD = partitionSize == 0 ? shardAppliedRepartitionedFeatureChangesRDD : shardAppliedRepartitionedFeatureChangesRDD.coalesce(partitionSize);
        shardAppliedCoalescedFeatureChangesRDD.setName(level + ": shardAppliedCoalescedFeatureChangesRDD");
        JavaPairRDD shardChangeRDD = shardAppliedCoalescedFeatureChangesRDD.mapToPair(AtlasMutatorHelper.featureChangeListToChange(level));
        shardChangeRDD.setName(level + ": shardChangeRDD");
        // Step 5: apply the changes to get the changed atlases.
        if (sourceAtlasRDD != null) {
            JavaPairRDD<CountryShard, Map<CountryShard, PackedAtlas>> atlasGroupsRDDApplication = AtlasMutatorDriver.getAtlasGroupsRDD(level.toString() + ": Apply", size, (JavaPairRDD<CountryShard, PackedAtlas>)sourceAtlasRDD, level.getApplicationShardExplorer(), level.getApplicationInputDependencyToRequest());
            atlasGroupsRDDApplication.setName(level + ": atlasGroupsRDDApplication");
            // Left outer join: a shard with a Change is kept even without a matching atlas group.
            JavaPairRDD changeAndShardToAtlasMapRDD = shardChangeRDD.leftOuterJoin(atlasGroupsRDDApplication);
            changeAndShardToAtlasMapRDD.setName(level + ": changeAndShardToAtlasMapRDD");
            changedAtlasRDD = changeAndShardToAtlasMapRDD.flatMapToPair(AtlasMutatorHelper.changeAndShardToAtlasMapToAtlas(level));
        } else {
            changedAtlasRDD = shardChangeRDD.flatMapToPair(AtlasMutatorHelper.changeToAtlas(level));
        }
        changedAtlasRDD.setName(level + ": changedAtlasRDD");
        changedAtlasRDD.persist(STORAGE_LEVEL);
        String descriptionChangeAtlas = level + ": Build ChangeAtlas";
        this.getContext().setJobGroup(level.toString(), descriptionChangeAtlas);
        // Second Spark job of the level: collects the shards for which an atlas was produced.
        Set<CountryShard> populatedShards = this.shardsFrom(changedAtlasRDD);
        shardAppliedFeatureChangesRDD.unpersist(false);
        // Step 6: backfill the shards that are not in the changed set.
        if (sourceAtlasRDD != null) {
            JavaPairRDD<CountryShard, Map<CountryShard, PackedAtlas>> atlasGroupsRDDBackfill = AtlasMutatorDriver.getAtlasGroupsRDDSimple(level.toString() + ": Backfill", (JavaPairRDD<CountryShard, PackedAtlas>)sourceAtlasRDD);
            atlasGroupsRDDBackfill.setName(level + ": atlasGroupsRDDBackfill");
            unChangedAtlasRDD = atlasGroupsRDDBackfill.filter((Function & Serializable)tuple -> !populatedShards.contains(tuple._1())).flatMapToPair(AtlasMutatorHelper.untouchedShardAndMapToPotentialSourcePackedAtlas(level));
        } else {
            unChangedAtlasRDD = shardsRDD.filter((Function & Serializable)shard -> !populatedShards.contains(shard)).flatMapToPair(AtlasMutatorHelper.untouchedShardToPotentialSourcePackedAtlas(level));
        }
        unChangedAtlasRDD.setName(level + ": unChangedAtlasRDD");
        // Step 7: the result is the changed atlases (dropping the Change) plus the unchanged ones.
        JavaPairRDD resultAtlasRDD = changedAtlasRDD.mapToPair((PairFunction & Serializable)tuple -> new Tuple2((Object)((CountryShard)tuple._1()), (Object)((PackedAtlas)((Tuple)tuple._2()).getFirst()))).union(unChangedAtlasRDD).coalesce(size);
        resultAtlasRDD.setName(level + ": resultAtlasRDD");
        // Persisted when it is going to be used more than once: by the next level, or by the
        // additional input dependency saves below.
        if (level.isChildNeedsRDDInput() || this.shouldSaveInputDependency(level)) {
            resultAtlasRDD.persist(STORAGE_LEVEL);
        }
        // The log saves are queued in geoJsonPool (see saveFeatureChangesMaybe).
        // NOTE(review): the RDDs they read are unpersisted at the end of this method, possibly
        // before the queued saves have run -- confirm this is intended.
        this.saveFeatureChangesMaybe(level, LOG_GENERATED, (JavaPairRDD<CountryShard, List<FeatureChange>>)shardFeatureChangesRDD, geoJsonPool);
        this.saveFeatureChangesMaybe(level, LOG_ASSIGNED, (JavaPairRDD<CountryShard, List<FeatureChange>>)shardAssignedFeatureChangesRDD, geoJsonPool);
        this.saveFeatureChangesAppliedMaybe(level, (JavaPairRDD<CountryShard, Tuple<PackedAtlas, Change>>)changedAtlasRDD, geoJsonPool);
        // Step 8: either run count() as an action on the result when it is handed to the next
        // level (the count itself is discarded), or save it.
        if (level.isChildNeedsRDDInput()) {
            resultAtlasRDD.count();
        } else {
            this.saveAsHadoopAtlas((JavaPairRDD<CountryShard, PackedAtlas>)resultAtlasRDD, level);
        }
        if (!level.isChildNeedsRDDInput() && !level.isChildCanPreloadRDDInput()) {
            for (InputDependency inputDependency : level.getInputDependenciesToProvide()) {
                this.saveAsHadoopAtlas((JavaPairRDD<CountryShard, PackedAtlas>)resultAtlasRDD, level, Optional.of(inputDependency));
            }
        }
        // Step 9: release what was persisted by this level (non-blocking unpersist).
        shardFeatureChangesRDD.unpersist(false);
        shardAssignedFeatureChangesRDD.unpersist(false);
        changedAtlasRDD.unpersist(false);
        if (sourceAtlasRDD != null) {
            sourceAtlasRDD.unpersist(false);
        } else {
            shardsRDD.unpersist(false);
        }
        // The result stays persisted only when it is returned for the next level to use.
        if (level.isChildNeedsRDDInput()) {
            return resultAtlasRDD;
        }
        resultAtlasRDD.unpersist(false);
        return null;
    }

    /**
     * Saves the Atlas RDD of a level to the level's output path, without any input dependency
     * filtering.
     *
     * @param atlasRDD
     *            The atlases to save, per shard
     * @param level
     *            The level that produced the atlases
     */
    private void saveAsHadoopAtlas(JavaPairRDD<CountryShard, PackedAtlas> atlasRDD, AtlasMutationLevel level) {
        final Optional<InputDependency> noInputDependency = Optional.empty();
        this.saveAsHadoopAtlas(atlasRDD, level, noInputDependency);
    }

    /**
     * Saves the Atlas RDD of a level. When an input dependency is given, the atlases are first
     * passed through {@code AtlasMutatorHelper.inputDependencyFilteredAtlas} and saved to the
     * output path of that input dependency; otherwise they are saved as-is to the level's output
     * path.
     *
     * @param atlasRDD
     *            The atlases to save, per shard
     * @param level
     *            The level that produced the atlases
     * @param inputDependencyOption
     *            The input dependency to filter for, if any
     */
    private void saveAsHadoopAtlas(JavaPairRDD<CountryShard, PackedAtlas> atlasRDD, AtlasMutationLevel level, Optional<InputDependency> inputDependencyOption) {
        JavaPairRDD<CountryShard, PackedAtlas> atlasToSaveRDD = atlasRDD;
        String jobDescription = level + ": Save Mutated Atlas";
        String targetPath;
        if (!inputDependencyOption.isPresent()) {
            targetPath = level.getOutputAtlasPath();
        } else {
            InputDependency inputDependency = inputDependencyOption.get();
            atlasToSaveRDD = atlasRDD.flatMapToPair(AtlasMutatorHelper.inputDependencyFilteredAtlas(level, inputDependency));
            jobDescription = level + ": Save Filtered Mutated Atlas: " + inputDependency;
            targetPath = level.getOutputAtlasPath(inputDependency);
        }
        this.getContext().setJobGroup(level.toString(), jobDescription);
        // The key prefix is empty here: only the country/shard name ends up in the key.
        JavaPairRDD<String, PackedAtlas> namedAtlasRDD = AtlasMutatorHelper.embedCountryNameInKey("", atlasToSaveRDD);
        this.saveAsHadoopAtlas(namedAtlasRDD, targetPath);
    }

    /**
     * Writes the atlases with {@link MultipleAtlasOutputFormat}, using a job configuration built
     * from this job's configuration.
     *
     * @param atlasWithCountryNameEmbeddedRDD
     *            The atlases to save, keyed by a name that embeds the country
     * @param outputAtlasPath
     *            The path to save to
     */
    private void saveAsHadoopAtlas(JavaPairRDD<String, PackedAtlas> atlasWithCountryNameEmbeddedRDD, String outputAtlasPath) {
        final JobConf jobConfiguration = new JobConf(this.configuration());
        atlasWithCountryNameEmbeddedRDD.saveAsHadoopFile(outputAtlasPath, Text.class, Atlas.class, MultipleAtlasOutputFormat.class, jobConfiguration);
    }

    /**
     * Writes the feature changes with {@link MultipleFeatureChangeOutputFormat}, using a job
     * configuration built from this job's configuration.
     *
     * @param featureChangesWithCountryNameEmbeddedRDD
     *            The feature changes to save, keyed by a name that embeds the country
     * @param outputFeatureChangePath
     *            The path to save to
     */
    private void saveAsHadoopFeatureChanges(JavaPairRDD<String, List<FeatureChange>> featureChangesWithCountryNameEmbeddedRDD, String outputFeatureChangePath) {
        final JobConf jobConfiguration = new JobConf(this.configuration());
        // NOTE(review): the declared value class is Atlas.class although the values are lists of
        // FeatureChange; presumably the output format ignores it -- confirm before changing.
        featureChangesWithCountryNameEmbeddedRDD.saveAsHadoopFile(outputFeatureChangePath, Text.class, Atlas.class, MultipleFeatureChangeOutputFormat.class, jobConfiguration);
    }

    /**
     * Saves the feature changes of a level under the log folder of the configured output, after
     * setting a Spark job group that describes the save.
     *
     * @param type
     *            The log type, used in the job description and in the file names
     * @param featureChangeRDD
     *            The feature changes to save, per shard
     * @param level
     *            The level that produced the feature changes
     */
    private void saveAsHadoopFeatureChanges(String type, JavaPairRDD<CountryShard, List<FeatureChange>> featureChangeRDD, AtlasMutationLevel level) {
        this.getContext().setJobGroup(level.toString(), level + ": Save LDGeojson - " + type);
        final JavaPairRDD<String, List<FeatureChange>> namedFeatureChangeRDD = AtlasMutatorHelper.embedCountryNameInKey(this.getLogFileName(level, type), featureChangeRDD);
        final String logFolderPath = SparkFileHelper.combine(this.atlasMutatorConfiguration.getOutput(), LOG_FOLDER);
        this.saveAsHadoopFeatureChanges(namedFeatureChangeRDD, logFolderPath);
    }

    /**
     * Logs the configuration details of every country key (at info level) and writes the global
     * details string to the {@link #COUNTRY_AND_LEVELS} file in the output folder.
     */
    private void saveCountryAndLevels() {
        if (logger.isInfoEnabled()) {
            this.atlasMutatorConfiguration.getCountryToMutationLevels().keySet()
                    .forEach(countryKey -> logger.info("{}", (Object)this.atlasMutatorConfiguration.detailsString(countryKey)));
        }
        final String allDetails = this.atlasMutatorConfiguration.detailsString();
        this.saveTextToFile(allDetails, this.outputFolder, COUNTRY_AND_LEVELS);
    }

    /**
     * When applied logs are enabled, extracts the feature changes out of the Change attached to
     * every changed atlas and queues them to be saved as "applied" logs. Shards with a
     * {@code null} Change are left out.
     *
     * @param level
     *            The level that produced the changes
     * @param changedAtlasRDD
     *            The changed atlases with their Change, per shard
     * @param geoJsonPool
     *            The pool in which the save is queued
     */
    private void saveFeatureChangesAppliedMaybe(AtlasMutationLevel level, JavaPairRDD<CountryShard, Tuple<PackedAtlas, Change>> changedAtlasRDD, Pool geoJsonPool) {
        if (!this.shouldSaveApplied()) {
            return;
        }
        JavaPairRDD<CountryShard, List<FeatureChange>> appliedFeatureChangesRDD = changedAtlasRDD.flatMapToPair(shardToAtlasAndChange -> {
            List<Tuple2<CountryShard, List<FeatureChange>>> applied = new ArrayList<Tuple2<CountryShard, List<FeatureChange>>>();
            CountryShard countryShard = shardToAtlasAndChange._1();
            Change change = shardToAtlasAndChange._2().getSecond();
            if (change != null) {
                applied.add(new Tuple2<CountryShard, List<FeatureChange>>(countryShard, change.getFeatureChanges()));
            }
            return applied.iterator();
        });
        this.saveFeatureChangesMaybe(level, LOG_APPLIED, appliedFeatureChangesRDD, geoJsonPool);
    }

    /**
     * Queues the save of feature change logs in the given pool, but only when the logs of the
     * requested type are enabled. Unknown types are ignored.
     *
     * @param level
     *            The level that produced the feature changes
     * @param type
     *            The log type: generated, assigned or applied
     * @param shardFeatureChangesRDD
     *            The feature changes to save, per shard
     * @param geoJsonPool
     *            The pool in which the save is queued
     */
    private void saveFeatureChangesMaybe(AtlasMutationLevel level, String type, JavaPairRDD<CountryShard, List<FeatureChange>> shardFeatureChangesRDD, Pool geoJsonPool) {
        final boolean enabled;
        if (LOG_GENERATED.equals(type)) {
            enabled = this.shouldSaveGenerated();
        } else if (LOG_ASSIGNED.equals(type)) {
            enabled = this.shouldSaveAssigned();
        } else if (LOG_APPLIED.equals(type)) {
            enabled = this.shouldSaveApplied();
        } else {
            enabled = false;
        }
        if (!enabled) {
            return;
        }
        geoJsonPool.queue(() -> this.saveAsHadoopFeatureChanges(type, shardFeatureChangesRDD, level));
    }

    /**
     * Writes, for each country, a file listing the paths of the "applied" log files that every
     * level could produce for every shard of that country. Nothing is written when feature change
     * output is disabled.
     *
     * @param levels
     *            The levels of the country group
     * @param countryShards
     *            The shards of the country group
     */
    private void savePotentialLogFiles(List<AtlasMutationLevel> levels, List<CountryShard> countryShards) {
        if (AtlasMutatorParameters.DebugFeatureChangeOutput.NONE == this.debugFeatureChangeOutput) {
            return;
        }
        final MultiMap<String, String> logFilesPerCountry = new MultiMap<String, String>();
        for (AtlasMutationLevel level : levels) {
            final String levelSuffix = this.getLogFileName(level, LOG_APPLIED);
            for (CountryShard countryShard : countryShards) {
                final String country = countryShard.getCountry();
                final String countryAndLevel = country + levelSuffix;
                // File name: the shard name prefixed with country + level suffix, plus extension.
                final String fileName = new CountryShard(countryAndLevel, countryShard.getShard()).getName() + FeatureChangeOutputFormat.getTotalExtension();
                logFilesPerCountry.add(country, SparkFileHelper.combine(LOG_FOLDER, countryAndLevel, fileName));
            }
        }
        logFilesPerCountry.forEach((country, logFiles) -> {
            final String contents = new StringList(logFiles).join(System.lineSeparator());
            this.saveTextToFile(contents, this.outputFolder, POTENTIAL_LOG_FILES_PATH, country + POTENTIAL_LOG_FILES_NAME);
        });
    }

    /**
     * Writes some text to a file, retrying up to {@link #WRITE_RETRIES} times. The contents are
     * gzip-compressed when the resulting path ends with the gzip suffix.
     * <p>
     * The output stream is closed in a {@code finally} block, so it is released on every exit
     * path, including when the write fails with something other than an {@link IOException}.
     *
     * @param contents
     *            The text to write
     * @param location
     *            The base location
     * @param combinePaths
     *            The path elements to append to the base location
     * @throws CoreException
     *             If an {@link IOException} occurs while writing
     */
    private void saveTextToFile(String contents, String location, String ... combinePaths) {
        final String path = SparkFileHelper.combine(location, combinePaths);
        new Retry(WRITE_RETRIES, Duration.ONE_SECOND).withQuadratic(true).run(() -> {
            FSDataOutputStream outputStream = null;
            try {
                outputStream = new FileSystemCreator().get(path, this.configurationMap()).create(new Path(path));
                OutputStreamWritableResource outputResource = new OutputStreamWritableResource((OutputStream)outputStream);
                if (path.endsWith(FileSuffix.GZIP.toString())) {
                    outputResource.setCompressor(Compressor.GZIP);
                }
                outputResource.writeAndClose(contents);
            }
            catch (IOException e) {
                throw new CoreException("Unable to write to {}", path, e);
            }
            finally {
                // The stream is still null when creating it failed.
                if (outputStream != null) {
                    Streams.close(outputStream);
                }
            }
        });
    }

    /**
     * Collects to the driver the distinct shards that are keys of an RDD. The shards travel as
     * names and are parsed back with {@code CountryShard.forName}. This triggers a Spark job.
     *
     * @param shardBasedRdd
     *            The RDD keyed by shard
     * @param <T>
     *            The type of the values of the RDD, which are ignored
     * @return The set of shards found in the keys
     */
    private <T> Set<CountryShard> shardsFrom(JavaPairRDD<CountryShard, T> shardBasedRdd) {
        final List<String> distinctShardNames = shardBasedRdd.map(tuple -> tuple._1().getName()).distinct().collect();
        final Set<CountryShard> shards = new HashSet<CountryShard>();
        for (String shardName : distinctShardNames) {
            shards.add(CountryShard.forName(shardName));
        }
        return shards;
    }

    /**
     * @return True when the "applied" feature change logs have to be saved, i.e. for any debug
     *         output setting other than NONE
     */
    private boolean shouldSaveApplied() {
        final boolean outputDisabled = this.debugFeatureChangeOutput == AtlasMutatorParameters.DebugFeatureChangeOutput.NONE;
        return !outputDisabled;
    }

    /**
     * @return True when the "assigned" feature change logs have to be saved, i.e. only when the
     *         debug output setting is ALL
     */
    private boolean shouldSaveAssigned() {
        final AtlasMutatorParameters.DebugFeatureChangeOutput requested = this.debugFeatureChangeOutput;
        return requested == AtlasMutatorParameters.DebugFeatureChangeOutput.ALL;
    }

    /**
     * @return True when the "generated" feature change logs have to be saved, i.e. only when the
     *         debug output setting is ALL
     */
    private boolean shouldSaveGenerated() {
        final AtlasMutatorParameters.DebugFeatureChangeOutput requested = this.debugFeatureChangeOutput;
        return requested == AtlasMutatorParameters.DebugFeatureChangeOutput.ALL;
    }

    /**
     * @param level
     *            The level to check
     * @return True when the level has input dependencies to provide and its child cannot preload
     *         its RDD input
     */
    private boolean shouldSaveInputDependency(AtlasMutationLevel level) {
        if (level.isChildCanPreloadRDDInput()) {
            return false;
        }
        return !level.getInputDependenciesToProvide().isEmpty();
    }
}

