/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.tez.stream.element;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.planner.Scope;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.BoundaryStage;
import cascading.flow.stream.element.InputSource;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.StreamGraph;
import cascading.flow.tez.stream.element.OldOutputCollector;
import cascading.pipe.Boundary;
import cascading.pipe.Pipe;
import cascading.tap.hadoop.util.MeasuredOutputCollector;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.ValueTuple;
import cascading.tuple.util.Resettable1;
import cascading.util.Util;
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Boundary stage that bridges a stream graph to Tez logical inputs/outputs.
 *
 * <p>An instance is built in one of two ways, depending on the constructor used:
 * <ul>
 *   <li>with a collection of {@link LogicalOutput}s, in which case {@link #receive} writes every
 *       incoming tuple to all of them;</li>
 *   <li>with a single {@link LogicalInput}, in which case {@link #run} reads keys from it and
 *       forwards them to the next duct.</li>
 * </ul>
 * Only the member matching the constructor used is non-null.
 */
public class TezBoundaryStage
extends BoundaryStage<TupleEntry, TupleEntry>
implements InputSource {
    private static final Logger LOG = LoggerFactory.getLogger(TezBoundaryStage.class);

    /** Outputs written to by {@link #receive}; null when built with a {@link LogicalInput}. */
    protected Collection<LogicalOutput> logicalOutputs;

    /** Input read by {@link #map()}; null when built with {@link LogicalOutput}s. */
    protected LogicalInput logicalInput;

    /** Write-side collector, created in {@link #prepare()} unless the role is {@code source}. */
    private MeasuredOutputCollector collector;

    /** Reusable entry handed downstream for every key read in {@link #map()}. */
    private TupleEntry valueEntry;

    /** Reusable key wrapper, reset to the incoming tuple on every {@link #receive} call. */
    private final Resettable1<Tuple> keyTuple = new KeyTuple();

    /**
     * Creates a stage that writes to the given outputs.
     *
     * @param flowProcess    passed through to the superclass
     * @param boundary       passed through to the superclass
     * @param role           passed through to the superclass
     * @param logicalOutputs outputs to write to
     * @throws IllegalArgumentException if {@code logicalOutputs} is null or empty
     */
    public TezBoundaryStage(FlowProcess flowProcess, Boundary boundary, IORole role, Collection<LogicalOutput> logicalOutputs) {
        super(flowProcess, boundary, role);
        boolean missingOutputs = logicalOutputs == null || logicalOutputs.isEmpty();
        if (missingOutputs) {
            throw new IllegalArgumentException("output must not be null or empty");
        }
        this.logicalOutputs = logicalOutputs;
    }

    /**
     * Creates a stage that reads from the given input.
     *
     * @param flowProcess  passed through to the superclass
     * @param boundary     passed through to the superclass
     * @param role         passed through to the superclass
     * @param logicalInput input to read from
     * @throws IllegalArgumentException if {@code logicalInput} is null
     */
    public TezBoundaryStage(FlowProcess flowProcess, Boundary boundary, IORole role, LogicalInput logicalInput) {
        super(flowProcess, boundary, role);
        if (logicalInput == null) {
            throw new IllegalArgumentException("inputs must not be null or empty");
        }
        this.logicalInput = logicalInput;
    }

    /**
     * Runs the superclass initialization, then allocates the reusable {@link #valueEntry} from
     * the pass-through fields of the first outgoing scope.
     */
    public void initialize() {
        super.initialize();
        Scope firstOutgoing = (Scope) Util.getFirst(outgoingScopes);
        valueEntry = new TupleEntry(firstOutgoing.getIncomingFunctionPassThroughFields(), true);
    }

    /**
     * Resolves the downstream duct, except when this stage has the {@code sink} role.
     *
     * @param streamGraph graph used to look up the next duct
     */
    public void bind(StreamGraph streamGraph) {
        if (role == IORole.sink) {
            return;
        }
        next = getNextFor(streamGraph);
    }

    /**
     * Starts the logical input and/or outputs, creates the write collector when the role is not
     * {@code source}, then delegates to the superclass.
     *
     * @throws CascadingException wrapping any exception raised while starting the input/outputs
     */
    public void prepare() {
        try {
            startLogicalInput();
            startLogicalOutputs();
        }
        catch (Exception exception) {
            throw new CascadingException("unable to start input/output", exception);
        }
        if (role != IORole.source) {
            collector = new MeasuredOutputCollector(flowProcess, SliceCounters.Write_Duration, createOutputCollector());
        }
        super.prepare();
    }

    /** Logs and starts the logical input, if this stage has one. */
    private void startLogicalInput() throws Exception {
        if (logicalInput == null) {
            return;
        }
        LOG.info("calling {}#start() on: {} {}", logicalInput.getClass().getSimpleName(), getBoundary(), Pipe.id(getBoundary()));
        logicalInput.start();
    }

    /** Logs and starts each logical output in iteration order, if this stage has any. */
    private void startLogicalOutputs() throws Exception {
        if (logicalOutputs == null) {
            return;
        }
        for (LogicalOutput output : logicalOutputs) {
            LOG.info("calling {}#start() on: {} {}", output.getClass().getSimpleName(), getBoundary(), Pipe.id(getBoundary()));
            output.start();
        }
    }

    /**
     * Propagates the start signal only when a downstream duct has been bound.
     *
     * @param previous the duct issuing the signal
     */
    public void start(Duct previous) {
        if (next == null) {
            return;
        }
        super.start(previous);
    }

    /**
     * Writes the incoming tuple as a key, paired with {@link ValueTuple#NULL}, to the collector
     * and increments the {@code Tuples_Written} counter.
     *
     * <p>Failures are not thrown directly from here; they are routed to the inherited
     * {@code handleReThrowableException} / {@code handleException} hooks.
     *
     * @param previous      the upstream duct (unused here)
     * @param ordinal       the upstream ordinal (unused here)
     * @param incomingEntry the entry whose tuple is written
     */
    public void receive(Duct previous, int ordinal, TupleEntry incomingEntry) {
        try {
            keyTuple.reset(incomingEntry.getTuple());
            collector.collect(keyTuple, ValueTuple.NULL);
            flowProcess.increment(SliceCounters.Tuples_Written, 1L);
        }
        catch (OutOfMemoryError error) {
            handleReThrowableException("out of memory, try increasing task memory allocation", error);
        }
        catch (CascadingException exception) {
            handleException(exception, incomingEntry);
        }
        catch (Throwable throwable) {
            DuctException wrapped = new DuctException("internal error: " + incomingEntry.getTuple().print(), throwable);
            handleException(wrapped, incomingEntry);
        }
    }

    /**
     * Propagates the complete signal only when a downstream duct has been bound.
     *
     * @param previous the duct issuing the signal
     */
    public void complete(Duct previous) {
        if (next == null) {
            return;
        }
        super.complete(previous);
    }

    /**
     * Drives the read loop via {@link #map()} and rethrows whatever failure it reports.
     *
     * @param input not used by this implementation
     * @throws Throwable the failure returned by {@link #map()}, if any
     */
    public void run(Object input) throws Throwable {
        Throwable failure = map();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Reads every key from the logical input's reader and passes it downstream wrapped in the
     * reusable {@link #valueEntry}, bracketed by start and complete signals.
     *
     * @return {@code null} on success, otherwise the throwable that interrupted the loop
     * @throws Exception declared for compatibility; failures inside the loop are returned, not thrown
     */
    protected Throwable map() throws Exception {
        try {
            start(this);
            KeyValueReader reader = (KeyValueReader) logicalInput.getReader();
            while (reader.next()) {
                valueEntry.setTuple((Tuple) reader.getCurrentKey());
                next.receive(this, 0, valueEntry);
            }
            complete(this);
            return null;
        }
        catch (Throwable failure) {
            // out-of-memory errors are handed back without being logged here
            boolean outOfMemory = failure instanceof OutOfMemoryError;
            if (!outOfMemory) {
                LOG.error("caught throwable", failure);
            }
            return failure;
        }
    }

    /**
     * Builds the collector used on the write side: a single {@link OldOutputCollector} when there
     * is exactly one logical output, otherwise a collector that forwards each pair to one
     * {@link OldOutputCollector} per output.
     *
     * @return the collector wrapping {@link #logicalOutputs}
     */
    protected OutputCollector createOutputCollector() {
        int outputCount = logicalOutputs.size();
        if (outputCount == 1) {
            return new OldOutputCollector((LogicalOutput) Util.getFirst(logicalOutputs));
        }
        OutputCollector[] delegates = new OutputCollector[outputCount];
        int index = 0;
        for (LogicalOutput output : logicalOutputs) {
            delegates[index] = new OldOutputCollector(output);
            index++;
        }
        return new FanOutCollector(delegates);
    }

    /** Collector that forwards every key/value pair to each delegate, in array order. */
    private static final class FanOutCollector implements OutputCollector {
        private final OutputCollector[] delegates;

        FanOutCollector(OutputCollector[] delegates) {
            this.delegates = delegates;
        }

        public void collect(Object key, Object value) throws IOException {
            for (int i = 0; i < delegates.length; i++) {
                delegates[i].collect(key, value);
            }
        }
    }
}

