/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.stream.element;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.StepCounters;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.ElementStage;
import cascading.flow.stream.graph.StreamGraph;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import java.io.IOException;

public class SinkStage
extends ElementStage<TupleEntry, Void> {
    private final Tap sink;
    private TupleEntryCollector collector;

    public SinkStage(FlowProcess flowProcess, Tap sink) {
        super(flowProcess, sink);
        this.sink = sink;
    }

    public Tap getSink() {
        return this.sink;
    }

    @Override
    public void bind(StreamGraph streamGraph) {
    }

    @Override
    public void prepare() {
        try {
            this.collector = this.sink.openForWrite(this.flowProcess, this.getOutput());
            if (this.sink.getSinkFields().isAll()) {
                Fields fields = this.getIncomingScopes().get(0).getIncomingTapFields();
                this.collector.setFields(fields);
            }
        }
        catch (IOException exception) {
            throw new DuctException("failed opening sink", exception);
        }
    }

    protected Object getOutput() {
        return null;
    }

    @Override
    public void start(Duct previous) {
    }

    @Override
    public void receive(Duct previous, int ordinal, TupleEntry tupleEntry) {
        try {
            this.timedAdd(StepCounters.Write_Duration, tupleEntry);
            this.flowProcess.increment(StepCounters.Tuples_Written, 1L);
            this.flowProcess.increment(SliceCounters.Tuples_Written, 1L);
        }
        catch (OutOfMemoryError error) {
            this.handleReThrowableException("out of memory, try increasing task memory allocation", error);
        }
        catch (CascadingException exception) {
            this.handleException(exception, tupleEntry);
        }
        catch (Throwable throwable) {
            this.handleException(new DuctException("internal error: " + tupleEntry.getTuple().print(), throwable), tupleEntry);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void timedAdd(StepCounters durationCounter, TupleEntry tupleEntry) {
        long start = System.currentTimeMillis();
        try {
            this.collector.add(tupleEntry);
        }
        finally {
            this.flowProcess.increment(durationCounter, System.currentTimeMillis() - start);
        }
    }

    @Override
    public void complete(Duct previous) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cleanup() {
        try {
            if (this.collector != null) {
                long start = System.currentTimeMillis();
                try {
                    this.collector.close();
                }
                finally {
                    this.flowProcess.increment(StepCounters.Write_Duration, System.currentTimeMillis() - start);
                }
            }
            this.collector = null;
        }
        finally {
            super.cleanup();
        }
    }
}

