/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow;

import cascading.CascadingException;
import cascading.cascade.Cascade;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowListener;
import cascading.flow.FlowNode;
import cascading.flow.FlowSession;
import cascading.flow.FlowSkipIfSinkNotStale;
import cascading.flow.FlowSkipStrategy;
import cascading.flow.FlowStep;
import cascading.flow.FlowStepListener;
import cascading.flow.FlowStepStrategy;
import cascading.flow.FlowTapException;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlannerInfo;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.flow.planner.process.ProcessGraphs;
import cascading.flow.planner.process.ProcessModel;
import cascading.management.CascadingServices;
import cascading.management.UnitOfWorkExecutorStrategy;
import cascading.management.UnitOfWorkSpawnStrategy;
import cascading.management.state.ClientState;
import cascading.property.AppProps;
import cascading.property.PropertyUtil;
import cascading.stats.FlowStats;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.ProcessLogger;
import cascading.util.ShutdownUtil;
import cascading.util.Util;
import cascading.util.Version;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import riffle.process.DependencyIncoming;
import riffle.process.DependencyOutgoing;
import riffle.process.Process;
import riffle.process.ProcessCleanup;
import riffle.process.ProcessComplete;
import riffle.process.ProcessPrepare;
import riffle.process.ProcessStart;
import riffle.process.ProcessStop;

@Process
public abstract class BaseFlow<Config>
implements Flow<Config>,
ProcessLogger {
    // Logger is registered under the Flow interface rather than BaseFlow.
    private static final Logger LOG = LoggerFactory.getLogger(Flow.class);
    private static final int LOG_FLOW_NAME_MAX = 25;
    private PlannerInfo plannerInfo = PlannerInfo.NULL;
    protected PlatformInfo platformInfo = PlatformInfo.NULL;
    // Lazily assigned by getID() on first access.
    private String id;
    private String name;
    private String runID;
    private List<String> classPath;
    private String tags;
    // Lazily created by getListeners(); may be null until a listener is requested.
    private List<SafeFlowListener> listeners;
    private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale();
    protected FlowStats flowStats;
    // Tap maps default to empty maps until the corresponding setter is called.
    protected Map<String, Tap> sources = Collections.emptyMap();
    protected Map<String, Tap> sinks = Collections.emptyMap();
    private Map<String, Tap> traps = Collections.emptyMap();
    private Map<String, Tap> checkpoints = Collections.emptyMap();
    protected boolean stopJobsOnExit = true;
    // setSubmitPriority() only accepts values from 1 to 10 inclusive.
    private int submitPriority = 5;
    protected FlowStepGraph flowStepGraph;
    // Set by start(), cleared again at the end of complete().
    protected transient Thread thread;
    // Failure captured by run(); rethrown (possibly wrapped) by complete().
    protected Throwable throwable;
    protected volatile boolean stop;
    protected volatile boolean completed = false;
    // Cached by getFlowCanonicalHash() once computed.
    protected String flowCanonicalHash;
    protected FlowElementGraph flowElementGraph;
    // Lazily created by getCascadingServices().
    private transient CascadingServices cascadingServices;
    private FlowStepStrategy<Config> flowStepStrategy = null;
    // Cached result of getFlowSteps().
    protected transient List<FlowStep<Config>> steps;
    private transient Map<String, FlowStepJob<Config>> jobsMap;
    private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();
    // Constructed with fairness enabled; held by stop() and probed by ifStoppingBlockUntilComplete().
    private transient ReentrantLock stopLock = new ReentrantLock(true);
    protected ShutdownUtil.Hook shutdownHook;
    protected HashMap<String, String> flowDescriptor;

    /**
     * Reads the {@code cascading.flow.stopjobsonexit} property, passing {@code "true"}
     * as the fallback value, and parses the result as a boolean.
     *
     * @param properties the properties to consult
     * @return the parsed boolean value
     */
    static boolean getStopJobsOnExit(Map<Object, Object> properties) {
        String configured = PropertyUtil.getProperty(properties, "cascading.flow.stopjobsonexit", "true");
        return Boolean.parseBoolean(configured);
    }

    /**
     * Creates a flow named {@code "NA"} and immediately prepares its stats object.
     */
    protected BaseFlow() {
        name = "NA";
        flowStats = createPrepareFlowStats();
    }

    /**
     * Creates a named flow, keeping the default platform info when none is supplied.
     *
     * @param platformInfo platform description; ignored when {@code null}
     * @param name the flow name
     */
    protected BaseFlow(PlatformInfo platformInfo, String name) {
        this.name = name;
        if (platformInfo == null) {
            return;
        }
        this.platformInfo = platformInfo;
    }

    /**
     * Creates a named flow from explicit properties and a default configuration.
     *
     * @param platformInfo platform description; ignored when {@code null}
     * @param properties properties that receive the session entries and are handed to {@code initConfig}
     * @param defaultConfig default configuration handed to {@code initConfig}
     * @param name the flow name
     * @param flowDescriptor descriptor entries, copied when not {@code null}
     */
    protected BaseFlow(PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor) {
        this.name = name;
        if (platformInfo != null) {
            this.platformInfo = platformInfo;
        }
        if (flowDescriptor != null) {
            // Keep a private copy so later changes to the caller's map are not observed.
            this.flowDescriptor = new LinkedHashMap<String, String>(flowDescriptor);
        }
        addSessionProperties(properties);
        initConfig(properties, defaultConfig);
    }

    /**
     * Creates a flow from a {@link FlowDef}: copies its name, tags, run id, class path and
     * descriptor, initializes the configuration, registers its taps and then asks the
     * source and sink taps to retrieve their fields.
     *
     * @param platformInfo platform description; ignored when {@code null}
     * @param properties properties, flattened via {@code PropertyUtil.asFlatMap} before use
     * @param defaultConfig default configuration handed to {@code initConfig}
     * @param flowDef the flow definition to read from
     */
    protected BaseFlow(PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef) {
        Map<Object, Object> flatProperties = PropertyUtil.asFlatMap(properties);
        if (platformInfo != null) {
            this.platformInfo = platformInfo;
        }

        // Identity and metadata taken straight from the definition.
        name = flowDef.getName();
        tags = flowDef.getTags();
        runID = flowDef.getRunID();
        classPath = flowDef.getClassPath();
        if (!flowDef.getFlowDescriptor().isEmpty()) {
            flowDescriptor = new LinkedHashMap<String, String>(flowDef.getFlowDescriptor());
        }

        // Configuration must exist before the taps are registered and initialized.
        addSessionProperties(flatProperties);
        initConfig(flatProperties, defaultConfig);

        setSources(flowDef.getSourcesCopy());
        setSinks(flowDef.getSinksCopy());
        setTraps(flowDef.getTrapsCopy());
        setCheckpoints(flowDef.getCheckpointsCopy());

        initFromTaps();
        retrieveSourceFields();
        retrieveSinkFields();
    }

    /**
     * Replaces the planner info held by this flow.
     *
     * @param plannerInfo the planner info to store
     */
    public void setPlannerInfo(PlannerInfo plannerInfo) {
        this.plannerInfo = plannerInfo;
    }

    /**
     * @return the planner info currently held by this flow
     */
    @Override
    public PlannerInfo getPlannerInfo() {
        return plannerInfo;
    }

    /**
     * @return the platform info currently held by this flow
     */
    @Override
    public PlatformInfo getPlatformInfo() {
        return platformInfo;
    }

    /**
     * Binds the given graphs to this flow and (re)builds the state derived from them.
     * Planner properties are written to the config first; afterwards the steps are wired,
     * fresh stats are prepared, and the jobs map and child stats are initialized.
     *
     * @param flowElementGraph the element graph to store
     * @param flowStepGraph the step graph to store
     */
    public void initialize(FlowElementGraph flowElementGraph, FlowStepGraph flowStepGraph) {
        addPlannerProperties();

        this.flowElementGraph = flowElementGraph;
        this.flowStepGraph = flowStepGraph;

        initSteps();
        flowStats = createPrepareFlowStats();
        initializeNewJobsMap();
        initializeChildStats();
    }

    /**
     * Presents source and sink fields to the taps found in the given graph, then
     * returns a new {@link FlowElementGraph} constructed from it.
     *
     * @param pipeGraph the graph whose taps are presented their fields
     * @return a new graph instance built from {@code pipeGraph}
     */
    public FlowElementGraph updateSchemes(FlowElementGraph pipeGraph) {
        presentSourceFields(pipeGraph);
        presentSinkFields(pipeGraph);
        FlowElementGraph result = new FlowElementGraph(pipeGraph);
        return result;
    }

    /**
     * Asks every source tap to retrieve its source fields using this flow's process.
     */
    protected void retrieveSourceFields() {
        Iterator<Tap> sourceTaps = sources.values().iterator();
        while (sourceTaps.hasNext()) {
            sourceTaps.next().retrieveSourceFields(getFlowProcess());
        }
    }

    /**
     * Presents source fields to each source tap, and then each checkpoint tap, that is a
     * vertex of the given graph. Taps not contained in the graph are skipped.
     *
     * @param pipeGraph the graph used to look up each tap's fields
     */
    protected void presentSourceFields(FlowElementGraph pipeGraph) {
        for (Tap source : sources.values()) {
            if (pipeGraph.containsVertex(source)) {
                source.presentSourceFields(getFlowProcess(), getFieldsFor(pipeGraph, source));
            }
        }
        for (Tap checkpoint : checkpoints.values()) {
            if (pipeGraph.containsVertex(checkpoint)) {
                checkpoint.presentSourceFields(getFlowProcess(), getFieldsFor(pipeGraph, checkpoint));
            }
        }
    }

    /**
     * Asks every sink tap to retrieve its sink fields using this flow's process.
     */
    protected void retrieveSinkFields() {
        Iterator<Tap> sinkTaps = sinks.values().iterator();
        while (sinkTaps.hasNext()) {
            sinkTaps.next().retrieveSinkFields(getFlowProcess());
        }
    }

    /**
     * Presents sink fields to each sink tap, and then each checkpoint tap, that is a
     * vertex of the given graph. Taps not contained in the graph are skipped.
     *
     * @param pipeGraph the graph used to look up each tap's fields
     */
    protected void presentSinkFields(FlowElementGraph pipeGraph) {
        for (Tap sink : sinks.values()) {
            if (pipeGraph.containsVertex(sink)) {
                sink.presentSinkFields(getFlowProcess(), getFieldsFor(pipeGraph, sink));
            }
        }
        for (Tap checkpoint : checkpoints.values()) {
            if (pipeGraph.containsVertex(checkpoint)) {
                checkpoint.presentSinkFields(getFlowProcess(), getFieldsFor(pipeGraph, checkpoint));
            }
        }
    }

    /**
     * Returns the out-values fields of the first outgoing edge of the given tap.
     *
     * @param pipeGraph the graph containing the tap
     * @param tap the tap whose outgoing edge is inspected
     * @return the fields reported by that edge
     */
    protected Fields getFieldsFor(FlowElementGraph pipeGraph, Tap tap) {
        // NOTE(review): iterator().next() throws NoSuchElementException if the tap has no
        // outgoing edges; callers only guard with containsVertex() - confirm every tap in
        // the graph is guaranteed at least one outgoing edge.
        return pipeGraph.outgoingEdgesOf(tap).iterator().next().getOutValuesFields();
    }

    /**
     * Writes the flow id, flow tags, application id, application name and application
     * version into the given properties. Does nothing when {@code properties} is null.
     *
     * @param properties the properties to populate; may be {@code null}
     */
    protected void addSessionProperties(Map<Object, Object> properties) {
        if (properties != null) {
            PropertyUtil.setProperty(properties, "cascading.flow.id", getID());
            PropertyUtil.setProperty(properties, "cascading.flow.tags", getTags());
            AppProps.setApplicationID(properties);
            PropertyUtil.setProperty(properties, "cascading.app.name", makeAppName(properties));
            PropertyUtil.setProperty(properties, "cascading.app.version", makeAppVersion(properties));
        }
    }

    /**
     * Copies the planner's name, platform and registry from the planner info into the
     * flow configuration under the {@code cascading.flow.*} keys.
     */
    protected void addPlannerProperties() {
        setConfigProperty(getConfig(), "cascading.flow.planner", getPlannerInfo().name);
        setConfigProperty(getConfig(), "cascading.flow.platform", getPlannerInfo().platform);
        setConfigProperty(getConfig(), "cascading.flow.registry", getPlannerInfo().registry);
    }

    /**
     * Resolves the application name: the explicitly configured name when present,
     * otherwise a name derived from the application jar path.
     *
     * @param properties the properties to consult; may be {@code null}
     * @return the resolved name, or {@code null} when {@code properties} is null
     */
    private String makeAppName(Map<Object, Object> properties) {
        if (properties == null) {
            return null;
        }
        String configuredName = AppProps.getApplicationName(properties);
        return configuredName != null
            ? configuredName
            : Util.findName(AppProps.getApplicationJarPath(properties));
    }

    /**
     * Resolves the application version: the explicitly configured version when present,
     * otherwise a version derived from the application jar path.
     *
     * @param properties the properties to consult; may be {@code null}
     * @return the resolved version, or {@code null} when {@code properties} is null
     */
    private String makeAppVersion(Map<Object, Object> properties) {
        if (properties == null) {
            return null;
        }
        String configuredVersion = AppProps.getApplicationVersion(properties);
        return configuredVersion != null
            ? configuredVersion
            : Util.findVersion(AppProps.getApplicationJarPath(properties));
    }

    /**
     * Creates a new stats object, prepares it and marks it pending.
     *
     * @return the freshly prepared stats
     */
    protected FlowStats createPrepareFlowStats() {
        FlowStats stats = createFlowStats();
        stats.prepare();
        stats.markPending();
        return stats;
    }

    /**
     * Creates a stats object for this flow backed by a newly obtained client state.
     *
     * @return a new {@link FlowStats}
     */
    protected FlowStats createFlowStats() {
        ClientState clientState = getClientState();
        return new FlowStats(this, clientState);
    }

    /**
     * Returns the services instance for this flow, creating it from the current
     * configuration properties on first use.
     *
     * @return the cached or newly created services
     */
    public CascadingServices getCascadingServices() {
        if (cascadingServices != null) {
            return cascadingServices;
        }
        cascadingServices = new CascadingServices(getConfigAsProperties());
        return cascadingServices;
    }

    /**
     * Creates a client state for this flow's id via the session's services.
     *
     * @return the client state produced by the services
     */
    protected ClientState getClientState() {
        FlowSession session = getFlowSession();
        return session.getCascadingServices().createClientState(getID());
    }

    /**
     * Wires every step of the step graph back to this flow, and every node of each
     * step's node graph back to its step. Does nothing when no step graph is set.
     */
    protected void initSteps() {
        if (this.flowStepGraph == null) {
            return;
        }
        // Iterate as Object and cast explicitly: looping over a raw Set with a typed loop
        // variable is rejected by the compiler.
        for (Object stepVertex : this.flowStepGraph.vertexSet()) {
            FlowStep flowStep = (FlowStep) stepVertex;
            ((BaseFlowStep) flowStep).setFlow(this);
            for (Object nodeVertex : flowStep.getFlowNodeGraph().vertexSet()) {
                ((BaseFlowNode) nodeVertex).setFlowStep(flowStep);
            }
        }
    }

    /**
     * Runs flow-level configuration initialization for the source, sink and trap taps,
     * in that order.
     */
    private void initFromTaps() {
        initFromTaps(sources);
        initFromTaps(sinks);
        initFromTaps(traps);
    }

    /**
     * Calls {@code flowConfInit(this)} on every tap in the given map.
     *
     * @param taps the taps to initialize
     */
    private void initFromTaps(Map<String, Tap> taps) {
        Iterator<Tap> remaining = taps.values().iterator();
        while (remaining.hasNext()) {
            remaining.next().flowConfInit(this);
        }
    }

    /**
     * @return the name of this flow
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Replaces the name of this flow.
     *
     * @param name the new flow name
     */
    protected void setName(String name) {
        this.name = name;
    }

    /**
     * Returns this flow's id, generating a unique one on first access.
     *
     * @return the flow id
     */
    @Override
    public String getID() {
        if (id != null) {
            return id;
        }
        id = Util.createUniqueID();
        return id;
    }

    /**
     * @return the tags string of this flow; may be {@code null} if none were set
     */
    @Override
    public String getTags() {
        return tags;
    }

    /**
     * @return the current submit priority
     */
    @Override
    public int getSubmitPriority() {
        return submitPriority;
    }

    /**
     * Sets the submit priority.
     *
     * @param submitPriority a value from 1 to 10 inclusive
     * @throws IllegalArgumentException if the value is outside that range
     */
    @Override
    public void setSubmitPriority(int submitPriority) {
        boolean withinRange = submitPriority >= 1 && submitPriority <= 10;
        if (!withinRange) {
            throw new IllegalArgumentException("submitPriority must be between 1 and 10 inclusive, was: " + submitPriority);
        }
        this.submitPriority = submitPriority;
    }

    /**
     * Returns the canonical hash of the element graph, computing and caching it on first
     * use. When a hash is already cached, or no element graph is set, the cached value
     * (possibly {@code null}) is returned as is.
     *
     * @return the canonical hash, or {@code null} if none could be computed
     */
    public String getFlowCanonicalHash() {
        if (flowCanonicalHash != null) {
            return flowCanonicalHash;
        }
        if (this.flowElementGraph == null) {
            return flowCanonicalHash;
        }
        // The graph instance doubles as the monitor guarding the hash computation.
        FlowElementGraph monitor = this.flowElementGraph;
        synchronized (monitor) {
            flowCanonicalHash = createFlowCanonicalHash(this.flowElementGraph);
        }
        return flowCanonicalHash;
    }

    /**
     * Computes the canonical hash of the given graph.
     *
     * @param flowElementGraph the graph to hash; may be {@code null}
     * @return the hash from {@code ElementGraphs.canonicalHash}, or {@code null} for a null graph
     */
    protected String createFlowCanonicalHash(FlowElementGraph flowElementGraph) {
        return flowElementGraph == null ? null : ElementGraphs.canonicalHash(flowElementGraph);
    }

    /**
     * @return the element graph held by this flow; may be {@code null}
     */
    protected FlowElementGraph getFlowElementGraph() {
        return flowElementGraph;
    }

    /**
     * Replaces the element graph held by this flow.
     *
     * @param flowElementGraph the element graph to store
     */
    protected void setFlowElementGraph(FlowElementGraph flowElementGraph) {
        this.flowElementGraph = flowElementGraph;
    }

    /**
     * @return the step graph held by this flow; may be {@code null}
     */
    protected FlowStepGraph getFlowStepGraph() {
        return flowStepGraph;
    }

    /**
     * Replaces the step graph held by this flow.
     *
     * @param flowStepGraph the step graph to store
     */
    protected void setFlowStepGraph(FlowStepGraph flowStepGraph) {
        this.flowStepGraph = flowStepGraph;
    }

    /**
     * Stores the source taps and registers any of them that are flow listeners.
     * A {@code null} map is ignored and leaves the current sources untouched.
     *
     * @param sources the source taps keyed by name; may be {@code null}
     */
    protected void setSources(Map<String, Tap> sources) {
        if (sources != null) {
            addListeners(sources.values());
            this.sources = sources;
        }
    }

    /**
     * Stores the sink taps and registers any of them that are flow listeners.
     * A {@code null} map is ignored and leaves the current sinks untouched.
     *
     * @param sinks the sink taps keyed by name; may be {@code null}
     */
    protected void setSinks(Map<String, Tap> sinks) {
        if (sinks != null) {
            addListeners(sinks.values());
            this.sinks = sinks;
        }
    }

    /**
     * Stores the trap taps and registers any of them that are flow listeners.
     * A {@code null} map is ignored and leaves the current traps untouched, matching
     * {@code setSources} and {@code setSinks}; previously a null map raised a
     * {@link NullPointerException}.
     *
     * @param traps the trap taps keyed by name; may be {@code null}
     */
    protected void setTraps(Map<String, Tap> traps) {
        if (traps == null) {
            return;
        }
        this.addListeners(traps.values());
        this.traps = traps;
    }

    /**
     * Stores the checkpoint taps and registers any of them that are flow listeners.
     * A {@code null} map is ignored and leaves the current checkpoints untouched, matching
     * {@code setSources} and {@code setSinks}; previously a null map raised a
     * {@link NullPointerException}.
     *
     * @param checkpoints the checkpoint taps keyed by name; may be {@code null}
     */
    protected void setCheckpoints(Map<String, Tap> checkpoints) {
        if (checkpoints == null) {
            return;
        }
        this.addListeners(checkpoints.values());
        this.checkpoints = checkpoints;
    }

    /**
     * Initializes this flow's configuration. The constructors in this class call it with
     * the flow properties as the first argument and the default configuration as the second.
     *
     * @param var1 the flow properties
     * @param var2 the default configuration
     */
    protected abstract void initConfig(Map<Object, Object> var1, Config var2);

    /**
     * Builds a new configuration from {@code defaultConfig} and copies every non-null
     * property into it. For {@link Properties} instances the string property names are
     * included as keys and {@code getProperty} is used as a fallback lookup, so values
     * only reachable through those APIs are picked up as well.
     *
     * @param properties the properties to copy; may be {@code null}
     * @param defaultConfig the configuration passed to {@code newConfig}
     * @return the new configuration
     */
    public Config createConfig(Map<Object, Object> properties, Config defaultConfig) {
        Config config = newConfig(defaultConfig);
        if (properties == null) {
            return config;
        }

        boolean isProperties = properties instanceof Properties;
        Set<Object> keys = new HashSet<Object>(properties.keySet());
        if (isProperties) {
            keys.addAll(((Properties) properties).stringPropertyNames());
        }

        for (Object key : keys) {
            Object value = properties.get(key);
            if (value == null && isProperties && key instanceof String) {
                value = ((Properties) properties).getProperty((String) key);
            }
            if (value != null) {
                setConfigProperty(config, key, value);
            }
        }
        return config;
    }

    /**
     * Sets a single property on a configuration. Within this class it is called with the
     * target configuration, the property key and the property value, in that order.
     *
     * @param var1 the configuration to modify
     * @param var2 the property key
     * @param var3 the property value
     */
    protected abstract void setConfigProperty(Config var1, Object var2, Object var3);

    /**
     * Creates a new configuration; {@code createConfig} passes the default configuration.
     *
     * @param var1 the default configuration
     * @return the new configuration
     */
    protected abstract Config newConfig(Config var1);

    /**
     * Initializes {@code stopJobsOnExit} from the given properties.
     *
     * @param properties the properties to read
     */
    protected void initFromProperties(Map<Object, Object> properties) {
        stopJobsOnExit = getStopJobsOnExit(properties);
    }

    /**
     * Creates a new session wrapping this flow's services; a new instance is returned on
     * every call.
     *
     * @return a new {@link FlowSession}
     */
    public FlowSession getFlowSession() {
        CascadingServices services = getCascadingServices();
        return new FlowSession(services);
    }

    /**
     * @return the stats object of this flow
     */
    @Override
    public FlowStats getFlowStats() {
        return flowStats;
    }

    /**
     * Returns a read-only view of the flow descriptor.
     *
     * @return an unmodifiable view of the descriptor, or an empty map when none is set
     */
    @Override
    public Map<String, String> getFlowDescriptor() {
        if (flowDescriptor != null) {
            return Collections.unmodifiableMap(flowDescriptor);
        }
        return Collections.emptyMap();
    }

    /**
     * Alias for {@link #getFlowStats()}.
     *
     * @return the stats object of this flow
     */
    @Override
    public FlowStats getStats() {
        return getFlowStats();
    }

    /**
     * Registers every element of the collection that is a {@link FlowListener};
     * other elements are ignored. A {@code null} collection is a no-op, mirroring the
     * guard in {@code removeListeners}.
     *
     * @param listeners candidate listeners; may be {@code null}
     */
    void addListeners(Collection listeners) {
        if (listeners == null) {
            return;
        }
        for (Object listener : listeners) {
            if (!(listener instanceof FlowListener)) continue;
            this.addListener((FlowListener)listener);
        }
    }

    /**
     * Unregisters every element of the collection that is a {@link FlowListener};
     * other elements are ignored. A null or empty collection is a no-op.
     *
     * @param listeners candidate listeners; may be {@code null}
     */
    protected void removeListeners(Collection listeners) {
        boolean nothingToRemove = listeners == null || listeners.isEmpty();
        if (nothingToRemove) {
            return;
        }
        for (Object candidate : listeners) {
            if (candidate instanceof FlowListener) {
                removeListener((FlowListener) candidate);
            }
        }
    }

    /**
     * Returns the listener list, creating it on first access.
     *
     * @return the mutable list of wrapped listeners
     */
    List<SafeFlowListener> getListeners() {
        if (listeners != null) {
            return listeners;
        }
        listeners = new LinkedList<SafeFlowListener>();
        return listeners;
    }

    /**
     * Reports whether at least one listener is registered, without creating the
     * listener list as a side effect.
     *
     * @return {@code true} when the listener list exists and is not empty
     */
    @Override
    public boolean hasListeners() {
        if (listeners == null) {
            return false;
        }
        return !listeners.isEmpty();
    }

    /**
     * Registers a listener, wrapped in a {@code SafeFlowListener}.
     *
     * @param flowListener the listener to add
     */
    @Override
    public void addListener(FlowListener flowListener) {
        List<SafeFlowListener> registered = getListeners();
        registered.add(new SafeFlowListener(flowListener));
    }

    /**
     * Unregisters a listener by removing a {@code SafeFlowListener} wrapping it.
     *
     * @param flowListener the listener to remove
     * @return the result of the underlying list removal
     */
    @Override
    public boolean removeListener(FlowListener flowListener) {
        // NOTE(review): relies on SafeFlowListener equality matching wrappers of the same
        // listener - confirm against its equals() implementation.
        List<SafeFlowListener> registered = getListeners();
        return registered.remove(new SafeFlowListener(flowListener));
    }

    /**
     * Reports whether any step has listeners. Every step is queried; there is no
     * early exit.
     *
     * @return {@code true} if at least one step reports listeners
     */
    @Override
    public boolean hasStepListeners() {
        boolean anyStepHasListeners = false;
        for (FlowStep<Config> flowStep : getFlowSteps()) {
            if (flowStep.hasListeners()) {
                anyStepHasListeners = true;
            }
        }
        return anyStepHasListeners;
    }

    /**
     * Adds the given listener to every step of this flow.
     *
     * @param flowStepListener the listener to add
     */
    @Override
    public void addStepListener(FlowStepListener flowStepListener) {
        Iterator<FlowStep<Config>> flowSteps = getFlowSteps().iterator();
        while (flowSteps.hasNext()) {
            flowSteps.next().addListener(flowStepListener);
        }
    }

    /**
     * Removes the given listener from every step of this flow. Every step is visited
     * even after one removal fails.
     *
     * @param flowStepListener the listener to remove
     * @return {@code true} if every step reported a successful removal (also {@code true}
     *         when there are no steps)
     */
    @Override
    public boolean removeStepListener(FlowStepListener flowStepListener) {
        boolean removedFromAll = true;
        for (FlowStep<Config> flowStep : getFlowSteps()) {
            if (!flowStep.removeListener(flowStepListener)) {
                removedFromAll = false;
            }
        }
        return removedFromAll;
    }

    /**
     * @return an unmodifiable view of the source taps keyed by name
     */
    @Override
    public Map<String, Tap> getSources() {
        return Collections.unmodifiableMap(sources);
    }

    /**
     * @return a new list holding the source names
     */
    @Override
    public List<String> getSourceNames() {
        List<String> names = new ArrayList<String>(sources.keySet());
        return names;
    }

    /**
     * @param name the source name
     * @return the source tap registered under that name, or {@code null} if the map has none
     */
    @Override
    public Tap getSource(String name) {
        return sources.get(name);
    }

    /**
     * @return the source taps as a read-only collection
     */
    @Override
    @DependencyIncoming
    public Collection<Tap> getSourcesCollection() {
        Map<String, Tap> readOnlySources = getSources();
        return readOnlySources.values();
    }

    /**
     * @return an unmodifiable view of the sink taps keyed by name
     */
    @Override
    public Map<String, Tap> getSinks() {
        return Collections.unmodifiableMap(sinks);
    }

    /**
     * @return a new list holding the sink names
     */
    @Override
    public List<String> getSinkNames() {
        List<String> names = new ArrayList<String>(sinks.keySet());
        return names;
    }

    /**
     * @param name the sink name
     * @return the sink tap registered under that name, or {@code null} if the map has none
     */
    @Override
    public Tap getSink(String name) {
        return sinks.get(name);
    }

    /**
     * @return the sink taps as a read-only collection
     */
    @Override
    @DependencyOutgoing
    public Collection<Tap> getSinksCollection() {
        Map<String, Tap> readOnlySinks = getSinks();
        return readOnlySinks.values();
    }

    /**
     * Returns the first sink tap in iteration order.
     *
     * @return the first sink
     * @throws java.util.NoSuchElementException if there are no sinks
     */
    @Override
    public Tap getSink() {
        Iterator<Tap> sinkTaps = sinks.values().iterator();
        return sinkTaps.next();
    }

    /**
     * @return an unmodifiable view of the trap taps keyed by name
     */
    @Override
    public Map<String, Tap> getTraps() {
        return Collections.unmodifiableMap(traps);
    }

    /**
     * @return a new list holding the trap names
     */
    @Override
    public List<String> getTrapNames() {
        List<String> names = new ArrayList<String>(traps.keySet());
        return names;
    }

    /**
     * @return the trap taps as a read-only collection
     */
    @Override
    public Collection<Tap> getTrapsCollection() {
        Map<String, Tap> readOnlyTraps = getTraps();
        return readOnlyTraps.values();
    }

    /**
     * @return an unmodifiable view of the checkpoint taps keyed by name
     */
    @Override
    public Map<String, Tap> getCheckpoints() {
        return Collections.unmodifiableMap(checkpoints);
    }

    /**
     * @return a new list holding the checkpoint names
     */
    @Override
    public List<String> getCheckpointNames() {
        List<String> names = new ArrayList<String>(checkpoints.keySet());
        return names;
    }

    /**
     * @return the checkpoint taps as a read-only collection
     */
    @Override
    public Collection<Tap> getCheckpointsCollection() {
        Map<String, Tap> readOnlyCheckpoints = getCheckpoints();
        return readOnlyCheckpoints.values();
    }

    /**
     * @return the current value of the stop-jobs-on-exit flag
     */
    @Override
    public boolean isStopJobsOnExit() {
        return stopJobsOnExit;
    }

    /**
     * @return the skip strategy currently in effect
     */
    @Override
    public FlowSkipStrategy getFlowSkipStrategy() {
        return flowSkipStrategy;
    }

    /**
     * Installs a new skip strategy.
     *
     * @param flowSkipStrategy the strategy to install; must not be {@code null}
     * @return the strategy that was previously in effect
     * @throws IllegalArgumentException if {@code flowSkipStrategy} is null
     */
    @Override
    public FlowSkipStrategy setFlowSkipStrategy(FlowSkipStrategy flowSkipStrategy) {
        if (flowSkipStrategy == null) {
            throw new IllegalArgumentException("flowSkipStrategy may not be null");
        }
        FlowSkipStrategy previous = this.flowSkipStrategy;
        this.flowSkipStrategy = flowSkipStrategy;
        return previous;
    }

    /**
     * Asks the current skip strategy whether this flow should be skipped.
     *
     * @return the strategy's decision
     * @throws IOException if the strategy fails
     */
    @Override
    public boolean isSkipFlow() throws IOException {
        return flowSkipStrategy.skipFlow(this);
    }

    /**
     * Reports whether the sources are newer than the sink modification time.
     *
     * @return {@code true} if {@link #areSourcesNewer(long)} holds for the sink modified time
     * @throws IOException if a modification time cannot be read
     */
    @Override
    public boolean areSinksStale() throws IOException {
        long sinkModified = getSinkModified();
        return areSourcesNewer(sinkModified);
    }

    /**
     * Compares the source modification time against the given sink modification time.
     * The source modification date is logged at info level whether or not the lookup
     * succeeds; if the lookup throws, the logged value is the initial {@code 0}.
     *
     * @param sinkModified the sink modification time to compare against
     * @return {@code true} if {@code sinkModified} is less than the source modification time
     * @throws IOException if the source modification time cannot be read
     */
    @Override
    public boolean areSourcesNewer(long sinkModified) throws IOException {
        Object config = getConfig();
        Iterator<Tap> sourceTaps = sources.values().iterator();
        long sourceModified = 0L;
        try {
            sourceModified = Util.getSourceModified(config, sourceTaps, sinkModified);
            return sinkModified < sourceModified;
        }
        finally {
            if (LOG.isInfoEnabled()) {
                logInfo("source modification date at: " + new Date(sourceModified), new Object[0]);
            }
        }
    }

    /**
     * Returns the sink modification time reported by {@code Util.getSinkModified} and
     * logs exactly one info message describing it: {@code -1} is reported as a sink
     * marked for delete, {@code 0} as a missing sink, and any other value as a date.
     * Previously a value of {@code -1} produced a second, misleading "oldest modified
     * date" message built from {@code new Date(-1)}.
     *
     * @return the sink modification time
     * @throws IOException if the modification time cannot be read
     */
    @Override
    public long getSinkModified() throws IOException {
        long sinkModified = Util.getSinkModified(this.getConfig(), this.sinks.values());
        if (LOG.isInfoEnabled()) {
            if (sinkModified == -1L) {
                this.logInfo("at least one sink is marked for delete", new Object[0]);
            } else if (sinkModified == 0L) {
                this.logInfo("at least one sink does not exist", new Object[0]);
            } else {
                this.logInfo("sink oldest modified date: " + new Date(sinkModified), new Object[0]);
            }
        }
        return sinkModified;
    }

    /**
     * @return the step strategy currently set; may be {@code null}
     */
    @Override
    public FlowStepStrategy getFlowStepStrategy() {
        return flowStepStrategy;
    }

    /**
     * Replaces the step strategy. No null check is performed.
     *
     * @param flowStepStrategy the strategy to store
     */
    @Override
    public void setFlowStepStrategy(FlowStepStrategy flowStepStrategy) {
        this.flowStepStrategy = flowStepStrategy;
    }

    /**
     * Returns the steps of this flow in the order produced by the step graph's
     * topological iterator. The list is built once and cached; when no step graph is
     * set, an empty list is returned and nothing is cached.
     *
     * @return the cached step list, or an empty list when there is no step graph
     */
    @Override
    public List<FlowStep<Config>> getFlowSteps() {
        if (steps != null) {
            return steps;
        }
        if (flowStepGraph == null) {
            return Collections.emptyList();
        }
        Iterator ordered = flowStepGraph.getTopologicalIterator();
        steps = new ArrayList<FlowStep<Config>>();
        while (ordered.hasNext()) {
            FlowStep<Config> nextStep = (FlowStep<Config>) ordered.next();
            steps.add(nextStep);
        }
        return steps;
    }

    /**
     * Prepares the flow by deleting sinks, traps and checkpoints that are not in
     * update mode.
     *
     * @throws FlowTapException wrapping any {@link IOException} raised while deleting
     */
    @Override
    @ProcessPrepare
    public void prepare() {
        try {
            deleteSinksIfNotUpdate();
            deleteTrapsIfNotUpdate();
            deleteCheckpointsIfNotUpdate();
        }
        catch (IOException ioFailure) {
            throw new FlowTapException("unable to prepare flow", ioFailure);
        }
    }

    /**
     * Starts the flow on a new thread. The call is ignored when a thread already exists
     * or the flow has been stopped. Otherwise the shutdown hook is registered,
     * {@code internalStart()} is invoked and the flow thread is created and started.
     */
    @Override
    @ProcessStart
    public synchronized void start() {
        if (thread != null || stop) {
            return;
        }
        registerShutdownHook();
        internalStart();

        String rawThreadName = "flow " + Util.toNull(getName());
        thread = createFlowThread(rawThreadName.trim());
        thread.start();
    }

    /**
     * Creates (but does not start) the thread that executes this flow's {@code run()}.
     *
     * @param threadName the name given to the new thread
     * @return the new, unstarted thread
     */
    protected Thread createFlowThread(String threadName) {
        Runnable flowBody = new Runnable(){

            @Override
            public void run() {
                BaseFlow.this.run();
            }
        };
        return new Thread(flowBody, threadName);
    }

    /**
     * Hook invoked by {@code start()} after the shutdown hook is registered and before
     * the flow thread is created.
     */
    protected abstract void internalStart();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Stops the flow. While holding {@code stopLock}: returns early if the stop flag is
     * already set; otherwise sets it, fires the stopping event, marks the stats stopped
     * unless already finished, stops all jobs, handles executor shutdown and cleans up
     * with {@code internalClean(true)}.
     *
     * <p>The finally section runs on every exit path, including the early return: it
     * cleans up the stats, releases the lock and, when a flow thread exists, interrupts
     * and joins it.
     *
     * <p>NOTE(review): the labelled block and duplicated interrupt/join sequences below
     * are decompiler output; the original source was presumably a simpler nested
     * try/finally - confirm before restructuring.
     */
    @Override
    @ProcessStop
    public synchronized void stop() {
        this.stopLock.lock();
        try {
            // Already stopping or stopped: nothing more to do besides the finally section.
            if (this.stop) {
                return;
            }
            this.stop = true;
            this.fireOnStopping();
            if (!this.flowStats.isFinished()) {
                this.flowStats.markStopped();
            }
            this.internalStopAllJobs();
            this.handleExecutorShutdown();
            this.internalClean(true);
            return;
        }
        finally {
            block24: {
                try {
                    this.flowStats.cleanup();
                    this.stopLock.unlock();
                    // No flow thread to wait for: skip the interrupt/join below.
                    if (this.thread == null) break block24;
                }
                catch (Throwable throwable) {
                    // Cleanup failed: still release the lock, then stop the thread before rethrowing.
                    this.stopLock.unlock();
                    if (this.thread == null) throw throwable;
                    try {
                        this.thread.interrupt();
                        this.thread.join();
                        throw throwable;
                    }
                    catch (InterruptedException interruptedException) {}
                }
                try {
                    this.thread.interrupt();
                    this.thread.join();
                }
                // Interruption while joining is deliberately ignored here.
                catch (InterruptedException interruptedException) {}
            }
        }
    }

    /**
     * Cleanup hook. Within this class it is called with {@code true} from {@code stop()}
     * and with the current value of the stop flag from {@code run()}.
     *
     * @param var1 whether the flow was stopped
     */
    protected abstract void internalClean(boolean var1);

    /**
     * Starts the flow if it has not completed yet and blocks until it finishes.
     *
     * <p>After the flow thread has been joined and any in-progress {@code stop()} has
     * released its lock, a failure captured during the run is rethrown:
     * {@link CascadingException} and {@link OutOfMemoryError} as is, anything else wrapped
     * in a {@link FlowException}. If the run itself succeeded but a listener recorded a
     * failure, that listener failure is wrapped and thrown.
     *
     * <p>In all cases the flow is marked completed and the thread and captured throwable
     * references are cleared before returning.
     *
     * @throws FlowException if the waiting thread is interrupted, or to wrap an
     *         otherwise unhandled run or listener failure
     */
    @Override
    @ProcessComplete
    public void complete() {
        if (!this.completed) {
            this.start();
        }
        try {
            try {
                synchronized (this) {
                    while (!this.completed && this.thread == null && !this.stop) {
                        Util.safeSleep(10L);
                    }
                }
                if (this.thread != null) {
                    this.thread.join();
                }
            }
            catch (InterruptedException exception) {
                throw new FlowException(this.getName(), "thread interrupted", exception);
            }
            this.ifStoppingBlockUntilComplete();
            if (this.throwable instanceof FlowException) {
                ((FlowException)this.throwable).setFlowName(this.getName());
            }
            if (this.throwable instanceof CascadingException) {
                throw (CascadingException)this.throwable;
            }
            if (this.throwable instanceof OutOfMemoryError) {
                throw (OutOfMemoryError)this.throwable;
            }
            if (this.throwable != null) {
                throw new FlowException(this.getName(), "unhandled exception", this.throwable);
            }
            if (this.hasListeners()) {
                for (SafeFlowListener safeFlowListener : this.getListeners()) {
                    if (safeFlowListener.throwable == null) continue;
                    // this.throwable is known to be null here, so the listener's own
                    // failure must be used as the cause or it would be lost.
                    throw new FlowException(this.getName(), "unhandled listener exception", safeFlowListener.throwable);
                }
            }
        }
        finally {
            this.completed = true;
            this.thread = null;
            this.throwable = null;
        }
    }

    /**
     * If {@code stopLock} is currently held (as it is while {@code stop()} runs), blocks
     * until it can be acquired and then releases it immediately. Returns at once when the
     * lock is not held.
     */
    protected void ifStoppingBlockUntilComplete() {
        if (!this.stopLock.isLocked()) {
            return;
        }
        // Acquire before entering the try so the finally only unlocks a lock we actually
        // hold; unlocking after a failed lock() would raise IllegalMonitorStateException.
        this.stopLock.lock();
        try {
            // Nothing to do: acquiring the lock was the wait.
        }
        finally {
            this.stopLock.unlock();
        }
    }

    /**
     * Commits every trap tap. A trap that reports an unsuccessful commit, or whose commit
     * throws an {@link IOException}, is logged as an error and the remaining traps are
     * still processed.
     */
    private void commitTraps() {
        for (Tap trap : traps.values()) {
            try {
                boolean committed = trap.commitResource(getConfig());
                if (!committed) {
                    logError("unable to commit trap: " + trap.getFullIdentifier(getConfig()), new Object[0]);
                }
            }
            catch (IOException ioFailure) {
                logError("unable to commit trap: " + trap.getFullIdentifier(getConfig()), ioFailure);
            }
        }
    }

    /**
     * Process cleanup callback; this implementation intentionally does nothing.
     */
    @Override
    @ProcessCleanup
    public void cleanup() {
    }

    /**
     * Opens the first source tap (in iteration order) for reading.
     *
     * @return an iterator over the source's tuples
     * @throws IOException if the tap cannot be opened
     * @throws java.util.NoSuchElementException if there are no sources
     */
    @Override
    public TupleEntryIterator openSource() throws IOException {
        Tap firstSource = sources.values().iterator().next();
        return firstSource.openForRead(getFlowProcess());
    }

    /**
     * Opens the named source tap for reading.
     *
     * @param name the source name
     * @return an iterator over the source's tuples
     * @throws IOException if the tap cannot be opened
     * @throws IllegalArgumentException if no source is registered under {@code name}
     */
    @Override
    public TupleEntryIterator openSource(String name) throws IOException {
        if (sources.containsKey(name)) {
            return sources.get(name).openForRead(getFlowProcess());
        }
        throw new IllegalArgumentException("source does not exist: " + name);
    }

    /**
     * Opens the first sink tap (in iteration order) for reading.
     *
     * @return an iterator over the sink's tuples
     * @throws IOException if the tap cannot be opened
     * @throws java.util.NoSuchElementException if there are no sinks
     */
    @Override
    public TupleEntryIterator openSink() throws IOException {
        Tap firstSink = sinks.values().iterator().next();
        return firstSink.openForRead(getFlowProcess());
    }

    /**
     * Opens the named sink tap for reading.
     *
     * @param name the sink name
     * @return an iterator over the sink's tuples
     * @throws IOException if the tap cannot be opened
     * @throws IllegalArgumentException if no sink is registered under {@code name}
     */
    @Override
    public TupleEntryIterator openSink(String name) throws IOException {
        if (sinks.containsKey(name)) {
            return sinks.get(name).openForRead(getFlowProcess());
        }
        throw new IllegalArgumentException("sink does not exist: " + name);
    }

    /**
     * Opens the first trap tap (in iteration order) for reading.
     *
     * @return an iterator over the trap's tuples
     * @throws IOException if the tap cannot be opened
     * @throws java.util.NoSuchElementException if there are no traps
     */
    @Override
    public TupleEntryIterator openTrap() throws IOException {
        Tap firstTrap = traps.values().iterator().next();
        return firstTrap.openForRead(getFlowProcess());
    }

    /**
     * Opens the named trap tap for reading.
     *
     * @param name the trap name
     * @return an iterator over the trap's tuples
     * @throws IOException if the tap cannot be opened
     * @throws IllegalArgumentException if no trap is registered under {@code name}
     */
    @Override
    public TupleEntryIterator openTrap(String name) throws IOException {
        if (traps.containsKey(name)) {
            return traps.get(name).openForRead(getFlowProcess());
        }
        throw new IllegalArgumentException("trap does not exist: " + name);
    }

    /**
     * Deletes the resource of every sink tap, failing on the first one that cannot be
     * deleted.
     *
     * @throws IOException if a tap operation fails
     */
    public void deleteSinks() throws IOException {
        Iterator<Tap> sinkTaps = sinks.values().iterator();
        while (sinkTaps.hasNext()) {
            deleteOrFail(sinkTaps.next());
        }
    }

    /**
     * Deletes the tap's resource if it exists.
     *
     * @param tap the tap whose resource is deleted
     * @throws IOException if a tap operation fails
     * @throws FlowTapException if the resource exists but the delete reports failure
     */
    private void deleteOrFail(Tap tap) throws IOException {
        if (tap.resourceExists(getConfig()) && !tap.deleteResource(getConfig())) {
            throw new FlowTapException("unable to delete resource: " + tap.getFullIdentifier(getFlowProcess()));
        }
    }

    /**
     * Deletes the resource of every sink tap that is not in update mode.
     *
     * @throws IOException if a tap operation fails
     */
    public void deleteSinksIfNotUpdate() throws IOException {
        for (Tap sink : sinks.values()) {
            if (!sink.isUpdate()) {
                deleteOrFail(sink);
            }
        }
    }

    /**
     * Verifies that no sink in keep mode already exists, then deletes the resource of
     * every sink in replace mode. The check over all sinks completes before any delete
     * is attempted.
     *
     * @throws IOException if a tap operation fails
     * @throws FlowTapException if a keep-mode sink already exists, or a delete fails
     */
    public void deleteSinksIfReplace() throws IOException {
        // First pass: refuse to proceed if any keep-mode sink would be overwritten.
        for (Tap sink : sinks.values()) {
            if (sink.isKeep() && sink.resourceExists(getConfig())) {
                throw new FlowTapException("resource exists and sink mode is KEEP, cannot overwrite: " + sink.getFullIdentifier(getFlowProcess()));
            }
        }
        // Second pass: remove replace-mode sinks.
        for (Tap sink : sinks.values()) {
            if (sink.isReplace()) {
                deleteOrFail(sink);
            }
        }
    }

    /**
     * Deletes the resource of every trap tap that is not in update mode.
     *
     * @throws IOException if a tap operation fails
     */
    public void deleteTrapsIfNotUpdate() throws IOException {
        for (Tap trap : traps.values()) {
            if (!trap.isUpdate()) {
                deleteOrFail(trap);
            }
        }
    }

    /**
     * Deletes the resource of every checkpoint tap that is not in update mode.
     *
     * @throws IOException if a tap operation fails
     */
    public void deleteCheckpointsIfNotUpdate() throws IOException {
        for (Tap checkpoint : checkpoints.values()) {
            if (!checkpoint.isUpdate()) {
                deleteOrFail(checkpoint);
            }
        }
    }

    /**
     * Deletes the resource of every trap tap that is in replace mode.
     *
     * @throws IOException if a tap operation fails
     */
    public void deleteTrapsIfReplace() throws IOException {
        for (Tap trap : traps.values()) {
            if (trap.isReplace()) {
                deleteOrFail(trap);
            }
        }
    }

    /**
     * Deletes the resource of every checkpoint tap that is in replace mode.
     *
     * @throws IOException if a tap operation fails
     */
    public void deleteCheckpointsIfReplace() throws IOException {
        for (Tap checkpoint : checkpoints.values()) {
            if (checkpoint.isReplace()) {
                deleteOrFail(checkpoint);
            }
        }
    }

    /**
     * Checks whether the tap's resource exists, using this flow's configuration.
     *
     * @param tap the tap to check
     * @return the tap's answer
     * @throws IOException if the check fails
     */
    @Override
    public boolean resourceExists(Tap tap) throws IOException {
        return tap.resourceExists(getConfig());
    }

    /**
     * Opens the given Tap for reading with this flow's FlowProcess.
     *
     * @param tap the Tap to read from
     * @return the iterator returned by {@code tap.openForRead(...)}
     * @throws IOException propagated from the Tap
     */
    @Override
    public TupleEntryIterator openTapForRead(Tap tap) throws IOException {
        return tap.openForRead(getFlowProcess());
    }

    /**
     * Opens the given Tap for writing with this flow's FlowProcess.
     *
     * @param tap the Tap to write to
     * @return the collector returned by {@code tap.openForWrite(...)}
     * @throws IOException propagated from the Tap
     */
    @Override
    public TupleEntryCollector openTapForWrite(Tap tap) throws IOException {
        return tap.openForWrite(getFlowProcess());
    }

    /**
     * Body of the flow's execution: marks the flow started, notifies listeners, spawns the
     * steps, and then always performs the completion sequence (failure handling, success
     * marking, cleanup, trap commit, completion listeners, shutdown).
     * <p>
     * Any Throwable raised while starting or spawning steps is not rethrown; it is stored
     * in {@code this.throwable} and dealt with in the finally block.
     * <p>
     * NOTE: this source was produced by a decompiler (CFR), which flagged this method with
     * "Removed try catching itself - possible behaviour change"; the try/finally layout may
     * therefore differ slightly from the original source.
     *
     * @throws IllegalStateException if no thread has been assigned, i.e. the method was not
     *         reached through start() or complete()
     */
    private void run() {
        if (this.thread == null) {
            throw new IllegalStateException("to start a Flow call start() or complete(), not Runnable#run()");
        }
        Version.printBanner();
        try {
            // a flow stopped before it started does no work, but the finally block below still runs
            if (this.stop) {
                return;
            }
            this.flowStats.markStarted();
            this.fireOnStarting();
            if (this.isInfoEnabled()) {
                this.logInfo("starting", new Object[0]);
                for (Tap source : this.getSourcesCollection()) {
                    this.logInfo(" source: " + source, new Object[0]);
                }
                for (Tap sink : this.getSinksCollection()) {
                    this.logInfo(" sink: " + sink, new Object[0]);
                }
            }
            this.spawnSteps();
        }
        catch (Throwable throwable) {
            // remember the failure; handleThrowableAndMarkFailed() below reports it
            this.throwable = throwable;
        }
        finally {
            // NOTE(review): presumably waits for a concurrent stop() to finish — confirm against its definition
            this.ifStoppingBlockUntilComplete();
            this.handleThrowableAndMarkFailed();
            // only mark success when the flow was not stopped and no terminal state was recorded already
            if (!this.stop && !this.flowStats.isFinished()) {
                this.flowStats.markSuccessful();
            }
            this.internalClean(this.stop);
            this.commitTraps();
            try {
                this.fireOnCompleted();
            }
            finally {
                // runs even if a completion listener throws
                if (LOG.isInfoEnabled()) {
                    // despite the name, this value is in milliseconds (see getTotalSliceCPUMilliSeconds)
                    long totalSliceCPUSeconds = this.getTotalSliceCPUMilliSeconds();
                    // -1 means no cpu time is available, so only the wall-clock duration is logged
                    if (totalSliceCPUSeconds == -1L) {
                        this.logInfo(" completed in: " + Util.formatDurationFromMillis(this.flowStats.getDuration()), new Object[0]);
                    } else {
                        this.logInfo(" completed in: " + Util.formatDurationFromMillis(this.flowStats.getDuration()) + ", using cpu time: " + Util.formatDurationFromMillis(totalSliceCPUSeconds), new Object[0]);
                    }
                }
                this.flowStats.cleanup();
                this.internalShutdown();
                this.deregisterShutdownHook();
            }
        }
    }

    /**
     * Submits all eligible step jobs and waits for each of them in submission order.
     * <p>
     * The thread count is {@link #getMaxNumParallelSteps()}, or the number of eligible jobs
     * when that value is zero. As soon as one job reports a Throwable, the remaining jobs
     * are stopped (unless the flow is already stopping), the executor is shut down, and
     * {@code false} is returned.
     *
     * @return {@code true} if every awaited job finished without reporting a Throwable
     * @throws IllegalStateException if there are no threads to allocate, i.e. no jobs to run
     * @throws InterruptedException declared by the signature and by {@code spawnJobs}
     * @throws ExecutionException propagated from {@code Future.get()}
     */
    protected boolean spawnSteps() throws InterruptedException, ExecutionException {
        int threadCount = getMaxNumParallelSteps();
        int stepCount = getEligibleJobsSize();
        if (threadCount == 0) {
            // zero means "no limit": one thread per eligible job
            threadCount = stepCount;
        }
        if (threadCount == 0) {
            throw new IllegalStateException("no jobs rendered for flow: " + getName());
        }
        if (LOG.isInfoEnabled()) {
            boolean parallelEnabled = getMaxNumParallelSteps() != 1;
            logInfo(" parallel execution of steps is enabled: " + parallelEnabled);
            logInfo(" executing total steps: " + stepCount);
            logInfo(" allocating management threads: " + threadCount);
        }
        for (Future<Throwable> pending : spawnJobs(threadCount)) {
            try {
                this.throwable = pending.get();
            }
            catch (InterruptedException ignored) {
                // an interrupt while waiting on this job is dropped; the loop moves on
            }
            if (this.throwable != null) {
                if (!this.stop) {
                    internalStopAllJobs();
                }
                handleExecutorShutdown();
                return false;
            }
        }
        return true;
    }

    /**
     * Reports the total slice cpu time in milliseconds.
     * <p>
     * This base implementation always answers {@code -1}, which {@code run()} treats as
     * "no cpu time available" when logging the completion message.
     *
     * @return {@code -1}
     */
    protected long getTotalSliceCPUMilliSeconds() {
        final long unavailable = -1L;
        return unavailable;
    }

    /**
     * Returns the maximum number of steps to run in parallel.
     * <p>
     * As used by {@code spawnSteps()}: {@code 0} means one thread per eligible job, and
     * {@code 1} is logged as parallel execution being disabled.
     *
     * @return the thread limit used when spawning step jobs
     */
    protected abstract int getMaxNumParallelSteps();

    /**
     * Subclass hook invoked at the very end of {@code run()}, after the flow stats have
     * been cleaned up and before the shutdown hook is deregistered.
     */
    protected abstract void internalShutdown();

    /**
     * Hands all not-yet-started step jobs to the spawn strategy.
     *
     * @param numThreads number of threads passed through to the spawn strategy
     * @return the futures returned by the spawn strategy, or an empty list when no spawn
     *         strategy is set (an error is logged) or the flow has been stopped
     * @throws InterruptedException propagated from the spawn strategy
     */
    private List<Future<Throwable>> spawnJobs(int numThreads) throws InterruptedException {
        if (this.spawnStrategy == null) {
            logError("no spawnStrategy set");
            return new ArrayList<Future<Throwable>>();
        }
        if (this.stop) {
            return new ArrayList<Future<Throwable>>();
        }
        List<Callable<Throwable>> pendingJobs = getJobMapCallables();
        return this.spawnStrategy.start(this, numThreads, pendingJobs);
    }

    /**
     * Marks the flow as failed and notifies listeners when a Throwable was recorded and the
     * flow was not stopped; otherwise does nothing.
     */
    private void handleThrowableAndMarkFailed() {
        if (this.throwable == null || this.stop) {
            return;
        }
        this.flowStats.markFailed(this.throwable);
        fireOnThrowable();
    }

    /**
     * Exposes the internal jobs map (no copy is made).
     *
     * @return the current jobs map, or {@code null} if it has not been initialized
     */
    Map<String, FlowStepJob<Config>> getJobsMap() {
        return jobsMap;
    }

    /**
     * Tells whether the jobs map has been created.
     *
     * @return {@code true} when the jobs map is non-null
     */
    protected boolean isJobsMapInitialized() {
        boolean missing = this.jobsMap == null;
        return !missing;
    }

    /**
     * Counts the step jobs that have not been started yet.
     *
     * @return the size of the list produced by {@link #getJobMapCallables()}
     */
    protected int getEligibleJobsSize() {
        List<Callable<Throwable>> eligible = getJobMapCallables();
        return eligible.size();
    }

    /**
     * Collects, in jobs-map iteration order, every step job whose callable has not been started.
     *
     * @return a new list holding the not-yet-started jobs
     */
    protected List<Callable<Throwable>> getJobMapCallables() {
        ArrayList<Callable<Throwable>> notStarted = new ArrayList<Callable<Throwable>>();
        for (FlowStepJob<Config> candidate : this.jobsMap.values()) {
            if (!candidate.isCallableStarted()) {
                notStarted.add(candidate);
            }
        }
        return notStarted;
    }

    /**
     * Builds the jobs map from scratch for the current flow step graph, discarding any
     * previously held map.
     */
    protected void initializeNewJobsMap() {
        Map<String, FlowStepJob<Config>> fresh = new LinkedHashMap<String, FlowStepJob<Config>>();
        this.jobsMap = updateJobsMap(this.flowStepGraph, fresh);
    }

    /**
     * Rebuilds the jobs map for the current flow step graph, starting from a copy of the
     * existing map so that jobs already created are reused.
     */
    protected void updateJobsMap() {
        Map<String, FlowStepJob<Config>> copy = new LinkedHashMap<String, FlowStepJob<Config>>(this.jobsMap);
        this.jobsMap = updateJobsMap(this.flowStepGraph, copy);
    }

    /**
     * Walks the step graph in topological order, making sure every step has a job in the
     * given map and (re)assigning each job's predecessor jobs.
     * <p>
     * Jobs already present in the map (keyed by step ID) are reused; missing ones are
     * created through the step and added. The given map is modified in place.
     *
     * @param flowStepGraph the graph whose steps are visited
     * @param jobsMap the map to fill, keyed by step ID
     * @return the same {@code jobsMap} instance that was passed in
     */
    Map<String, FlowStepJob<Config>> updateJobsMap(FlowStepGraph flowStepGraph, Map<String, FlowStepJob<Config>> jobsMap) {
        for (Iterator steps = flowStepGraph.getTopologicalIterator(); steps.hasNext(); ) {
            BaseFlowStep step = (BaseFlowStep)steps.next();
            FlowStepJob job = jobsMap.get(step.getID());
            if (job == null) {
                job = step.getCreateFlowStepJob(this.getFlowProcess(), this.getConfig());
                jobsMap.put(step.getID(), job);
            }
            // topological order means predecessor jobs were registered on earlier iterations
            ArrayList predecessorJobs = new ArrayList();
            for (ProcessModel predecessor : ProcessGraphs.predecessorListOf(flowStepGraph, step)) {
                predecessorJobs.add(jobsMap.get(((FlowStep)predecessor).getID()));
            }
            job.setPredecessors(predecessorJobs);
        }
        return jobsMap;
    }

    /**
     * Registers the step stats of every job in the jobs map with this flow's stats.
     */
    protected void initializeChildStats() {
        for (FlowStepJob<Config> job : this.jobsMap.values()) {
            flowStats.addStepStats(job.getStepStats());
        }
    }

    /**
     * Stops every job in the jobs map, in reverse of the map's iteration order.
     * <p>
     * The "stopped all jobs" message is logged even when there is no jobs map or when a
     * job's {@code stop()} throws.
     * <p>
     * NOTE: this source was produced by a decompiler (CFR), which flagged this method with
     * "Removed try catching itself - possible behaviour change".
     */
    protected void internalStopAllJobs() {
        logInfo("stopping all jobs");
        try {
            if (this.jobsMap != null) {
                ArrayList<FlowStepJob<Config>> reversed = new ArrayList<FlowStepJob<Config>>(this.jobsMap.values());
                Collections.reverse(reversed);
                for (FlowStepJob job : reversed) {
                    job.stop();
                }
            }
        }
        finally {
            logInfo("stopped all jobs");
        }
    }

    /**
     * Asks the spawn strategy to complete, waiting up to 300 seconds, unless there is no
     * spawn strategy or it already reports completion.
     * <p>
     * An interrupt during the wait is ignored; the closing debug message is logged either way.
     */
    protected void handleExecutorShutdown() {
        if (this.spawnStrategy == null || this.spawnStrategy.isCompleted(this)) {
            return;
        }
        logDebug("shutting down job executor");
        try {
            this.spawnStrategy.complete(this, 300, TimeUnit.SECONDS);
        }
        catch (InterruptedException ignored) {
            // best effort: an interrupted wait is treated the same as a finished one
        }
        logDebug("shutdown of job executor complete");
    }

    /**
     * Notifies every registered listener that the flow has completed; a no-op without listeners.
     */
    protected void fireOnCompleted() {
        if (!hasListeners()) {
            return;
        }
        if (isDebugEnabled()) {
            logDebug("firing onCompleted event: " + getListeners().size());
        }
        for (FlowListener listener : getListeners()) {
            listener.onCompleted(this);
        }
    }

    /**
     * Records the given Throwable as this flow's current failure and then notifies listeners.
     *
     * @param throwable the failure to record and report
     */
    protected void fireOnThrowable(Throwable throwable) {
        this.throwable = throwable;
        fireOnThrowable();
    }

    /**
     * Reports the recorded Throwable to every registered listener.
     * <p>
     * Every listener is called, even after one has already handled the failure. If at
     * least one listener returns {@code true}, the recorded Throwable is cleared.
     */
    protected void fireOnThrowable() {
        if (!hasListeners()) {
            return;
        }
        if (isDebugEnabled()) {
            logDebug("firing onThrowable event: " + getListeners().size());
        }
        boolean handled = false;
        for (FlowListener listener : getListeners()) {
            // non-short-circuit OR: the listener is always invoked
            handled |= listener.onThrowable(this, this.throwable);
        }
        if (handled) {
            this.throwable = null;
        }
    }

    /**
     * Notifies every registered listener that the flow is stopping; a no-op without listeners.
     */
    protected void fireOnStopping() {
        if (!hasListeners()) {
            return;
        }
        if (isDebugEnabled()) {
            logDebug("firing onStopping event: " + getListeners().size());
        }
        for (FlowListener listener : getListeners()) {
            listener.onStopping(this);
        }
    }

    /**
     * Notifies every registered listener that the flow is starting; a no-op without listeners.
     */
    protected void fireOnStarting() {
        if (!hasListeners()) {
            return;
        }
        if (isDebugEnabled()) {
            logDebug("firing onStarting event: " + getListeners().size());
        }
        for (FlowListener listener : getListeners()) {
            listener.onStarting(this);
        }
    }

    /**
     * Renders this flow as its name (when set) followed by the string form of each flow step.
     *
     * @return {@code "<name>: "} (omitted when the name is {@code null}) followed by the
     *         concatenated string form of every step returned by {@code getFlowSteps()}
     */
    @Override
    public String toString() {
        // the buffer never leaves this method, so the unsynchronized StringBuilder suffices
        StringBuilder buffer = new StringBuilder();
        String name = this.getName();
        if (name != null) {
            buffer.append(name).append(": ");
        }
        for (FlowStep<Config> step : this.getFlowSteps()) {
            buffer.append(step);
        }
        return buffer.toString();
    }

    /**
     * Tells whether the flow logger accepts INFO messages.
     *
     * @return the logger's {@code isInfoEnabled()} result
     */
    @Override
    public final boolean isInfoEnabled() {
        boolean infoEnabled = LOG.isInfoEnabled();
        return infoEnabled;
    }

    /**
     * Tells whether the flow logger accepts DEBUG messages.
     *
     * @return the logger's {@code isDebugEnabled()} result
     */
    @Override
    public final boolean isDebugEnabled() {
        boolean debugEnabled = LOG.isDebugEnabled();
        return debugEnabled;
    }

    /**
     * Logs at INFO level, prefixing the message with the flow name (truncated to 25
     * characters) in square brackets.
     *
     * @param message the message, passed to the logger as a format string
     * @param arguments arguments for the message's placeholders
     */
    @Override
    public void logInfo(String message, Object ... arguments) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.info(prefix + message, arguments);
    }

    /**
     * Logs at DEBUG level, prefixing the message with the flow name (truncated to 25
     * characters) in square brackets.
     *
     * @param message the message, passed to the logger as a format string
     * @param arguments arguments for the message's placeholders
     */
    @Override
    public void logDebug(String message, Object ... arguments) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.debug(prefix + message, arguments);
    }

    /**
     * Logs at WARN level, prefixing the message with the flow name (truncated to 25
     * characters) in square brackets.
     *
     * @param message the message to log
     */
    @Override
    public void logWarn(String message) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.warn(prefix + message);
    }

    /**
     * Logs a message and a Throwable at WARN level, prefixing the message with the flow
     * name (truncated to 25 characters) in square brackets.
     *
     * @param message the message to log
     * @param throwable the Throwable to log alongside the message
     */
    @Override
    public void logWarn(String message, Throwable throwable) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.warn(prefix + message, throwable);
    }

    /**
     * Logs at WARN level, prefixing the message with the flow name (truncated to 25
     * characters) in square brackets.
     *
     * @param message the message, passed to the logger as a format string
     * @param arguments arguments for the message's placeholders
     */
    @Override
    public void logWarn(String message, Object ... arguments) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.warn(prefix + message, arguments);
    }

    /**
     * Logs at ERROR level, prefixing the message with the flow name (truncated to 25
     * characters) in square brackets.
     *
     * @param message the message, passed to the logger as a format string
     * @param arguments arguments for the message's placeholders
     */
    @Override
    public void logError(String message, Object ... arguments) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.error(prefix + message, arguments);
    }

    /**
     * Logs a message and a Throwable at ERROR level, prefixing the message with the flow
     * name (truncated to 25 characters) in square brackets.
     *
     * @param message the message to log
     * @param throwable the Throwable to log alongside the message
     */
    @Override
    public void logError(String message, Throwable throwable) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.error(prefix + message, throwable);
    }

    /**
     * Writes the flow element graph to the given file in DOT format.
     *
     * @param filename target file, passed through to the element graph
     * @throws UnsupportedOperationException if this flow holds no element graph
     */
    @Override
    public void writeDOT(String filename) {
        if (this.flowElementGraph != null) {
            this.flowElementGraph.writeDOT(filename);
            return;
        }
        throw new UnsupportedOperationException("this flow instance cannot write a DOT file");
    }

    /**
     * Writes the flow step graph to the given file in DOT format.
     *
     * @param filename target file, passed through to the step graph
     * @throws UnsupportedOperationException if this flow holds no step graph
     */
    @Override
    public void writeStepsDOT(String filename) {
        if (this.flowStepGraph != null) {
            this.flowStepGraph.writeDOT(filename);
            return;
        }
        throw new UnsupportedOperationException("this flow instance cannot write a DOT file");
    }

    /**
     * Wraps this flow in a new {@link FlowHolder}.
     *
     * @return a fresh holder whose {@code flow} field refers to this instance
     */
    public FlowHolder getHolder() {
        FlowHolder holder = new FlowHolder(this);
        return holder;
    }

    /**
     * Associates this flow with the given Cascade by storing the cascade's ID in the flow
     * configuration under {@code cascading.cascade.id}, then has the flow stats record
     * their info.
     *
     * @param cascade the owning Cascade; must not be {@code null}
     */
    public void setCascade(Cascade cascade) {
        setConfigProperty(getConfig(), "cascading.cascade.id", cascade.getID());
        flowStats.recordInfo();
    }

    /**
     * Looks up the ID of the owning Cascade.
     *
     * @return the value of the {@code cascading.cascade.id} property
     */
    @Override
    public String getCascadeID() {
        String cascadeID = getProperty("cascading.cascade.id");
        return cascadeID;
    }

    /**
     * Returns the run ID held by this flow.
     *
     * @return the {@code runID} field
     */
    @Override
    public String getRunID() {
        return runID;
    }

    /**
     * Returns the class path entries held by this flow (the internal list, not a copy).
     *
     * @return the {@code classPath} field
     */
    public List<String> getClassPath() {
        return classPath;
    }

    /**
     * Replaces the strategy used to spawn and complete step jobs.
     *
     * @param spawnStrategy the strategy to use; with {@code null}, {@code spawnJobs} logs an
     *        error and submits nothing
     */
    @Override
    public void setSpawnStrategy(UnitOfWorkSpawnStrategy spawnStrategy) {
        this.spawnStrategy = spawnStrategy;
    }

    /**
     * Returns the strategy currently used to spawn step jobs.
     *
     * @return the {@code spawnStrategy} field, possibly {@code null}
     */
    @Override
    public UnitOfWorkSpawnStrategy getSpawnStrategy() {
        return spawnStrategy;
    }

    /**
     * Installs a shutdown hook (priority {@code WORK_CHILD}) that stops this flow, but
     * only when {@code isStopJobsOnExit()} is true.
     */
    protected void registerShutdownHook() {
        if (isStopJobsOnExit()) {
            this.shutdownHook = new ShutdownUtil.Hook(){

                @Override
                public ShutdownUtil.Hook.Priority priority() {
                    return ShutdownUtil.Hook.Priority.WORK_CHILD;
                }

                @Override
                public void execute() {
                    BaseFlow.this.logInfo("shutdown hook calling stop on flow");
                    BaseFlow.this.stop();
                }
            };
            ShutdownUtil.addHook(this.shutdownHook);
        }
    }

    /**
     * Removes the shutdown hook when {@code isStopJobsOnExit()} is true and the flow has
     * not been stopped; otherwise leaves it in place.
     */
    private void deregisterShutdownHook() {
        if (isStopJobsOnExit() && !this.stop) {
            ShutdownUtil.removeHook(this.shutdownHook);
        }
    }

    /**
     * Wrapper around a user supplied {@link FlowListener} that keeps listener failures
     * from propagating: any Throwable thrown by the wrapped listener is remembered,
     * logged as a warning, and causes the enclosing flow to be stopped.
     * <p>
     * Equality and hash code delegate to the wrapped listener, so a wrapper compares
     * equal both to another wrapper around an equal listener and to the bare listener.
     */
    private class SafeFlowListener
    implements FlowListener {
        /** The wrapped listener that receives the events. */
        final FlowListener flowListener;
        /** The most recent Throwable thrown by the wrapped listener, if any. */
        Throwable throwable;

        private SafeFlowListener(FlowListener flowListener) {
            this.flowListener = flowListener;
        }

        @Override
        public void onStarting(Flow flow) {
            try {
                flowListener.onStarting(flow);
            }
            catch (Throwable failure) {
                handleThrowable(failure);
            }
        }

        @Override
        public void onStopping(Flow flow) {
            try {
                flowListener.onStopping(flow);
            }
            catch (Throwable failure) {
                handleThrowable(failure);
            }
        }

        @Override
        public void onCompleted(Flow flow) {
            try {
                flowListener.onCompleted(flow);
            }
            catch (Throwable failure) {
                handleThrowable(failure);
            }
        }

        /**
         * Forwards the flow failure to the wrapped listener.
         *
         * @return the wrapped listener's answer, or {@code false} if it threw
         */
        @Override
        public boolean onThrowable(Flow flow, Throwable flowThrowable) {
            boolean handled = false;
            try {
                handled = flowListener.onThrowable(flow, flowThrowable);
            }
            catch (Throwable failure) {
                handleThrowable(failure);
            }
            return handled;
        }

        /** Remembers the listener failure, logs it as a warning, and stops the enclosing flow. */
        private void handleThrowable(Throwable failure) {
            this.throwable = failure;
            BaseFlow.this.logWarn(String.format("flow listener %s threw throwable", flowListener), failure);
            BaseFlow.this.stop();
        }

        public boolean equals(Object object) {
            // unwrap another SafeFlowListener so that the wrapped listeners are compared
            Object other = object instanceof SafeFlowListener ? ((SafeFlowListener)object).flowListener : object;
            return flowListener.equals(other);
        }

        public int hashCode() {
            return flowListener.hashCode();
        }
    }

    /**
     * Simple mutable holder exposing a single public {@link Flow} reference.
     */
    public static class FlowHolder {
        /** The held flow; {@code null} until assigned. */
        public Flow flow;

        /** Creates an empty holder whose {@code flow} is {@code null}. */
        public FlowHolder() {
            this(null);
        }

        /**
         * Creates a holder for the given flow.
         *
         * @param flow the flow to hold, may be {@code null}
         */
        public FlowHolder(Flow flow) {
            this.flow = flow;
        }
    }
}

