/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.arraymodel.functions;

import eu.stratosphere.api.common.functions.GenericReducer;
import eu.stratosphere.arraymodel.functions.AbstractArrayModelFunction;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.CopyableValue;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.InstantiationUtil;
import java.lang.reflect.Method;
import java.util.Iterator;

/**
 * Base class for array-model reduce functions that want the grouping key handed
 * to them explicitly.
 *
 * <p>The framework-facing {@link #reduce(Iterator, Collector)} and
 * {@link #combine(Iterator, Collector)} methods pull the first record of a group,
 * copy the field at the configured key position into a reusable key object, and
 * then delegate to the key-aware {@link #reduce(Value, Iterator, Collector)} /
 * {@link #combine(Value, Iterator, Collector)} variants. The iterator passed to
 * those variants still yields the first record, followed by the remaining ones.
 *
 * <p>{@link #open(Configuration)} must have completed successfully before any
 * group is processed; it reads the key position from the configuration and sets
 * up the reusable key object and iterator wrapper.
 */
public abstract class ReduceWithKeyFunction
extends AbstractArrayModelFunction
implements GenericReducer<Value[], Value[]> {
    /** Reusable wrapper that puts the already-consumed first record back in front of the source iterator. */
    private OneHeadIterator iter;
    /** Reusable key object; overwritten with the key of each group before the user function runs. */
    private Value keyVal;
    /** Position of the key field inside each record array, read from the configuration in {@code open}. */
    private int keyIndex;
    /** Configuration key under which the position of the key field is stored. */
    public static final String KEY_INDEX_PARAM_KEY = "reduceWithKey.key-pos";
    /** Configuration key for the key type. It is declared here but not read by this class. */
    public static final String KEY_TYPE_PARAM_KEY = "reduceWithKey.key-type";

    /**
     * User-defined reduce logic for one group.
     *
     * @param key     copy of the key field taken from the first record of the group; the
     *                same object is reused for every group, so it must not be retained
     * @param records all records of the group, including the first one
     * @param out     collector for emitted records
     */
    public abstract void reduce(Value key, Iterator<Value[]> records, Collector<Value[]> out);

    /**
     * User-defined combine logic for one group. The default implementation simply
     * forwards to {@link #reduce(Value, Iterator, Collector)}.
     *
     * @param key     copy of the key field taken from the first record of the group
     * @param records all records of the group, including the first one
     * @param out     collector for emitted records
     */
    public void combine(Value key, Iterator<Value[]> records, Collector<Value[]> out) {
        this.reduce(key, records, out);
    }

    /**
     * Extracts the key from the first record of the group and delegates to
     * {@link #reduce(Value, Iterator, Collector)}.
     *
     * @param records records of one group; the first element is consumed here to read the key
     * @param out     collector for emitted records
     * @throws Exception declared by the reducer contract; this method itself adds no checked failures
     */
    public final void reduce(Iterator<Value[]> records, Collector<Value[]> out) throws Exception {
        Value key = this.prepareGroup(records);
        this.reduce(key, this.iter, out);
    }

    /**
     * Extracts the key from the first record of the group and delegates to
     * {@link #combine(Value, Iterator, Collector)}.
     *
     * @param records records of one group; the first element is consumed here to read the key
     * @param out     collector for emitted records
     * @throws Exception declared by the reducer contract; this method itself adds no checked failures
     */
    public final void combine(Iterator<Value[]> records, Collector<Value[]> out) throws Exception {
        Value key = this.prepareGroup(records);
        this.combine(key, this.iter, out);
    }

    /**
     * Reads the key position from the configuration, validates it against the
     * declared input data types, and creates the reusable key object and the
     * reusable head-restoring iterator.
     *
     * @param parameters configuration that must contain {@link #KEY_INDEX_PARAM_KEY}
     * @throws Exception if the key position is missing or negative, the input data types
     *                   are unavailable, the position is out of range, or the key type
     *                   does not implement {@code CopyableValue}
     */
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.keyIndex = parameters.getInteger(KEY_INDEX_PARAM_KEY, -1);
        if (this.keyIndex < 0) {
            throw new Exception("Invalid setup for ReduceWithKey: Key position has not been encoded in the config.");
        }
        Class<? extends Value>[] types = this.getDataTypes(0);
        if (types == null) {
            throw new Exception("Data types of the function's input could not be determined.");
        }
        if (this.keyIndex >= types.length) {
            throw new Exception("The specified position of the key is out of the range for the data types.");
        }
        this.keyVal = (Value) InstantiationUtil.instantiate(types[this.keyIndex], Value.class);
        if (!(this.keyVal instanceof CopyableValue)) {
            throw new Exception("Invalid setup for ReduceWithKey: Key type must implement " + CopyableValue.class.getName());
        }
        // The wrapper was previously never created, so the first reduce/combine call
        // failed with a NullPointerException. Create it together with the other
        // per-task state.
        this.iter = new OneHeadIterator();
    }

    /**
     * Looks up the public key-aware {@code reduce(Value, Iterator, Collector)} method
     * on the concrete runtime class.
     *
     * @return the reflective handle of the user-defined reduce method
     * @throws RuntimeException wrapping any failure of the reflective lookup
     */
    @Override
    public final Method getUDFMethod() {
        try {
            return this.getClass().getMethod("reduce", Value.class, Iterator.class, Collector.class);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Shared prelude of the framework-facing reduce and combine: takes the first
     * record off {@code records}, copies its key field into the reusable key
     * object, and re-arms the reusable iterator so that it yields the first
     * record again before the rest of {@code records}.
     *
     * @param records records of one group
     * @return the reusable key object, now holding this group's key
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Value prepareGroup(Iterator<Value[]> records) {
        Value[] first = records.next();
        Value key = this.keyVal;
        // open() has verified that the key type implements CopyableValue.
        ((CopyableValue) first[this.keyIndex]).copyTo(key);
        this.iter.set(first, records);
        return key;
    }

    /**
     * Iterator that first returns a single buffered "head" element and afterwards
     * forwards to an underlying source iterator. Instances are reusable via
     * {@link #set(Value[], Iterator)}. Removal is not supported.
     */
    private static final class OneHeadIterator
    implements Iterator<Value[]> {
        /** Buffered first element; {@code null} once it has been handed out. */
        private Value[] head;
        /** Iterator supplying the elements that follow the head. */
        private Iterator<Value[]> source;

        private OneHeadIterator() {
        }

        /**
         * Re-arms this iterator with a new head element and source.
         *
         * @param head   element to return first
         * @param source iterator providing the remaining elements
         */
        private void set(Value[] head, Iterator<Value[]> source) {
            this.head = head;
            this.source = source;
        }

        @Override
        public boolean hasNext() {
            return this.head != null || this.source.hasNext();
        }

        @Override
        public Value[] next() {
            if (this.head != null) {
                Value[] tmp = this.head;
                // Clear the buffer so the head is returned exactly once.
                this.head = null;
                return tmp;
            }
            return this.source.next();
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

