/*
 * Decompiled with CFR 0.152.
 */
package net.sansa_stack.spark.io.rdf.loader;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import net.sansa_stack.hadoop.format.jena.base.FileInputFormatRdfBase;
import net.sansa_stack.hadoop.format.jena.trig.FileInputFormatRdfTrigQuad;
import net.sansa_stack.hadoop.format.jena.turtle.FileInputFormatRdfTurtleTriple;
import net.sansa_stack.hadoop.util.FileSplitUtils;
import net.sansa_stack.spark.io.rdf.input.impl.RdfSourceFactoryImpl;
import org.aksw.commons.util.concurrent.CompletionTracker;
import org.aksw.commons.util.concurrent.ExecutorServiceUtils;
import org.aksw.commons.util.ref.Ref;
import org.aksw.commons.util.ref.RefImpl;
import org.aksw.commons.util.ref.RefSupplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.jena.graph.Triple;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFFormat;
import org.apache.jena.riot.RDFLanguages;
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.sparql.core.Quad;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncRdfParserHadoop {
    /** SLF4J logger shared by the static methods of this class. */
    private static final Logger logger = LoggerFactory.getLogger(AsyncRdfParserHadoop.class);

    /**
     * Parses a single RDF file and pushes its content into {@code sink}.
     *
     * <p>The sink is started before anything else happens and finished only after the
     * selected builder has run to completion; if any step throws, {@code sink.finish()}
     * is not called.
     *
     * @param file      the file to parse
     * @param rdfFormat the format of the file; when {@code null} the language is obtained
     *                  from {@code RdfSourceFactoryImpl.probeLang(file, fileSystem)}
     * @param conf      the Hadoop configuration used to resolve the file system and handed
     *                  to the builder
     * @param sink      the receiver of the parsed data
     * @throws RuntimeException if the language is classified as neither quads nor triples
     * @throws Exception        anything raised while probing the language or running the builder
     */
    public static void parse(Path file, RDFFormat rdfFormat, Configuration conf, StreamRDF sink) throws Exception {
        sink.start();

        // The file system is resolved unconditionally, even when an explicit format is supplied.
        FileSystem fs = file.getFileSystem(conf);

        Lang lang;
        if (rdfFormat != null) {
            lang = rdfFormat.getLang();
        } else {
            lang = RdfSourceFactoryImpl.probeLang(file, fs);
        }

        // Pick the record type first, then configure and run the chosen builder in one place.
        Builder<?> builder;
        if (RDFLanguages.isQuads(lang)) {
            builder = Builder.forQuad();
        } else if (RDFLanguages.isTriples(lang)) {
            builder = Builder.forTriple();
        } else {
            throw new RuntimeException("RDF language is neither quads nor triples " + lang);
        }
        builder.setConf(conf).setInputFile(file).setSink(sink).run();

        sink.finish();
    }

    /**
     * Splits {@code inputFile} using {@code inputFormat}, reads every split in a task
     * submitted to {@code executorService} and forwards each record to {@code sink}.
     *
     * <p>The maximum split size is derived from the file length divided by the number of
     * available processors, with a lower bound of 1,000,000. Prefixes obtained from
     * {@code FileInputFormatRdfBase.getModel(conf)} are sent to the sink before any record.
     * When the input format yields no splits, nothing is sent to the sink.
     *
     * <p>NOTE: this method writes {@code mapreduce.input.fileinputformat.split.maxsize} into
     * the passed-in {@code conf}, so the caller's configuration object is modified.
     *
     * <p>NOTE(review): one task is submitted per split and all tasks share the same
     * {@code sink}; presumably they run concurrently, in which case the sink must tolerate
     * concurrent calls - confirm against the executor in use.
     *
     * @param inputFile             the file to read
     * @param conf                  the Hadoop configuration (modified, see above)
     * @param inputFormat           the input format that computes the splits and produces records
     * @param executorService       the executor on which the per-split tasks run; it is wrapped
     *                              in a {@code CompletionTracker} which is shut down here
     * @param sink                  the receiver of prefixes and records
     * @param sendRecordToStreamRDF the action that emits one record on the sink
     * @param <T>                   the record type produced by {@code inputFormat}
     * @throws IOException          if the file status or the splits cannot be obtained
     * @throws InterruptedException declared for the split computation and for awaiting completion
     * @throws ExecutionException   declared for awaiting completion; presumably raised when a
     *                              task failed - confirm against {@code CompletionTracker}
     */
    public static <T> void parseRaw(Path inputFile, Configuration conf, InputFormat<?, T> inputFormat, ExecutorService executorService, StreamRDF sink, BiConsumer<T, StreamRDF> sendRecordToStreamRDF) throws IOException, InterruptedException, ExecutionException {
        FileSystem fileSystem = inputFile.getFileSystem(conf);
        FileStatus fileStatus = fileSystem.getFileStatus(inputFile);
        long fileTotalLength = fileStatus.getLen();

        // Aim for one split per available processor, but never go below the minimum size.
        int numCores = Runtime.getRuntime().availableProcessors();
        long splitSize = numCores <= 0 ? 0L : fileTotalLength / numCores;
        if (splitSize < 1000000L) {
            splitSize = 1000000L;
        }
        conf.set("mapreduce.input.fileinputformat.split.maxsize", Long.toString(splitSize));

        Job job = Job.getInstance(conf);
        FileInputFormat.addInputPath(job, inputFile);
        List<InputSplit> splits = inputFormat.getSplits(job);
        logger.info("Created {} splits from {}", splits.size(), inputFile);
        if (splits.isEmpty()) {
            return;
        }

        // Prefixes go to the sink before any record is emitted.
        Model prefixes = FileInputFormatRdfBase.getModel(conf);
        prefixes.getNsPrefixMap().forEach(sink::prefix);

        CompletionTracker completionTracker = CompletionTracker.from(executorService);
        for (InputSplit split : splits) {
            completionTracker.execute(() -> {
                // forEach is a terminal operation that is guaranteed to visit every element.
                // The previous map(...).count() form relied on side effects in an intermediate
                // operation, which count() is allowed to skip when it can compute the size
                // directly from the source.
                try (Stream<T> records = FileSplitUtils.createFlow(job, inputFormat, split)) {
                    records.forEach(record -> sendRecordToStreamRDF.accept(record, sink));
                }
            });
        }
        completionTracker.shutdown();
        completionTracker.awaitTermination();
    }

    /**
     * Mutable builder that collects the arguments for
     * {@link AsyncRdfParserHadoop#parseRaw} and runs the parser.
     *
     * <p>{@link #run()} operates on a copy of the builder, so defaults filled in at run time
     * do not leak back into this instance.
     *
     * @param <T> the record type produced by the input format (for example triples or quads)
     */
    public static class Builder<T>
    implements Cloneable {
        /** Hadoop configuration passed to the parser. */
        protected Configuration conf;
        /** The file to parse. */
        protected Path inputFile;
        /** The input format that yields records of type {@code T}. */
        protected InputFormat<?, T> inputFormat;
        /** Supplier of a closeable reference to the executor; {@code null} until defaults are applied. */
        protected RefSupplier<ExecutorService> executorServiceRef;
        /** The receiver of the parsed data. */
        protected StreamRDF sink;
        /** The action that emits one record on a {@link StreamRDF}. */
        protected BiConsumer<T, StreamRDF> sendRecordToStreamRDF;

        /**
         * Creates a builder with every setting given explicitly; any argument may be
         * {@code null} and supplied later, but all except {@code executorServiceRef} must be
         * set before the parser actually runs.
         */
        public Builder(Configuration conf, Path inputFile, InputFormat<?, T> inputFormat, RefSupplier<ExecutorService> executorServiceRef, StreamRDF sink, BiConsumer<T, StreamRDF> sendRecordToStreamRDF) {
            this.conf = conf;
            this.inputFile = inputFile;
            this.inputFormat = inputFormat;
            this.executorServiceRef = executorServiceRef;
            this.sink = sink;
            this.sendRecordToStreamRDF = sendRecordToStreamRDF;
        }

        /**
         * Returns a new builder holding the same field values. The copy is shallow: the
         * referenced configuration, sink and so on are shared with this instance.
         */
        @Override
        public Builder<T> clone() {
            return new Builder<>(this.conf, this.inputFile, this.inputFormat, this.executorServiceRef, this.sink, this.sendRecordToStreamRDF);
        }

        /** Sets the Hadoop configuration and returns this builder for chaining. */
        public Builder<T> setConf(Configuration conf) {
            this.conf = conf;
            return this;
        }

        /** Sets the file to parse and returns this builder for chaining. */
        public Builder<T> setInputFile(Path inputFile) {
            this.inputFile = inputFile;
            return this;
        }

        /** Sets the receiving sink and returns this builder for chaining. */
        public Builder<T> setSink(StreamRDF sink) {
            this.sink = sink;
            return this;
        }

        /**
         * Fills in settings that were left unset. Currently this only affects the executor:
         * when none was configured, a supplier is installed that creates an executor via
         * {@code ExecutorServiceUtils.newBlockingThreadPoolExecutor()} and hands
         * {@code ExecutorService::shutdownNow} to {@code RefImpl.create2} (presumably as the
         * action performed when the reference is closed).
         *
         * @return this builder
         */
        public Builder<T> applyDefaults() {
            if (this.executorServiceRef == null) {
                this.executorServiceRef = (RefSupplier<ExecutorService> & Serializable) () -> RefImpl.create2(ExecutorServiceUtils.newBlockingThreadPoolExecutor(), null, ExecutorService::shutdownNow);
            }
            return this;
        }

        /**
         * Creates a builder for an arbitrary input format; configuration, input file, executor
         * and sink are left unset.
         *
         * @param inputFormat           the input format producing the records
         * @param sendRecordToStreamRDF the action that emits one record on a {@link StreamRDF}
         * @param <X>                   the record type
         */
        public static <X> Builder<X> create(InputFormat<?, X> inputFormat, BiConsumer<X, StreamRDF> sendRecordToStreamRDF) {
            return new Builder<>(null, null, inputFormat, null, null, sendRecordToStreamRDF);
        }

        /** Creates a builder that reads triples using {@code FileInputFormatRdfTurtleTriple}. */
        public static Builder<Triple> forTriple() {
            return Builder.create(new FileInputFormatRdfTurtleTriple(), (t, s) -> s.triple(t));
        }

        /** Creates a builder that reads quads using {@code FileInputFormatRdfTrigQuad}. */
        public static Builder<Quad> forQuad() {
            return Builder.create(new FileInputFormatRdfTrigQuad(), (q, s) -> s.quad(q));
        }

        /**
         * Runs the parser with the current settings. Defaults are applied to a copy, so this
         * builder itself is left unmodified.
         *
         * @throws NullPointerException if a required setting is missing
         * @throws Exception            anything raised while parsing or releasing the executor
         */
        public void run() throws Exception {
            clone().applyDefaults().runActual();
        }

        /**
         * Acquires the executor reference, invokes {@link AsyncRdfParserHadoop#parseRaw} and
         * closes the reference afterwards, whether or not parsing succeeded.
         *
         * @throws NullPointerException if a required setting is missing
         * @throws Exception            anything raised while parsing or closing the reference
         */
        protected void runActual() throws Exception {
            // Fail fast with a descriptive message before an executor is created, instead of
            // surfacing a bare NullPointerException deep inside the parser or a worker task.
            Objects.requireNonNull(this.conf, "conf must be set before running");
            Objects.requireNonNull(this.inputFile, "inputFile must be set before running");
            Objects.requireNonNull(this.inputFormat, "inputFormat must be set before running");
            Objects.requireNonNull(this.sink, "sink must be set before running");
            Objects.requireNonNull(this.sendRecordToStreamRDF, "sendRecordToStreamRDF must be set before running");
            Objects.requireNonNull(this.executorServiceRef, "executorServiceRef must be set; call applyDefaults() first");

            try (Ref<ExecutorService> ref = this.executorServiceRef.get()) {
                AsyncRdfParserHadoop.parseRaw(this.inputFile, this.conf, this.inputFormat, ref.get(), this.sink, this.sendRecordToStreamRDF);
            }
        }

        /** Returns the input format this builder was created with. */
        public InputFormat<?, T> getInputFormat() {
            return this.inputFormat;
        }
    }
}

