/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.stream.element;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.StepCounters;
import cascading.flow.stream.StopDataNotificationException;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.ElementStage;
import cascading.flow.stream.element.InputSource;
import cascading.tap.Tap;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import java.io.Closeable;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceStage
extends ElementStage<Void, TupleEntry>
implements Callable<Throwable>,
InputSource {
    private static final Logger LOG = LoggerFactory.getLogger(SourceStage.class);
    private final Tap source;

    public SourceStage(FlowProcess flowProcess, Tap source) {
        super(flowProcess, source);
        this.source = source;
    }

    public Tap getSource() {
        return this.source;
    }

    @Override
    public Throwable call() throws Exception {
        return this.map(null);
    }

    @Override
    public void run(Object input) throws Throwable {
        Throwable throwable = this.map(input);
        if (throwable != null) {
            throw throwable;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Throwable map(Object input) {
        Throwable localThrowable = null;
        Closeable iterator = null;
        try {
            this.next.start(this);
            iterator = this.source.openForRead(this.flowProcess, input);
            while (iterator.hasNext()) {
                TupleEntry tupleEntry;
                if (Thread.interrupted()) {
                    throw new InterruptedException("thread interrupted");
                }
                try {
                    tupleEntry = this.timedNext(StepCounters.Read_Duration, (TupleEntryIterator)iterator);
                    this.flowProcess.increment(StepCounters.Tuples_Read, 1L);
                    this.flowProcess.increment(SliceCounters.Tuples_Read, 1L);
                }
                catch (OutOfMemoryError error) {
                    this.handleReThrowableException("out of memory, try increasing task memory allocation", error);
                    continue;
                }
                catch (CascadingException exception) {
                    this.handleException(exception, null);
                    continue;
                }
                catch (Throwable throwable) {
                    this.handleException(new DuctException("internal error", throwable), null);
                    continue;
                }
                try {
                    this.next.receive(this, 0, tupleEntry);
                }
                catch (StopDataNotificationException exception) {
                    LOG.info("received stop data notification: {}", (Object)exception.getMessage());
                    break;
                }
            }
            this.next.complete(this);
        }
        catch (InterruptedException currentThrowable) {
        }
        catch (Throwable throwable) {
            if (!(throwable instanceof OutOfMemoryError)) {
                LOG.error("caught throwable", throwable);
            }
            Throwable throwable2 = throwable;
            return throwable2;
        }
        finally {
            try {
                if (iterator != null) {
                    iterator.close();
                }
            }
            catch (Throwable currentThrowable) {
                if (!(currentThrowable instanceof OutOfMemoryError)) {
                    LOG.warn("failed closing iterator", currentThrowable);
                }
                localThrowable = currentThrowable;
            }
        }
        return localThrowable;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TupleEntry timedNext(StepCounters durationCounter, TupleEntryIterator iterator) {
        long start = System.currentTimeMillis();
        try {
            TupleEntry tupleEntry = (TupleEntry)iterator.next();
            return tupleEntry;
        }
        finally {
            this.flowProcess.increment(durationCounter, System.currentTimeMillis() - start);
        }
    }

    @Override
    public void initialize() {
    }

    @Override
    public void receive(Duct previous, int ordinal, Void nada) {
        throw new UnsupportedOperationException("use call() instead");
    }
}

