/*
 * Decompiled with CFR 0.152.
 */
package org.openstreetmap.atlas.generator.tools.spark.input;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.input.PortableDataStream;
import org.openstreetmap.atlas.exception.CoreException;
import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemCreator;
import org.openstreetmap.atlas.geography.atlas.Atlas;
import org.openstreetmap.atlas.geography.atlas.AtlasResourceLoader;
import org.openstreetmap.atlas.streaming.Streams;
import org.openstreetmap.atlas.streaming.compression.Decompressor;
import org.openstreetmap.atlas.streaming.resource.FileSuffix;
import org.openstreetmap.atlas.streaming.resource.InputStreamResource;
import org.openstreetmap.atlas.streaming.resource.Resource;
import org.openstreetmap.atlas.utilities.collections.Iterables;
import org.openstreetmap.atlas.utilities.collections.StringList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public final class SparkInput {
    // Factory used to obtain file systems. Left null until getFileSystemCreator()
    // creates a default instance, or setFileSystemCreator() installs another one.
    private static FileSystemCreator FILE_SYSTEM_CREATOR;
    // Class logger; assigned in the static initializer.
    private static final Logger logger;
    // Matches "hidden" file names (leading "_" or "."); assigned in the static initializer.
    private static final Predicate<String> HIDDEN_FILE_PREDICATE;

    /**
     * Load the files listed under a path as {@link Atlas} objects.
     * <p>
     * Each file is wrapped in a {@link Resource} and handed to an
     * {@link AtlasResourceLoader}. Files for which the loader returns {@code null}
     * contribute nothing to the result.
     *
     * @param context
     *            the Spark context used to distribute the file names
     * @param path
     *            the directory-like location whose files are read
     * @return an RDD holding one {@link Atlas} per successfully loaded file
     */
    public static JavaRDD<Atlas> atlasFiles(JavaSparkContext context, String path) {
        final JavaPairRDD<Integer, Atlas> keyedAtlases = SparkInput.transform(context, path, (BiFunction<Path, Map<String, String>, Iterable<Tuple2<Integer, Atlas>>> & Serializable)(elasticPath, map) -> {
            final Resource source = SparkInput.getResource(elasticPath, map);
            final Atlas loaded = new AtlasResourceLoader().load(source);
            final List<Atlas> atlases = new ArrayList<>();
            if (loaded != null) {
                logger.info("Loading Atlas resource {}", source.getName());
                // NOTE(review): the result is discarded; presumably this call forces
                // the atlas contents to be read while the stream is still usable — confirm.
                loaded.area(0L);
                atlases.add(loaded);
            }
            // The key is a constant placeholder; only the values are kept below.
            return Iterables.stream(atlases).map(atlasResult -> new Tuple2<Integer, Atlas>(0, atlasResult));
        });
        return keyedAtlases.values();
    }

    /**
     * Expose the files under a path as {@link PortableDataStream}s keyed by file name.
     * <p>
     * On the file-system route each file becomes a single-file {@link CombineFileSplit}
     * sized from the file's reported length. Otherwise the call is delegated to
     * {@link JavaSparkContext#binaryFiles(String)}.
     *
     * @param context
     *            the Spark context
     * @param path
     *            the location whose files are read
     * @return pairs of (file path as string, data stream over that file)
     * @throws CoreException
     *             if the length of a file cannot be obtained
     */
    public static JavaPairRDD<String, PortableDataStream> binaryFile(JavaSparkContext context, String path) {
        return SparkInput.transform(context, path, (BiFunction<Path, Map<String, String>, Iterable<Tuple2<String, PortableDataStream>>> & Serializable)(elasticPath, map) -> {
            final TaskAttemptContext attempt = new TaskAttemptContextImpl(SparkInput.toHadoop(map), new TaskAttemptID());
            final long length;
            try {
                final FileSystem owner = SparkInput.getFileSystemCreator().get(elasticPath.toString(), map);
                length = owner.getFileStatus(elasticPath).getLen();
            }
            catch (IOException e) {
                throw new CoreException("Unable to get file length for {}", elasticPath.toString(), e);
            }
            // One split covering the whole file; index 0 selects that only entry.
            final CombineFileSplit wholeFile = new CombineFileSplit(new Path[]{elasticPath}, new long[]{length});
            final PortableDataStream stream = new PortableDataStream(wholeFile, attempt, Integer.valueOf(0));
            return Iterables.from(new Tuple2<String, PortableDataStream>(elasticPath.toString(), stream));
        }, otherPath -> context.binaryFiles(otherPath));
    }

    /**
     * Read the sequence files under a path as key/value pairs.
     * <p>
     * On the file-system route every listed file is read through an iterator that opens
     * a {@link SequenceFile.Reader} on the first call to {@code hasNext()} and closes it
     * once the file is exhausted or an error occurs. Otherwise the call is delegated to
     * {@link JavaSparkContext#sequenceFile(String, Class, Class)}.
     * <p>
     * Each returned tuple owns its own key and value instances. The reader deserializes
     * into the objects it is handed, so sharing one pair of holders across records would
     * let the read-ahead overwrite a record before the caller sees it.
     *
     * @param context
     *            the Spark context
     * @param path
     *            the location whose sequence files are read
     * @param sequenceKeyClass
     *            the expected key class; must be the exact class stored in the file
     * @param sequenceValueClass
     *            the expected value class; must be the exact class stored in the file
     * @param <K>
     *            the key type
     * @param <V>
     *            the value type
     * @return the key/value pairs contained in the sequence files
     * @throws CoreException
     *             (while iterating) if a reader cannot be created, the stored key or value
     *             class differs from the expected one, or a record cannot be read
     */
    public static <K extends Writable, V extends Writable> JavaPairRDD<K, V> sequenceFile(JavaSparkContext context, String path, final Class<K> sequenceKeyClass, Class<V> sequenceValueClass) {
        return SparkInput.transform(context, path, (BiFunction<Path, Map<String, String>, Iterable<Tuple2<K, V>>> & Serializable)(elasticPath, map) -> () -> new Iterator<Tuple2<K, V>>(){
            // Built once per iterator and reused for the reader and for every record holder.
            private final Configuration configuration = SparkInput.toHadoop(map);
            // null until the reader has been opened and the first read-ahead performed.
            private Boolean hasNext = null;
            private SequenceFile.Reader sequenceFileReader;
            // The record that the next call to next() will return.
            private K key;
            private V value;

            @Override
            public boolean hasNext() {
                if (this.hasNext == null) {
                    final SequenceFile.Reader.Option filePath = SequenceFile.Reader.file(elasticPath);
                    try {
                        this.sequenceFileReader = new SequenceFile.Reader(this.configuration, new SequenceFile.Reader.Option[]{filePath});
                    }
                    catch (IOException e) {
                        throw new CoreException("Could not create sequence file reader.", e);
                    }
                    final Class<?> actualKeyClass = this.sequenceFileReader.getKeyClass();
                    if (sequenceKeyClass != actualKeyClass) {
                        Streams.close((Closeable)this.sequenceFileReader);
                        throw new CoreException("The sequence file's key is {} whereas the expected class is {}", actualKeyClass, sequenceKeyClass);
                    }
                    final Class<?> actualValueClass = this.sequenceFileReader.getValueClass();
                    if (sequenceValueClass != actualValueClass) {
                        Streams.close((Closeable)this.sequenceFileReader);
                        throw new CoreException("The sequence file's value is {} whereas the expected class is {}", actualValueClass, sequenceValueClass);
                    }
                    this.lookAhead();
                }
                return this.hasNext;
            }

            @Override
            public Tuple2<K, V> next() {
                if (!this.hasNext()) {
                    // Iterator contract: signal exhaustion instead of handing out null.
                    throw new java.util.NoSuchElementException("No more records in sequence file " + elasticPath);
                }
                final Tuple2<K, V> result = new Tuple2<K, V>(this.key, this.value);
                // Safe: lookAhead() reads into fresh holders, never into the ones in result.
                this.lookAhead();
                return result;
            }

            /**
             * Read the following record, if any, into newly created key/value holders and
             * update the hasNext flag. The reader is closed as soon as no record is left
             * or reading fails.
             */
            private void lookAhead() {
                try {
                    final K nextKey = ReflectionUtils.newInstance(sequenceKeyClass, this.configuration);
                    final V nextValue = ReflectionUtils.newInstance(sequenceValueClass, this.configuration);
                    this.hasNext = this.sequenceFileReader.next(nextKey, nextValue);
                    if (this.hasNext.booleanValue()) {
                        this.key = nextKey;
                        this.value = nextValue;
                    }
                    else {
                        Streams.close((Closeable)this.sequenceFileReader);
                    }
                }
                catch (EOFException e) {
                    // Running off the end of the file is treated as a normal end of data.
                    Streams.close((Closeable)this.sequenceFileReader);
                    this.hasNext = false;
                }
                catch (Exception e) {
                    Streams.close((Closeable)this.sequenceFileReader);
                    throw new CoreException("Unable to walk through sequence file", e);
                }
            }
        }, otherPath -> context.sequenceFile(otherPath, sequenceKeyClass, sequenceValueClass));
    }

    /**
     * Read the files under a path line by line.
     * <p>
     * On the file-system route each file is opened as a {@link Resource} and its lines
     * are streamed. Otherwise the call is delegated to
     * {@link JavaSparkContext#textFile(String)}.
     *
     * @param context
     *            the Spark context
     * @param path
     *            the location whose files are read
     * @return an RDD of the lines of all the files
     */
    public static JavaRDD<String> textFile(JavaSparkContext context, String path) {
        // Lines are paired with a constant placeholder key to fit transform(); the key is dropped below.
        final JavaPairRDD<Integer, String> keyedLines = SparkInput.transform(context, path, (BiFunction<Path, Map<String, String>, Iterable<Tuple2<Integer, String>>> & Serializable)(elasticPath, map) -> {
            final Resource source = SparkInput.getResource(elasticPath, map);
            return Iterables.translate(source.lines(), line -> new Tuple2<Integer, String>(0, line));
        }, otherPath -> {
            final JavaRDD<String> sparkLines = context.textFile(otherPath);
            return sparkLines.mapToPair((PairFunction<String, Integer, String> & Serializable)value -> new Tuple2<Integer, String>(0, value));
        });
        return keyedLines.values();
    }

    /**
     * Replace the {@link FileSystemCreator} used by this class.
     *
     * @param fileSystemCreator
     *            the creator to use from now on; passing {@code null} makes
     *            {@code getFileSystemCreator()} build a default one on its next call
     */
    protected static void setFileSystemCreator(FileSystemCreator fileSystemCreator) {
        SparkInput.FILE_SYSTEM_CREATOR = fileSystemCreator;
    }

    /**
     * Look up the file system that serves a path.
     *
     * @param context
     *            the Spark context providing the configuration
     * @param path
     *            the path for which a file system is requested
     * @return the file system, or an empty {@link Optional} if the lookup produced
     *         {@code null}
     */
    private static Optional<FileSystem> elasticFileSystem(JavaSparkContext context, String path) {
        // ofNullable rather than of: a null lookup result must surface as an empty
        // Optional so callers can take their isPresent() fallback instead of hitting
        // a NullPointerException here.
        return Optional.ofNullable(SparkInput.fileSystem(context, path));
    }

    /**
     * Obtain the file system for a path from the current {@link FileSystemCreator},
     * using the configuration of the given Spark context.
     *
     * @param context
     *            the Spark context providing the configuration
     * @param path
     *            the path for which a file system is requested
     * @return whatever the creator returns for that path and configuration
     */
    private static FileSystem fileSystem(JavaSparkContext context, String path) {
        return SparkInput.getFileSystemCreator().get(path, context.getConf());
    }

    /**
     * Return the {@link FileSystemCreator} in use, creating a default one the first
     * time none has been set.
     *
     * @return the current creator, never {@code null}
     */
    private static FileSystemCreator getFileSystemCreator() {
        FileSystemCreator current = FILE_SYSTEM_CREATOR;
        if (current == null) {
            // Nothing installed yet: create the default and remember it.
            current = new FileSystemCreator();
            FILE_SYSTEM_CREATOR = current;
        }
        return current;
    }

    /**
     * Wrap a file in a {@link Resource} whose stream is opened on demand.
     * <p>
     * The resource is named after the path. When that name ends with the GZIP file
     * suffix, a GZIP decompressor is attached.
     *
     * @param elasticPath
     *            the file to read
     * @param map
     *            the configuration entries handed to the file system creator
     * @return the resource over the file
     * @throws CoreException
     *             if the resource cannot be set up; failures to open the stream are
     *             reported with a {@link CoreException} when the stream is requested
     */
    private static Resource getResource(Path elasticPath, Map<String, String> map) {
        try {
            final InputStreamResource opened = new InputStreamResource(() -> {
                // Runs only when the resource's stream is actually requested.
                try {
                    return SparkInput.getFileSystemCreator().get(elasticPath.toString(), map).open(elasticPath);
                }
                catch (Exception e) {
                    throw new CoreException("Unable to open stream {}", elasticPath, e);
                }
            }).withName(elasticPath.toString());
            final boolean gzipped = opened.getName().endsWith(FileSuffix.GZIP.toString());
            if (gzipped) {
                opened.setDecompressor(Decompressor.GZIP);
            }
            return opened;
        }
        catch (Exception e) {
            throw new CoreException("Cannot read {}", elasticPath, e);
        }
    }

    /**
     * Copy configuration entries into a new Hadoop {@link Configuration}.
     *
     * @param conf
     *            the key/value entries to copy
     * @return a new configuration with every entry of {@code conf} set on it
     */
    private static Configuration toHadoop(Map<String, String> conf) {
        final Configuration hadoop = new Configuration();
        for (final Map.Entry<String, String> entry : conf.entrySet()) {
            hadoop.set(entry.getKey(), entry.getValue());
        }
        return hadoop;
    }

    /**
     * Flatten a {@link SparkConf} into a plain map.
     *
     * @param conf
     *            the Spark configuration to read
     * @return a new map holding every key/value pair returned by {@code conf.getAll()}
     */
    private static Map<String, String> toMap(SparkConf conf) {
        final Map<String, String> settings = new HashMap<>();
        for (final Tuple2<String, String> setting : conf.getAll()) {
            final String name = setting._1();
            final String content = setting._2();
            settings.put(name, content);
        }
        return settings;
    }

    /**
     * Copy configuration entries into a new {@link SparkConf}.
     *
     * @param conf
     *            the key/value entries to copy
     * @return a new Spark configuration with every entry of {@code conf} set on it
     */
    private static SparkConf toSpark(Map<String, String> conf) {
        final SparkConf spark = new SparkConf();
        for (final Map.Entry<String, String> entry : conf.entrySet()) {
            spark.set(entry.getKey(), entry.getValue());
        }
        return spark;
    }

    /**
     * Variant of the four-argument {@code transform} without a fallback: the files are
     * always listed and read through the file system.
     *
     * @param context
     *            the Spark context
     * @param path
     *            the location whose files are read
     * @param elasticFunction
     *            turns one file, plus the configuration entries, into key/value pairs
     * @param <K>
     *            the key type
     * @param <V>
     *            the value type
     * @return the pairs produced for all the files
     */
    private static <K, V> JavaPairRDD<K, V> transform(JavaSparkContext context, String path, BiFunction<Path, Map<String, String>, Iterable<Tuple2<K, V>>> elasticFunction) {
        final Function<String, JavaPairRDD<K, V>> noFallback = null;
        return SparkInput.transform(context, path, elasticFunction, noFallback);
    }

    /**
     * Build a pair RDD from the files under a path.
     * <p>
     * When a file system is available, or when no fallback is given, the entries
     * directly under {@code path} are listed, hidden names are skipped, the remaining
     * file URIs are parallelized, and {@code elasticFunction} is applied to each of them.
     * Only when no file system is available and a fallback exists is
     * {@code defaultFunction} used.
     *
     * @param context
     *            the Spark context
     * @param path
     *            the location whose files are read
     * @param elasticFunction
     *            turns one file, plus the configuration entries, into key/value pairs;
     *            it is captured by a Spark function, so it is expected to be serializable
     * @param defaultFunction
     *            fallback applied to {@code path}; may be {@code null}
     * @param <K>
     *            the key type
     * @param <V>
     *            the value type
     * @return the pairs produced for all the files
     * @throws CoreException
     *             if listing the files or setting up the RDD fails
     */
    private static <K, V> JavaPairRDD<K, V> transform(JavaSparkContext context, String path, BiFunction<Path, Map<String, String>, Iterable<Tuple2<K, V>>> elasticFunction, Function<String, JavaPairRDD<K, V>> defaultFunction) {
        final Optional<FileSystem> fileSystemOption = SparkInput.elasticFileSystem(context, path);
        if (!fileSystemOption.isPresent() && defaultFunction != null) {
            return defaultFunction.apply(path);
        }
        final FileSystem fileSystem = fileSystemOption.orElseGet(() -> SparkInput.fileSystem(context, path));
        final List<String> files = new ArrayList<>();
        try {
            for (final FileStatus status : fileSystem.listStatus(new Path(path))) {
                final Path candidate = status.getPath();
                if (HIDDEN_FILE_PREDICATE.test(candidate.getName())) {
                    // Hidden entries (leading "_" or ".") are not data files.
                    continue;
                }
                files.add(candidate.toUri().toString());
            }
            final JavaRDD<String> paths = context.parallelize(files);
            // Plain map snapshot of the configuration, captured by the function below.
            final Map<String, String> map = SparkInput.toMap(context.getConf());
            return paths.flatMapToPair((PairFlatMapFunction<String, K, V> & Serializable)elasticPath -> elasticFunction.apply(new Path(elasticPath), map).iterator());
        }
        catch (Exception e) {
            throw new CoreException("Could not list blobs from the file system:\n{}", new StringList(files).join("\n"), e);
        }
    }

    // Private constructor: the class only exposes static members and is not
    // meant to be instantiated.
    private SparkInput() {
    }

    // Initializes the static final fields declared at the top of the class.
    static {
        logger = LoggerFactory.getLogger(SparkInput.class);
        // A file name counts as hidden when it starts with "_" or ".".
        HIDDEN_FILE_PREDICATE = name -> name.startsWith("_") || name.startsWith(".");
    }
}

