/*
 * Decompiled with CFR 0.152.
 */
package io.warp10.hadoop;

import com.fasterxml.sort.DataWriter;
import com.fasterxml.sort.SortConfig;
import com.fasterxml.sort.std.RawTextLineWriter;
import com.fasterxml.sort.std.TextFileSorter;
import io.warp10.WarpURLEncoder;
import io.warp10.continuum.TextFileShuffler;
import io.warp10.hadoop.Warp10InputSplit;
import io.warp10.hadoop.Warp10RecordReader;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Warp10InputFormat
extends InputFormat<Text, BytesWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(Warp10InputFormat.class);
    public static final String PROPERTY_WARP10_INPUTFORMAT_SUFFIX = "warp10.inputformat.suffix";
    public static final String PROPERTY_WARP10_SPLITS_ENDPOINT = "warp10.splits.endpoint";
    public static final String PROPERTY_WARP10_FETCHER_FALLBACKS = "warp10.fetcher.fallbacks";
    public static final String PROPERTY_WARP10_FETCHER_FALLBACKSONLY = "warp10.fetcher.fallbacksonly";
    public static final String PROPERTY_WARP10_FETCHER_PROTOCOL = "warp10.fetcher.protocol";
    public static final String DEFAULT_WARP10_FETCHER_PROTOCOL = "http";
    public static final String PROPERTY_WARP10_FETCHER_PORT = "warp10.fetcher.port";
    public static final String DEFAULT_WARP10_FETCHER_PORT = "8881";
    public static final String PROPERTY_WARP10_FETCHER_PATH = "warp10.fetcher.path";
    public static final String DEFAULT_WARP10_FETCHER_PATH = "/api/v0/sfetch";
    public static final String PROPERTY_WARP10_SPLITS_SELECTOR = "warp10.splits.selector";
    public static final String PROPERTY_WARP10_SPLITS_TOKEN = "warp10.splits.token";
    public static final String PROPERTY_WARP10_SPLITS_QUIETAFTER = "warp10.splits.quietafter";
    public static final String PROPERTY_WARP10_SPLITS_ACTIVEAFTER = "warp10.splits.activeafter";
    public static final String PROPERTY_WARP10_SPLITS_GSKIP = "warp10.splits.gskip";
    public static final String PROPERTY_WARP10_SPLITS_GCOUNT = "warp10.splits.gcount";
    public static final String PROPERTY_WARP10_HTTP_CONNECT_TIMEOUT = "warp10.http.connect.timeout";
    public static final String DEFAULT_WARP10_HTTP_CONNECT_TIMEOUT = "10000";
    public static final String PROPERTY_WARP10_HTTP_READ_TIMEOUT = "warp10.http.read.timeout";
    public static final String DEFAULT_WARP10_HTTP_READ_TIMEOUT = "10000";
    public static final String PROPERTY_WARP10_FETCH_NOW = "warp10.fetch.now";
    public static final String PROPERTY_WARP10_FETCH_TIMESPAN = "warp10.fetch.timespan";
    public static final String PROPERTY_WARP10_FETCH_START = "warp10.fetch.start";
    public static final String PROPERTY_WARP10_FETCH_STOP = "warp10.fetch.stop";
    public static final String PROPERTY_WARP10_FETCH_END = "warp10.fetch.end";
    public static final String PROPERTY_WARP10_FETCH_COUNT = "warp10.fetch.count";
    public static final String PROPERTY_WARP10_FETCH_DEDUP = "warp10.fetch.dedup";
    public static final String PROPERTY_WARP10_FETCH_SKIP = "warp10.fetch.skip";
    public static final String PROPERTY_WARP10_FETCH_SAMPLE = "warp10.fetch.sample";
    public static final String PROPERTY_WARP10_FETCH_STEP = "warp10.fetch.step";
    public static final String PROPERTY_WARP10_FETCH_TIMESTEP = "warp10.fetch.timestep";
    public static final String PROPERTY_WARP10_FETCH_PREBOUNDARY = "warp10.fetch.preboundary";
    public static final String PROPERTY_WARP10_FETCH_POSTBOUNDARY = "warp10.fetch.postboundary";
    public static final String PROPERTY_WARP10_MAX_COMBINED_SPLITS = "warp10.max.combined.splits";
    public static final String PROPERTY_WARP10_MAX_SPLITS = "warp10.max.splits";
    private String suffix = "";
    public static final String HTTP_HEADER_NOW = "X-Warp10-Now";
    public static final String HTTP_HEADER_TIMESPAN = "X-Warp10-Timespan";
    public static final String HTTP_HEADER_START = "X-Warp10-Start";
    public static final String HTTP_HEADER_STOP = "X-Warp10-Stop";
    public static final String HTTP_HEADER_END = "X-Warp10-End";
    public static final String HTTP_HEADER_DEDUP = "X-Warp10-Dedup";
    public static final String HTTP_HEADER_COUNT = "X-Warp10-Count";
    public static final String HTTP_HEADER_SKIP = "X-Warp10-Skip";
    public static final String HTTP_HEADER_SAMPLE = "X-Warp10-Sample";
    public static final String HTTP_HEADER_STEP = "X-Warp10-Step";
    public static final String HTTP_HEADER_PREBOUNDARY = "X-Warp10-Preboundary";
    public static final String HTTP_HEADER_POSTBOUNDARY = "X-Warp10-Postboundary";
    public static final String HTTP_HEADER_TIMESTEP = "X-Warp10-Timestep";
    public static final String HTTP_HEADER_SHOW_ERRORS = "X-Warp10-ShowErrors";

    public Warp10InputFormat(String suffix) {
        this.suffix = null != suffix ? "." + suffix : "";
    }

    public Warp10InputFormat() {
        this.suffix = "";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        String line;
        int maxcombined;
        String line2;
        String sfx = this.getProperty(context, PROPERTY_WARP10_INPUTFORMAT_SUFFIX);
        if (null != sfx) {
            this.suffix = !"".equals(sfx) ? "." + sfx : "";
        }
        ArrayList<String> fallbacks = new ArrayList<String>();
        boolean fallbacksonly = "true".equals(this.getProperty(context, PROPERTY_WARP10_FETCHER_FALLBACKSONLY));
        if (null != this.getProperty(context, PROPERTY_WARP10_FETCHER_FALLBACKS)) {
            String[] servers;
            for (String server : servers = this.getProperty(context, PROPERTY_WARP10_FETCHER_FALLBACKS).split(",")) {
                fallbacks.add(server);
            }
        }
        int connectTimeout = Integer.valueOf(this.getProperty(context, PROPERTY_WARP10_HTTP_CONNECT_TIMEOUT, "10000"));
        int readTimeout = Integer.valueOf(this.getProperty(context, PROPERTY_WARP10_HTTP_READ_TIMEOUT, "10000"));
        String splitEndpoint = this.getProperty(context, PROPERTY_WARP10_SPLITS_ENDPOINT);
        StringBuilder sb = new StringBuilder();
        sb.append(splitEndpoint);
        sb.append("?");
        sb.append("selector");
        sb.append("=");
        sb.append(WarpURLEncoder.encode(this.getProperty(context, PROPERTY_WARP10_SPLITS_SELECTOR), StandardCharsets.UTF_8));
        sb.append("&");
        sb.append("token");
        sb.append("=");
        sb.append(this.getProperty(context, PROPERTY_WARP10_SPLITS_TOKEN));
        if (null != this.getProperty(context, PROPERTY_WARP10_SPLITS_ACTIVEAFTER)) {
            sb.append("&");
            sb.append("activeafter");
            sb.append("=");
            sb.append(this.getProperty(context, PROPERTY_WARP10_SPLITS_ACTIVEAFTER));
        }
        if (null != this.getProperty(context, PROPERTY_WARP10_SPLITS_QUIETAFTER)) {
            sb.append("&");
            sb.append("quietafter");
            sb.append("=");
            sb.append(this.getProperty(context, PROPERTY_WARP10_SPLITS_QUIETAFTER));
        }
        if (null != this.getProperty(context, PROPERTY_WARP10_SPLITS_GSKIP)) {
            sb.append("&");
            sb.append("gskip");
            sb.append("=");
            sb.append(this.getProperty(context, PROPERTY_WARP10_SPLITS_GSKIP));
        }
        if (null != this.getProperty(context, PROPERTY_WARP10_SPLITS_GCOUNT)) {
            sb.append("&");
            sb.append("gcount");
            sb.append("=");
            sb.append(this.getProperty(context, PROPERTY_WARP10_SPLITS_GCOUNT));
        }
        URL url = new URL(sb.toString());
        LOG.info("Get splits from: " + splitEndpoint);
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        conn.setConnectTimeout(connectTimeout);
        conn.setReadTimeout(readTimeout);
        conn.setDoInput(true);
        InputStream in = conn.getInputStream();
        File infile = File.createTempFile("Warp10InputFormat-", "-in");
        infile.deleteOnExit();
        FileOutputStream out = new FileOutputStream(infile);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        PrintWriter pw = new PrintWriter(out);
        int count = 0;
        HashMap<String, AtomicInteger> perServer = new HashMap<String, AtomicInteger>();
        while (null != (line2 = br.readLine())) {
            ++count;
            String server = line2.substring(0, line2.indexOf(32));
            AtomicInteger scount = (AtomicInteger)perServer.get(server);
            if (null == scount) {
                scount = new AtomicInteger(0);
                perServer.put(server, scount);
            }
            scount.addAndGet(1);
            pw.println(line2);
        }
        pw.flush();
        ((OutputStream)out).close();
        br.close();
        in.close();
        conn.disconnect();
        TextFileSorter sorter = new TextFileSorter(new SortConfig().withMaxMemoryUsage(64000000L));
        File outfile = File.createTempFile("Warp10InputFormat-", "-out");
        outfile.deleteOnExit();
        in = new FileInputStream(infile);
        out = new FileOutputStream(outfile);
        try {
            sorter.sort(new TextFileShuffler.CustomReader(in), (DataWriter)new RawTextLineWriter((OutputStream)out));
        }
        finally {
            ((OutputStream)out).close();
            in.close();
            sorter.close();
            infile.delete();
        }
        int avgsplitcount = (int)Math.ceil((double)count / (double)perServer.size());
        if (null != this.getProperty(context, PROPERTY_WARP10_MAX_SPLITS)) {
            int maxsplitavg;
            avgsplitcount = maxsplitavg = (int)Math.ceil((double)count / (double)Integer.parseInt(this.getProperty(context, PROPERTY_WARP10_MAX_SPLITS)));
        }
        if (null != this.getProperty(context, PROPERTY_WARP10_MAX_COMBINED_SPLITS) && (maxcombined = Integer.parseInt(this.getProperty(context, PROPERTY_WARP10_MAX_COMBINED_SPLITS))) < avgsplitcount) {
            avgsplitcount = maxcombined;
        }
        ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
        br = new BufferedReader(new FileReader(outfile));
        Warp10InputSplit split = new Warp10InputSplit();
        String lastserver = null;
        int subsplits = 0;
        while (null != (line = br.readLine())) {
            String[] tokens = line.split("\\s+");
            if (null != lastserver && !lastserver.equals(tokens[0]) || avgsplitcount == subsplits) {
                Collections.shuffle(fallbacks);
                for (String fallback : fallbacks) {
                    split.addFetcher(fallback);
                }
                splits.add(split.build());
                split = new Warp10InputSplit();
                subsplits = 0;
            }
            ++subsplits;
            lastserver = tokens[0];
            split.addEntry(fallbacksonly ? null : tokens[0], tokens[2]);
        }
        br.close();
        outfile.delete();
        if (subsplits > 0) {
            Collections.shuffle(fallbacks);
            for (String fallback : fallbacks) {
                split.addFetcher(fallback);
            }
            splits.add(split.build());
        }
        LOG.info("Number of splits: " + splits.size());
        return splits;
    }

    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        if (!(split instanceof Warp10InputSplit)) {
            throw new IOException("Invalid split type.");
        }
        return new Warp10RecordReader(this.suffix);
    }

    private String getProperty(JobContext context, String property) {
        return this.getProperty(context, property, null);
    }

    private String getProperty(JobContext context, String property, String defaultValue) {
        return Warp10InputFormat.getProperty(context.getConfiguration(), this.suffix, property, defaultValue);
    }

    public static String getProperty(Configuration conf, String suffix, String property, String defaultValue) {
        if (null != conf.get(property + suffix)) {
            return conf.get(property + suffix);
        }
        if (null != conf.get(property)) {
            return conf.get(property);
        }
        if (null != defaultValue) {
            return defaultValue;
        }
        return null;
    }
}

