/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.planner;

import cascading.flow.Flow;
import cascading.flow.FlowException;
import cascading.flow.FlowStepStrategy;
import cascading.flow.planner.BaseFlowStep;
import cascading.management.state.ClientState;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowStats;
import cascading.stats.FlowStepStats;
import cascading.util.Util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FlowStepJob<Config>
implements Callable<Throwable> {
    private static final Logger LOG = LoggerFactory.getLogger(FlowStepJob.class);
    protected final String stepName;
    protected final Config jobConfiguration;
    protected long pollingInterval = 1000L;
    protected long statsStoreInterval = 60000L;
    protected long blockForCompletedChildDetailDuration = 60000L;
    protected List<FlowStepJob<Config>> predecessors;
    private final CountDownLatch latch = new CountDownLatch(1);
    private AtomicBoolean callableStarted = new AtomicBoolean(false);
    private volatile boolean stop = false;
    protected final BaseFlowStep<Config> flowStep;
    protected FlowStepStats flowStepStats;
    protected Throwable throwable;

    public FlowStepJob(ClientState clientState, Config jobConfiguration, BaseFlowStep<Config> flowStep, long pollingInterval, long statsStoreInterval, long blockForCompletedChildDetailDuration) {
        this.jobConfiguration = jobConfiguration;
        this.stepName = flowStep.getName();
        this.pollingInterval = pollingInterval;
        this.statsStoreInterval = statsStoreInterval;
        this.blockForCompletedChildDetailDuration = blockForCompletedChildDetailDuration;
        this.flowStep = flowStep;
        this.flowStepStats = this.createStepStats(clientState);
        this.flowStepStats.prepare();
        this.flowStepStats.markPending();
        for (FlowNodeStats flowNodeStats : this.flowStepStats.getFlowNodeStats()) {
            flowNodeStats.prepare();
            flowNodeStats.markPending();
        }
    }

    public Config getConfig() {
        return this.jobConfiguration;
    }

    protected abstract FlowStepStats createStepStats(ClientState var1);

    public synchronized void stop() {
        if (this.flowStep.isInfoEnabled()) {
            this.flowStep.logInfo("stopping: " + this.stepName, new Object[0]);
        }
        this.stop = true;
        if (!this.flowStepStats.isFinished()) {
            this.flowStepStats.markStopped();
        }
        try {
            this.internalBlockOnStop();
        }
        catch (IOException exception) {
            this.flowStep.logWarn("unable to kill job: " + this.stepName, exception);
        }
        finally {
            if (this.flowStepStats.isStopped()) {
                this.flowStep.rollbackSinks();
                this.flowStep.fireOnStopping();
            }
            this.flowStepStats.cleanup();
        }
    }

    protected abstract void internalBlockOnStop() throws IOException;

    public void setPredecessors(List<FlowStepJob<Config>> predecessors) {
        this.predecessors = predecessors;
    }

    @Override
    public Throwable call() {
        this.start();
        return this.throwable;
    }

    public boolean isCallableStarted() {
        return this.callableStarted.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void start() {
        if (this.callableStarted.getAndSet(true)) {
            return;
        }
        try {
            if (this.isSkipFlowStep()) {
                this.markSkipped();
                if (this.flowStep.isInfoEnabled() && this.flowStepStats.isSkipped()) {
                    this.flowStep.logInfo("skipping step: " + this.stepName, new Object[0]);
                }
                return;
            }
            FlowStepJob flowStepJob = this;
            synchronized (flowStepJob) {
                block17: {
                    if (!this.stop) break block17;
                    if (this.flowStep.isInfoEnabled()) {
                        this.flowStep.logInfo("stop called before start: " + this.stepName, new Object[0]);
                    }
                    return;
                }
                this.markStarted();
            }
            this.blockOnPredecessors();
            this.prepareResources();
            this.applyFlowStepConfStrategy();
            this.blockOnJob();
        }
        catch (Throwable throwable) {
            this.throwable = throwable;
            this.dumpDebugInfo();
            if (this.flowStepStats.isPending()) {
                this.markStarted();
            }
            if (!this.flowStepStats.isFinished()) {
                this.flowStepStats.markFailed(this.throwable);
                this.flowStep.fireOnThrowable(this.throwable);
            }
        }
        finally {
            this.latch.countDown();
            this.finalizeNodeSliceCapture();
            this.flowStepStats.cleanup();
        }
        this.internalCleanup();
    }

    private void prepareResources() throws Throwable {
        if (this.stop) {
            return;
        }
        Throwable throwable = this.flowStep.prepareResources();
        if (throwable != null) {
            throw throwable;
        }
    }

    private synchronized boolean markStarted() {
        if (this.flowStepStats.isFinished()) {
            return false;
        }
        this.flowStepStats.markStarted();
        return true;
    }

    private void applyFlowStepConfStrategy() {
        FlowStepStrategy flowStepStrategy = this.flowStep.getFlow().getFlowStepStrategy();
        if (flowStepStrategy == null) {
            return;
        }
        ArrayList predecessorSteps = new ArrayList();
        for (FlowStepJob<Config> predecessor : this.predecessors) {
            predecessorSteps.add(predecessor.flowStep);
        }
        flowStepStrategy.apply(this.flowStep.getFlow(), predecessorSteps, this.flowStep);
    }

    protected boolean isSkipFlowStep() throws IOException {
        if (this.flowStep.getFlow().getRunID() == null) {
            return false;
        }
        return this.flowStep.allSourcesExist() && !this.flowStep.areSourcesNewer(this.flowStep.getSinkModified());
    }

    protected void blockOnJob() throws IOException {
        if (this.stop) {
            return;
        }
        if (this.flowStep.isInfoEnabled()) {
            this.flowStep.logInfo("starting step: " + this.stepName, new Object[0]);
        }
        this.internalNonBlockingStart();
        this.markSubmitted();
        this.flowStep.fireOnStarting();
        this.blockTillCompleteOrStopped();
        if (!this.stop && !this.internalNonBlockingIsSuccessful()) {
            if (!this.flowStepStats.isFinished()) {
                this.flowStep.rollbackSinks();
                this.flowStepStats.markFailed(this.getThrowable());
                this.updateNodesStatus();
                this.flowStep.fireOnThrowable(this.getThrowable());
            }
            if (this.getThrowable() instanceof OutOfMemoryError) {
                throw (OutOfMemoryError)this.getThrowable();
            }
            this.dumpDebugInfo();
            this.throwable = !this.isRemoteExecution() ? new FlowException("local step failed: " + this.stepName, this.getThrowable()) : new FlowException("step failed: " + this.stepName + ", step id: " + this.getStepStats().getID() + ", job id: " + this.internalJobId() + ", please see cluster logs for failure messages");
        } else if (this.internalNonBlockingIsSuccessful() && !this.flowStepStats.isFinished()) {
            this.throwable = this.flowStep.commitSinks();
            if (this.throwable != null) {
                this.flowStepStats.markFailed(this.throwable);
                this.updateNodesStatus();
                this.flowStep.fireOnThrowable(this.throwable);
            } else {
                this.flowStepStats.markSuccessful();
                this.updateNodesStatus();
                this.flowStep.fireOnCompleted();
            }
        }
    }

    protected void finalizeNodeSliceCapture() {
        boolean allNodesFinished;
        long startOfFinalPolling = System.currentTimeMillis();
        long lastLog = 0L;
        long retries = 0L;
        while (true) {
            allNodesFinished = this.updateNodesStatus();
            this.flowStepStats.recordChildStats();
            if (allNodesFinished && this.flowStepStats.hasCapturedFinalDetail() || System.currentTimeMillis() - startOfFinalPolling >= this.blockForCompletedChildDetailDuration) break;
            if (System.currentTimeMillis() - lastLog > 1000L) {
                if (!allNodesFinished) {
                    this.flowStep.logInfo("did not capture all completed node details, will retry in {}, prior retries: {}", Util.formatDurationFromMillis(this.pollingInterval), retries);
                } else {
                    this.flowStep.logInfo("did not capture all completed slice details, will retry in {}, prior retries: {}", Util.formatDurationFromMillis(this.pollingInterval), retries);
                }
                lastLog = System.currentTimeMillis();
            }
            ++retries;
            this.sleepForPollingInterval();
        }
        if (!allNodesFinished) {
            this.flowStep.logWarn("unable to capture all completed node details or determine final state within configured duration: {}, configure property to increase wait duration: '{}'", Util.formatDurationFromMillis(this.blockForCompletedChildDetailDuration), "cascading.stats.complete_child_details.block.duration");
        }
        if (!this.flowStepStats.hasCapturedFinalDetail()) {
            this.flowStep.logWarn("unable to capture all completed slice details within configured duration: {}, configure property to increase wait duration: '{}'", Util.formatDurationFromMillis(this.blockForCompletedChildDetailDuration), "cascading.stats.complete_child_details.block.duration");
        }
    }

    protected abstract boolean isRemoteExecution();

    protected abstract String internalJobId();

    protected abstract boolean internalNonBlockingIsSuccessful() throws IOException;

    protected abstract Throwable getThrowable();

    protected abstract void internalNonBlockingStart() throws IOException;

    protected void blockTillCompleteOrStopped() throws IOException {
        int iterations = (int)Math.floor(this.statsStoreInterval / this.pollingInterval);
        int count = 0;
        while (true) {
            if (this.flowStepStats.isSubmitted() && this.internalIsStartedRunning() && !this.stop) {
                this.markRunning();
                this.flowStep.fireOnRunning();
            }
            if (this.flowStepStats.isRunning()) {
                this.updateNodesStatus();
            }
            if (this.stop || this.internalNonBlockingIsComplete()) break;
            if (iterations == count++) {
                count = 0;
                this.flowStepStats.recordStats();
                this.flowStepStats.recordChildStats();
            }
            this.sleepForPollingInterval();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void markSubmitted() {
        FlowStats flowStats;
        Flow<Config> flow;
        if (this.flowStepStats.isStarted()) {
            this.flowStepStats.markSubmitted();
            Collection<FlowNodeStats> children = this.flowStepStats.getChildren();
            for (FlowNodeStats flowNodeStats : children) {
                flowNodeStats.markStarted();
            }
        }
        if ((flow = this.flowStep.getFlow()) == null) {
            LOG.warn("no parent flow set");
            return;
        }
        FlowStats flowStats2 = flowStats = flow.getFlowStats();
        synchronized (flowStats2) {
            if (flowStats.isStarted()) {
                flowStats.markSubmitted();
            }
        }
    }

    private synchronized void markSkipped() {
        if (this.flowStepStats.isFinished()) {
            return;
        }
        try {
            this.flowStepStats.markSkipped();
            this.flowStep.fireOnCompleted();
        }
        finally {
            this.markFlowRunning();
        }
    }

    private synchronized void markRunning() {
        this.flowStepStats.markRunning();
        this.markFlowRunning();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void markFlowRunning() {
        FlowStats flowStats;
        Flow<Config> flow = this.flowStep.getFlow();
        if (flow == null) {
            LOG.warn("no parent flow set");
            return;
        }
        FlowStats flowStats2 = flowStats = flow.getFlowStats();
        synchronized (flowStats2) {
            if (flowStats.isStarted() || flowStats.isSubmitted()) {
                flowStats.markRunning();
            }
        }
    }

    private boolean updateNodesStatus() {
        boolean allFinished = true;
        List<FlowNodeStats> children = this.flowStepStats.getFlowNodeStats();
        for (FlowNodeStats child : children) {
            if (child.isFinished() || child.isPending()) continue;
            this.updateNodeStatus(child);
            allFinished &= child.isFinished();
        }
        return allFinished;
    }

    protected abstract void updateNodeStatus(FlowNodeStats var1);

    protected abstract boolean internalNonBlockingIsComplete() throws IOException;

    protected void sleepForPollingInterval() {
        Util.safeSleep(this.pollingInterval);
    }

    protected void blockOnPredecessors() {
        for (FlowStepJob<Config> predecessor : this.predecessors) {
            if (predecessor.isSuccessful()) continue;
            this.flowStep.logWarn("abandoning step: " + this.stepName + ", predecessor failed: " + predecessor.stepName);
            this.stop();
        }
    }

    protected abstract void dumpDebugInfo();

    public boolean isSuccessful() {
        try {
            this.latch.await();
            return this.flowStepStats.isSuccessful() || this.flowStepStats.isSkipped();
        }
        catch (InterruptedException exception) {
            this.flowStep.logWarn("latch interrupted", exception);
            return false;
        }
    }

    public boolean isStarted() {
        return this.internalIsStartedRunning();
    }

    protected abstract boolean internalIsStartedRunning();

    protected void internalCleanup() {
    }

    public FlowStepStats getStepStats() {
        return this.flowStepStats;
    }
}

