/*
 * Decompiled with CFR 0.152.
 */
package io.warp10.hadoop;

import io.warp10.hadoop.Warp10InputFormat;
import io.warp10.hadoop.WarpScriptInputFormat;
import io.warp10.hadoop.WritableUtils;
import io.warp10.script.WarpScriptException;
import io.warp10.script.WarpScriptExecutor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WarpScriptRecordReader
extends RecordReader<Object, Object> {
    private final RecordReader reader;
    private Object key = null;
    private Object value = null;
    private List<List<Object>> records = new ArrayList<List<Object>>();
    private int recordidx = 0;
    private final String suffix;
    private WarpScriptExecutor executor;
    private boolean done;
    private final WarpScriptInputFormat inputFormat;

    public WarpScriptRecordReader(WarpScriptInputFormat inputFormat) {
        this.inputFormat = inputFormat;
        this.suffix = inputFormat.getSuffix();
        this.reader = inputFormat.getWrappedRecordReader();
    }

    public void close() throws IOException {
        this.reader.close();
    }

    public Object getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    public Object getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return this.reader.getProgress();
    }

    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.reader.initialize(split, context);
        Configuration conf = context.getConfiguration();
        String code = Warp10InputFormat.getProperty(conf, this.suffix, "warpscript.inputformat.script", null);
        if (split instanceof FileSplit) {
            conf.set(".path", ((FileSplit)split).getPath().toString());
        }
        try {
            this.executor = this.inputFormat.getWarpScriptExecutor(conf, code);
        }
        catch (WarpScriptException wse) {
            throw new IOException("Error while instatiating WarpScript executor", wse);
        }
        this.done = false;
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean nkv;
        if (!this.records.isEmpty()) {
            List<Object> kv = this.records.get(this.recordidx++);
            if (this.records.size() == this.recordidx) {
                this.records.clear();
                this.recordidx = 0;
            }
            this.key = kv.get(0);
            this.value = kv.get(1);
            return true;
        }
        if (this.done) {
            return false;
        }
        while (nkv = this.reader.nextKeyValue()) {
            Object k = this.reader.getCurrentKey();
            Object v = this.reader.getCurrentValue();
            ArrayList<Object> input = new ArrayList<Object>();
            input.add(this.done);
            input.add(WritableUtils.fromWritable(v));
            input.add(WritableUtils.fromWritable(k));
            try {
                List<Object> results = this.executor.exec(input);
                if (results.isEmpty()) continue;
                for (int i = results.size() - 1; i >= 0; --i) {
                    Object result = results.get(i);
                    if (!(result instanceof List) || 2 != ((List)result).size()) {
                        throw new IOException("Invalid WarpScript\u2122 output, expected a [ key value ] pair, got a " + result.getClass());
                    }
                    ArrayList<Writable> record = new ArrayList<Writable>();
                    record.add(WritableUtils.toWritable(((List)result).get(0)));
                    record.add(WritableUtils.toWritable(((List)result).get(1)));
                    this.records.add(record);
                }
                return this.nextKeyValue();
            }
            catch (WarpScriptException wse) {
                throw new IOException(wse);
            }
        }
        this.done = true;
        ArrayList<Object> input = new ArrayList<Object>();
        input.add(true);
        try {
            List<Object> results = this.executor.exec(input);
            if (results.isEmpty()) {
                return false;
            }
            for (int i = results.size() - 1; i >= 0; --i) {
                Object result = results.get(i);
                if (!(result instanceof List) || 2 != ((List)result).size()) {
                    throw new IOException("Invalid WarpScript\u2122 output, expected [ key value ] pairs, got a " + result.getClass());
                }
                ArrayList<Writable> record = new ArrayList<Writable>();
                record.add(WritableUtils.toWritable(((List)result).get(0)));
                record.add(WritableUtils.toWritable(((List)result).get(1)));
                this.records.add(record);
            }
            return this.nextKeyValue();
        }
        catch (WarpScriptException wse) {
            throw new IOException(wse);
        }
    }
}

