/*
 * Decompiled with CFR 0.152.
 */
package org.openstreetmap.atlas.checks.distributed;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.openstreetmap.atlas.checks.base.Check;
import org.openstreetmap.atlas.checks.base.CheckResourceLoader;
import org.openstreetmap.atlas.checks.configuration.ConfigurationResolver;
import org.openstreetmap.atlas.checks.distributed.AtlasFilePathResolver;
import org.openstreetmap.atlas.checks.distributed.IntegrityChecksCommandArguments;
import org.openstreetmap.atlas.checks.distributed.RunnableCheck;
import org.openstreetmap.atlas.checks.distributed.ShardedCheckFlagsTask;
import org.openstreetmap.atlas.checks.event.CheckFlagEvent;
import org.openstreetmap.atlas.checks.event.CheckFlagFileProcessor;
import org.openstreetmap.atlas.checks.event.CheckFlagGeoJsonProcessor;
import org.openstreetmap.atlas.checks.event.CheckFlagTippecanoeProcessor;
import org.openstreetmap.atlas.checks.event.MetricFileGenerator;
import org.openstreetmap.atlas.checks.utility.UniqueCheckFlagContainer;
import org.openstreetmap.atlas.event.EventService;
import org.openstreetmap.atlas.event.Processor;
import org.openstreetmap.atlas.event.ShutdownEvent;
import org.openstreetmap.atlas.exception.CoreException;
import org.openstreetmap.atlas.generator.sharding.AtlasSharding;
import org.openstreetmap.atlas.generator.tools.caching.HadoopAtlasFileCache;
import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFileHelper;
import org.openstreetmap.atlas.geography.GeometricSurface;
import org.openstreetmap.atlas.geography.Polygon;
import org.openstreetmap.atlas.geography.atlas.Atlas;
import org.openstreetmap.atlas.geography.atlas.AtlasResourceLoader;
import org.openstreetmap.atlas.geography.atlas.dynamic.DynamicAtlas;
import org.openstreetmap.atlas.geography.atlas.dynamic.policy.DynamicAtlasPolicy;
import org.openstreetmap.atlas.geography.atlas.items.AtlasEntity;
import org.openstreetmap.atlas.geography.atlas.multi.MultiAtlas;
import org.openstreetmap.atlas.geography.sharding.Shard;
import org.openstreetmap.atlas.geography.sharding.Sharding;
import org.openstreetmap.atlas.streaming.resource.Resource;
import org.openstreetmap.atlas.utilities.collections.StringList;
import org.openstreetmap.atlas.utilities.configuration.Configuration;
import org.openstreetmap.atlas.utilities.configuration.MergedConfiguration;
import org.openstreetmap.atlas.utilities.configuration.StandardConfiguration;
import org.openstreetmap.atlas.utilities.conversion.StringConverter;
import org.openstreetmap.atlas.utilities.filters.AtlasEntityPolygonsFilter;
import org.openstreetmap.atlas.utilities.maps.MultiMap;
import org.openstreetmap.atlas.utilities.runtime.Command;
import org.openstreetmap.atlas.utilities.runtime.CommandMap;
import org.openstreetmap.atlas.utilities.scalars.Distance;
import org.openstreetmap.atlas.utilities.threads.Pool;
import org.openstreetmap.atlas.utilities.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class ShardedIntegrityChecksSparkJob
extends IntegrityChecksCommandArguments {
    private static final Command.Switch<Distance> EXPANSION_DISTANCE = new Command.Switch("shardBufferDistance", "Distance to expand the bounds of the shard group to create a network in kilometers", distanceString -> Distance.kilometers((double)Double.valueOf(distanceString)), Command.Optionality.OPTIONAL, "10.0");
    private static final String ATLAS_SHARDING_FILE = "sharding.txt";
    private static final Command.Switch<String> SHARDING = new Command.Switch("sharding", "Sharding to load in place of sharding file in Atlas path", StringConverter.IDENTITY, Command.Optionality.OPTIONAL);
    private static final Command.Switch<Boolean> MULTI_ATLAS = new Command.Switch("multiAtlas", "If true then use a multi atlas, else use a dynamic atlas. This works better for running on a single machine", Boolean::getBoolean, Command.Optionality.OPTIONAL, "false");
    private static final Logger logger = LoggerFactory.getLogger(ShardedIntegrityChecksSparkJob.class);
    private static final long serialVersionUID = -8038802870994470017L;
    private final MultiMap<String, Check> countryChecks = new MultiMap();

    public static void main(String[] args) {
        new ShardedIntegrityChecksSparkJob().run(args);
    }

    public String getName() {
        return "Sharded Integrity Checks Spark Job";
    }

    public void start(CommandMap commandMap) {
        Time start = Time.now();
        String atlasDirectory = (String)commandMap.get(ATLAS_FOLDER);
        String input = Optional.ofNullable(this.input(commandMap)).orElse(atlasDirectory);
        String output = this.output(commandMap);
        Set outputFormats = (Set)commandMap.get(OUTPUT_FORMATS);
        StringList countries = StringList.split((String)((String)commandMap.get(COUNTRIES)), (String)",");
        Optional checkFilter = commandMap.getOption(CHECK_FILTER);
        MergedConfiguration checksConfiguration = new MergedConfiguration(Stream.concat(Stream.of(ConfigurationResolver.loadConfiguration(commandMap, (Command.Switch<StringList>)CONFIGURATION_FILES, (Command.Switch<String>)CONFIGURATION_JSON)), Stream.of(checkFilter.map(whitelist -> new StandardConfiguration("WhiteListConfiguration", Collections.singletonMap("CheckResourceLoader.checks.whitelist", whitelist))).orElse(ConfigurationResolver.emptyConfiguration()))).collect(Collectors.toList()));
        Map sparkContext = this.configurationMap();
        AtlasFilePathResolver resolver = new AtlasFilePathResolver((Configuration)checksConfiguration);
        SparkFileHelper fileHelper = new SparkFileHelper(sparkContext);
        CheckResourceLoader checkLoader = new CheckResourceLoader((Configuration)checksConfiguration);
        Optional alternateShardingFile = commandMap.getOption(SHARDING);
        String shardingPathInAtlas = "dynamic@" + SparkFileHelper.combine((String)input, (String[])new String[]{ATLAS_SHARDING_FILE});
        String shardingFilePath = (String)((Object)alternateShardingFile.orElse(shardingPathInAtlas));
        Sharding sharding = AtlasSharding.forString((String)shardingFilePath, (Map)this.configurationMap());
        Broadcast shardingBroadcast = this.getContext().broadcast((Object)sharding);
        Distance distanceToLoadShards = (Distance)commandMap.get(EXPANSION_DISTANCE);
        if (countries.isEmpty()) {
            throw new CoreException("No countries found to run.");
        }
        for (String country : countries) {
            Set<Check> checksLoadedForCountry = checkLoader.loadChecksForCountry(country);
            if (checksLoadedForCountry.isEmpty()) {
                logger.warn("No checks loaded for country {}. Skipping execution", (Object)country);
                continue;
            }
            checksLoadedForCountry.forEach(check -> this.countryChecks.add((Object)country, check));
        }
        if (this.countryChecks.isEmpty()) {
            throw new CoreException("No checks loaded for any of the countries provided.");
        }
        MultiMap<String, Shard> countryShards = ShardedIntegrityChecksSparkJob.countryShardMapFromShardFiles(countries.stream().collect(Collectors.toSet()), resolver, input, sparkContext);
        if (countryShards.isEmpty()) {
            throw new CoreException("No atlas files found in input.");
        }
        if (!countries.stream().allMatch(arg_0 -> countryShards.containsKey(arg_0))) {
            Set missingCountries = countries.stream().filter(aCountry -> !countryShards.containsKey(aCountry)).collect(Collectors.toSet());
            throw new CoreException("Unable to find standardized named shard files in the path {}/<countryName> for the countries {}. \n Files must be in format <country>_<zoom>_<x>_<y>.atlas", new Object[]{input, missingCountries});
        }
        try (Pool checkPool = new Pool(countryShards.size(), "Countries Execution Pool");){
            for (Map.Entry countryShard : countryShards.entrySet()) {
                checkPool.queue(() -> {
                    List tasksForCountry = ((List)countryShard.getValue()).stream().map(shard -> new ShardedCheckFlagsTask((String)countryShard.getKey(), (Shard)shard, this.countryChecks.get(countryShard.getKey()))).collect(Collectors.toList());
                    this.getContext().setLocalProperty("callSite.short", String.format("Running checks on %s", ((ShardedCheckFlagsTask)tasksForCountry.get(0)).getCountry()));
                    this.getContext().parallelize(tasksForCountry, tasksForCountry.size()).mapToPair(this.produceFlags(input, output, this.configurationMap(), fileHelper, (Broadcast<Sharding>)shardingBroadcast, distanceToLoadShards, (Boolean)commandMap.get(MULTI_ATLAS))).reduceByKey(UniqueCheckFlagContainer::combine).foreach(this.processFlags(output, fileHelper, outputFormats));
                });
            }
        }
        logger.info("Sharded checks completed in {}", (Object)start.elapsedSince());
    }

    @Override
    protected Command.SwitchList switches() {
        return super.switches().with(new Command.Switch[]{EXPANSION_DISTANCE, MULTI_ATLAS, SHARDING});
    }

    private Function<Shard, Optional<Atlas>> atlasFetcher(String input, String country, Map<String, String> configuration) {
        HadoopAtlasFileCache cache = new HadoopAtlasFileCache(input, configuration);
        AtlasResourceLoader loader = new AtlasResourceLoader();
        return (Function<Shard, Optional> & Serializable & scala.Serializable)shard -> cache.get(country, shard).map(xva$0 -> loader.load(new Resource[]{xva$0}));
    }

    private VoidFunction<Tuple2<String, UniqueCheckFlagContainer>> processFlags(String output, SparkFileHelper fileHelper, Set<IntegrityChecksCommandArguments.OutputFormats> outputFormats) {
        return (VoidFunction & Serializable)tuple -> {
            String country = (String)tuple._1();
            UniqueCheckFlagContainer flagContainer = (UniqueCheckFlagContainer)tuple._2();
            EventService eventService = EventService.get((String)country);
            if (outputFormats.contains((Object)IntegrityChecksCommandArguments.OutputFormats.FLAGS)) {
                eventService.register((Processor)new CheckFlagFileProcessor(fileHelper, SparkFileHelper.combine((String)output, (String[])new String[]{"flag", country})));
            }
            if (outputFormats.contains((Object)IntegrityChecksCommandArguments.OutputFormats.GEOJSON)) {
                eventService.register((Processor)new CheckFlagGeoJsonProcessor(fileHelper, SparkFileHelper.combine((String)output, (String[])new String[]{"geojson", country})));
            }
            if (outputFormats.contains((Object)IntegrityChecksCommandArguments.OutputFormats.TIPPECANOE)) {
                eventService.register((Processor)new CheckFlagTippecanoeProcessor(fileHelper, SparkFileHelper.combine((String)output, (String[])new String[]{"tippecanoe", country})));
            }
            ((Stream)flagContainer.reconstructEvents().parallel()).forEach(arg_0 -> ((EventService)eventService).post(arg_0));
            eventService.complete();
        };
    }

    private PairFunction<ShardedCheckFlagsTask, String, UniqueCheckFlagContainer> produceFlags(String input, String output, Map<String, String> configurationMap, SparkFileHelper fileHelper, Broadcast<Sharding> sharding, Distance shardDistanceExpansion, boolean multiAtlas) {
        return (PairFunction & Serializable)task -> {
            MultiAtlas atlas;
            Function<Shard, Optional<Atlas>> fetcher = this.atlasFetcher(input, task.getCountry(), configurationMap);
            if (multiAtlas) {
                atlas = new MultiAtlas(StreamSupport.stream(((Sharding)sharding.getValue()).shards((GeometricSurface)task.getShard().bounds().expand(shardDistanceExpansion)).spliterator(), true).map(fetcher).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList()));
            } else {
                DynamicAtlasPolicy policy = new DynamicAtlasPolicy(fetcher, (Sharding)sharding.getValue(), Collections.singleton(task.getShard()), (Polygon)task.getShard().bounds().expand(shardDistanceExpansion)).withDeferredLoading(true).withAggressivelyExploreRelations(true).withExtendIndefinitely(false);
                atlas = new DynamicAtlas(policy);
                ((DynamicAtlas)atlas).preemptiveLoad();
            }
            AtlasEntityPolygonsFilter boundaryFilter = AtlasEntityPolygonsFilter.Type.INCLUDE.polygons(Collections.singleton(task.getShard().bounds()));
            EventService eventService = task.getEventService();
            final UniqueCheckFlagContainer container = new UniqueCheckFlagContainer();
            eventService.register((Processor)new Processor<CheckFlagEvent>(){

                public void process(ShutdownEvent event) {
                }

                @Subscribe
                @AllowConcurrentEvents
                public void process(CheckFlagEvent event) {
                    container.add(event.getCheckName(), event.getCheckFlag().makeComplete());
                }
            });
            MetricFileGenerator metricFileGenerator = new MetricFileGenerator(task.getShard().getName() + "_check-run-time.csv", fileHelper, SparkFileHelper.combine((String)output, (String[])new String[]{"metric", task.getCountry()}));
            eventService.register((Processor)metricFileGenerator);
            try (Pool checkPool = new Pool(task.getChecks().size(), "Sharded Checks Execution Pool");){
                for (Check check : task.getChecks()) {
                    checkPool.queue((Runnable)new RunnableCheck(task.getCountry(), check, ShardedIntegrityChecksSparkJob.objectsToCheck((Atlas)atlas, check, (Predicate<AtlasEntity>)boundaryFilter), eventService));
                }
            }
            eventService.complete();
            return new Tuple2((Object)task.getCountry(), (Object)container);
        };
    }
}

