/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.sopremo.pact;

import com.google.common.reflect.TypeToken;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.common.functions.GenericReducer;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.SopremoEnvironment;
import eu.stratosphere.sopremo.pact.JsonCollector;
import eu.stratosphere.sopremo.pact.RecordToJsonIterator;
import eu.stratosphere.sopremo.pact.SopremoFunction;
import eu.stratosphere.sopremo.pact.SopremoUtil;
import eu.stratosphere.sopremo.pact.TypedRecordToJsonIterator;
import eu.stratosphere.sopremo.pact.UntypedRecordToJsonIterator;
import eu.stratosphere.sopremo.serialization.SopremoRecord;
import eu.stratosphere.sopremo.type.ArrayNode;
import eu.stratosphere.sopremo.type.IJsonNode;
import eu.stratosphere.sopremo.type.IStreamNode;
import eu.stratosphere.sopremo.type.StreamNode;
import eu.stratosphere.sopremo.type.typed.TypedObjectNode;
import eu.stratosphere.util.Collector;
import java.util.Iterator;

public abstract class GenericSopremoReduce<Elem extends IJsonNode, Out extends IJsonNode>
extends AbstractFunction
implements GenericReducer<SopremoRecord, SopremoRecord>,
SopremoFunction {
    private EvaluationContext context;
    private JsonCollector<Out> collector;
    private RecordToJsonIterator<? extends Elem> iterator;
    private final StreamNode<Elem> array = new StreamNode();

    public void combine(Iterator<SopremoRecord> records, Collector<SopremoRecord> collector) throws Exception {
        this.collector.configure(collector);
        this.iterator.setIterator(records);
        try {
            if (SopremoUtil.LOG.isTraceEnabled()) {
                ArrayNode<Elem> array = new ArrayNode<Elem>(this.array);
                SopremoUtil.LOG.trace((Object)String.format("%s %s", this.getContext().getOperatorDescription(), array));
                this.combine(array, this.collector);
            } else {
                this.combine(this.array, this.collector);
            }
        }
        catch (RuntimeException e) {
            SopremoUtil.LOG.error((Object)String.format("Error occurred @ %s with %s: %s", this.getContext().getOperatorDescription(), this.array, e));
            throw e;
        }
    }

    @Override
    public final EvaluationContext getContext() {
        return this.context;
    }

    public void open(Configuration parameters) {
        SopremoEnvironment.getInstance().setConfiguration(parameters);
        this.context = SopremoEnvironment.getInstance().getEvaluationContext();
        TypedObjectNode typedInputNode = SopremoUtil.getTypedNodes(TypeToken.of(this.getClass()).getSupertype(GenericSopremoReduce.class))[0];
        this.iterator = typedInputNode == null ? new UntypedRecordToJsonIterator() : new TypedRecordToJsonIterator(typedInputNode);
        this.collector = new JsonCollector(this.context);
        SopremoUtil.configureWithTransferredState(this, GenericSopremoReduce.class, parameters);
        this.array.setNodeIterator(this.iterator);
    }

    public void reduce(Iterator<SopremoRecord> records, Collector<SopremoRecord> out) {
        this.collector.configure(out);
        this.iterator.setIterator(records);
        try {
            if (SopremoUtil.LOG.isTraceEnabled()) {
                ArrayNode<Elem> array = new ArrayNode<Elem>(this.array);
                SopremoUtil.LOG.trace((Object)String.format("%s %s", this.getContext().getOperatorDescription(), array));
                this.reduce(array, this.collector);
            } else {
                this.reduce(this.array, this.collector);
            }
        }
        catch (RuntimeException e) {
            SopremoUtil.LOG.error((Object)String.format("Error occurred @ %s with %s: %s", this.getContext().getOperatorDescription(), this.array, e));
            throw e;
        }
    }

    protected void combine(IStreamNode<Elem> values, JsonCollector<Out> out) {
        this.reduce(values, out);
    }

    protected abstract void reduce(IStreamNode<Elem> var1, JsonCollector<Out> var2);
}

