/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.planner;

import cascading.flow.Flow;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.FlowStepListener;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.Scope;
import cascading.flow.planner.ScopedElement;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.stream.annotations.StreamMode;
import cascading.management.CascadingServices;
import cascading.management.state.ClientState;
import cascading.operation.Operation;
import cascading.pipe.Group;
import cascading.pipe.Operator;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.pipe.SubAssembly;
import cascading.property.ConfigDef;
import cascading.stats.FlowStepStats;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.type.SerializableType;
import cascading.util.EnumMultiMap;
import cascading.util.ProcessLogger;
import cascading.util.Util;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Skeletal {@link FlowStep} implementation.
 *
 * <p>Holds the step's identity (id, name, ordinal), its element graph and node graph, the
 * source/sink/trap taps with the scope names they are bound to, registered
 * {@link FlowStepListener}s, and the lazily created {@link FlowStepJob}. All
 * {@link ProcessLogger} calls are forwarded to the owning {@link Flow} when that flow is itself
 * a {@code ProcessLogger}, otherwise to {@link ProcessLogger#NULL}.
 *
 * @param <Config> the platform specific configuration type
 */
public abstract class BaseFlowStep<Config>
implements FlowStep<Config>,
ProcessLogger,
Serializable {
    /** Owning flow; transient, so it is not part of the serialized form. */
    private transient Flow<Config> flow;
    /** Name of the owning flow, captured in {@link #setFlow(Flow)}. */
    private String flowName;
    /** ID of the owning flow, captured in {@link #setFlow(Flow)}. */
    private String flowID;
    /** Configuration for this step; transient. */
    private transient Config flowStepConf;
    /** Submit priority; {@link #setSubmitPriority(int)} accepts 1 to 10 inclusive. */
    private int submitPriority = 5;
    /** Step name; assigned through {@link #setName(String)}. */
    String name;
    /** Unique step id assigned by the no-arg constructor. */
    private String id;
    /** Ordinal of this step. */
    private int ordinal;
    /** Lazily created annotation map; {@code null} until the first annotation is added. */
    private Map<String, String> processAnnotations;
    /**
     * Lazily created listener list.
     * NOTE(review): this field is not transient and SafeFlowStepListener does not itself declare
     * Serializable; unless FlowStepListener is Serializable, serializing a step with listeners
     * attached would presumably fail -- confirm.
     */
    private List<SafeFlowStepListener> listeners;
    protected ElementGraph elementGraph;
    protected FlowNodeGraph flowNodeGraph;
    /** Source taps mapped to the scope names they are read under. */
    protected final Map<Tap, Set<String>> sources = new HashMap<Tap, Set<String>>();
    /** Sink taps mapped to the scope names they are written under. */
    protected final Map<Tap, Set<String>> sinks = new HashMap<Tap, Set<String>>();
    /** Trap taps keyed by name. */
    private final Map<String, Tap> traps = new HashMap<String, Tap>();
    protected Tap tempSink;
    /** Groups in this step, without duplicates (see {@link #addGroup(Group)}). */
    private final List<Group> groups = new ArrayList<Group>();
    protected transient FlowStepStats flowStepStats;
    /** Cached job created by {@link #getCreateFlowStepJob(FlowProcess, Object)}; transient. */
    private transient FlowStepJob<Config> flowStepJob;
    private Map<String, String> flowStepDescriptor = Collections.emptyMap();

    /**
     * Creates a named step with no node graph and no descriptor.
     *
     * @param name    step name, may not be null or empty
     * @param ordinal step ordinal
     */
    protected BaseFlowStep(String name, int ordinal) {
        this(name, ordinal, null);
    }

    /**
     * Creates a named step with no node graph.
     *
     * @param name               step name, may not be null or empty
     * @param ordinal            step ordinal
     * @param flowStepDescriptor descriptor map, ignored when {@code null}
     */
    protected BaseFlowStep(String name, int ordinal, Map<String, String> flowStepDescriptor) {
        this(name, ordinal, null, flowStepDescriptor);
    }

    /**
     * Creates a named step. No element graph is supplied by this constructor and
     * {@link #configure()} is not invoked.
     *
     * @param name               step name, may not be null or empty
     * @param ordinal            step ordinal
     * @param flowNodeGraph      node graph for this step, may be {@code null}
     * @param flowStepDescriptor descriptor map, ignored when {@code null}
     * @throws IllegalArgumentException if {@code name} is null or empty
     */
    protected BaseFlowStep(String name, int ordinal, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor) {
        this();
        this.setName(name);
        this.ordinal = ordinal;
        this.flowNodeGraph = flowNodeGraph;
        this.setFlowStepDescriptor(flowStepDescriptor);
    }

    /**
     * Creates a step from its graphs with no descriptor, then calls {@link #configure()}.
     *
     * @param elementStepGraph element graph of this step
     * @param flowNodeGraph    node graph of this step
     */
    protected BaseFlowStep(ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph) {
        this(elementStepGraph, flowNodeGraph, null);
    }

    /**
     * Creates a step from its graphs, then calls {@link #configure()} to register sources,
     * sinks, groups and traps. The step name is not set by this constructor.
     *
     * @param elementStepGraph   element graph of this step
     * @param flowNodeGraph      node graph of this step
     * @param flowStepDescriptor descriptor map, ignored when {@code null}
     */
    protected BaseFlowStep(ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor) {
        this();
        this.elementGraph = elementStepGraph;
        this.flowNodeGraph = flowNodeGraph;
        this.setFlowStepDescriptor(flowStepDescriptor);
        this.configure();
    }

    /** Assigns a fresh unique id; every other constructor delegates here first. */
    protected BaseFlowStep() {
        this.id = Util.createUniqueIDWhichStartsWithAChar();
    }

    /**
     * Registers the node graph's source and sink taps (with the names of their adjacent scopes
     * in the element graph), all groups found in the element graph, and the node graph's traps.
     */
    protected void configure() {
        BaseFlowStep.addSources(this, this.getElementGraph(), this.getFlowNodeGraph().getSourceTaps());
        BaseFlowStep.addSinks(this, this.getElementGraph(), this.getFlowNodeGraph().getSinkTaps());
        this.addAllGroups();
        this.traps.putAll(this.getFlowNodeGraph().getTrapsMap());
    }

    /** Adds every group found in the element graph to this step. */
    protected void addAllGroups() {
        this.addGroups(ElementGraphs.findAllGroups(this.getElementGraph()));
    }

    @Override
    public String getID() {
        return this.id;
    }

    public void setOrdinal(int ordinal) {
        this.ordinal = ordinal;
    }

    @Override
    public int getOrdinal() {
        return this.ordinal;
    }

    @Override
    public String getName() {
        return this.name;
    }

    /**
     * Sets the step name.
     *
     * @param name the new name
     * @throws IllegalArgumentException if {@code name} is null or empty
     */
    public void setName(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("step name may not be null or empty");
        }
        this.name = name;
    }

    /** @return an unmodifiable view of the step descriptor, never {@code null} */
    @Override
    public Map<String, String> getFlowStepDescriptor() {
        return Collections.unmodifiableMap(this.flowStepDescriptor);
    }

    /**
     * Replaces the step descriptor; a {@code null} argument leaves the current descriptor
     * untouched. The given map is stored by reference, not copied.
     */
    protected void setFlowStepDescriptor(Map<String, String> flowStepDescriptor) {
        if (flowStepDescriptor != null) {
            this.flowStepDescriptor = flowStepDescriptor;
        }
    }

    /** @return an unmodifiable view of the process annotations, empty when none were added */
    @Override
    public Map<String, String> getProcessAnnotations() {
        if (this.processAnnotations == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(this.processAnnotations);
    }

    /**
     * Adds an annotation keyed by the enum's declaring class name with the constant's name as
     * value. A {@code null} argument is ignored.
     */
    @Override
    public void addProcessAnnotation(Enum annotation) {
        if (annotation == null) {
            return;
        }
        this.addProcessAnnotation(annotation.getDeclaringClass().getName(), annotation.name());
    }

    /** Adds (or overwrites) a single process annotation, creating the map on first use. */
    @Override
    public void addProcessAnnotation(String key, String value) {
        if (this.processAnnotations == null) {
            this.processAnnotations = new HashMap<String, String>();
        }
        this.processAnnotations.put(key, value);
    }

    /**
     * Binds this step to its owning flow and captures the flow's id and name.
     *
     * @param flow the owning flow, must not be {@code null}
     */
    public void setFlow(Flow<Config> flow) {
        this.flow = flow;
        this.flowID = flow.getID();
        this.flowName = flow.getName();
    }

    @Override
    public Flow<Config> getFlow() {
        return this.flow;
    }

    @Override
    public String getFlowID() {
        return this.flowID;
    }

    @Override
    public String getFlowName() {
        return this.flowName;
    }

    protected void setFlowName(String flowName) {
        this.flowName = flowName;
    }

    @Override
    public Config getConfig() {
        return this.flowStepConf;
    }

    /** @return an empty map in this base implementation */
    @Override
    public Map<Object, Object> getConfigAsProperties() {
        return Collections.emptyMap();
    }

    protected void setConfig(Config flowStepConf) {
        this.flowStepConf = flowStepConf;
    }

    /** @return the display name using id prefixes of {@code Util.ID_LENGTH} characters */
    @Override
    public String getStepDisplayName() {
        return this.getStepDisplayName(Util.ID_LENGTH);
    }

    /**
     * Builds a display name of the form {@code [flowID/stepID] flowName/stepName}, where both
     * ids are truncated to {@code idLength} characters.
     *
     * @param idLength number of id characters to show; values below zero or above
     *                 {@code Util.ID_LENGTH} are replaced by {@code Util.ID_LENGTH}; zero omits
     *                 the bracketed id prefix entirely
     * @return the formatted display name
     */
    protected String getStepDisplayName(int idLength) {
        if (idLength < 0 || idLength > Util.ID_LENGTH) {
            idLength = Util.ID_LENGTH;
        }
        if (idLength == 0) {
            return String.format("%s/%s", this.getFlowName(), this.getName());
        }
        String flowID = this.getFlowID().substring(0, idLength);
        String stepID = this.getID().substring(0, idLength);
        return String.format("[%s/%s] %s/%s", flowID, stepID, this.getFlowName(), this.getName());
    }

    /**
     * Builds a display name of the form {@code [flowID/stepID/nodeID] flowName/stepName}, where
     * all three ids are truncated to {@code idLength} characters.
     *
     * @param flowNode the node whose id is included
     * @param idLength number of id characters to show; values below zero or above
     *                 {@code Util.ID_LENGTH} are replaced by {@code Util.ID_LENGTH}
     * @return the formatted display name
     */
    protected String getNodeDisplayName(FlowNode flowNode, int idLength) {
        // negative lengths are clamped like in getStepDisplayName(int) instead of failing in substring()
        if (idLength < 0 || idLength > Util.ID_LENGTH) {
            idLength = Util.ID_LENGTH;
        }
        String flowID = this.getFlowID().substring(0, idLength);
        String stepID = this.getID().substring(0, idLength);
        String nodeID = flowNode.getID().substring(0, idLength);
        return String.format("[%s/%s/%s] %s/%s", flowID, stepID, nodeID, this.getFlowName(), this.getName());
    }

    @Override
    public int getSubmitPriority() {
        return this.submitPriority;
    }

    /**
     * Sets the submit priority.
     *
     * @param submitPriority a value between 1 and 10 inclusive
     * @throws IllegalArgumentException if the value is outside that range
     */
    @Override
    public void setSubmitPriority(int submitPriority) {
        if (submitPriority < 1 || submitPriority > 10) {
            throw new IllegalArgumentException("submitPriority must be between 1 and 10 inclusive, was: " + submitPriority);
        }
        this.submitPriority = submitPriority;
    }

    @Override
    public void setFlowStepStats(FlowStepStats flowStepStats) {
        this.flowStepStats = flowStepStats;
    }

    @Override
    public FlowStepStats getFlowStepStats() {
        return this.flowStepStats;
    }

    @Override
    public ElementGraph getElementGraph() {
        return this.elementGraph;
    }

    /**
     * @return the annotations of the element graph
     * @throws ClassCastException if the element graph is not an {@link AnnotatedGraph}
     */
    protected EnumMultiMap getAnnotations() {
        return ((AnnotatedGraph)this.elementGraph).getAnnotations();
    }

    @Override
    public FlowNodeGraph getFlowNodeGraph() {
        return this.flowNodeGraph;
    }

    /** @return the number of vertices in the node graph */
    @Override
    public int getNumFlowNodes() {
        return this.flowNodeGraph.vertexSet().size();
    }

    @Override
    public Set<FlowElement> getSourceElements() {
        return ElementGraphs.findSources(this.getElementGraph(), FlowElement.class);
    }

    @Override
    public Set<FlowElement> getSinkElements() {
        return ElementGraphs.findSinks(this.getElementGraph(), FlowElement.class);
    }

    /**
     * @return the single group of this step, or {@code null} when there is none
     * @throws IllegalStateException if the step holds more than one group
     */
    @Override
    public Group getGroup() {
        if (this.groups.isEmpty()) {
            return null;
        }
        if (this.groups.size() > 1) {
            throw new IllegalStateException("more than one group");
        }
        return this.groups.get(0);
    }

    /** @return the live internal group collection (not a copy) */
    @Override
    public Collection<Group> getGroups() {
        return this.groups;
    }

    /** Adds each given group via {@link #addGroup(Group)}. */
    public void addGroups(Collection<Group> groups) {
        for (Group group : groups) {
            this.addGroup(group);
        }
    }

    /** Adds the group unless an equal group is already registered. */
    public void addGroup(Group group) {
        if (!this.groups.contains(group)) {
            this.groups.add(group);
        }
    }

    /** @return the taps among the node graph's elements annotated {@code StreamMode.Accumulated} */
    public Set<Tap> getAllAccumulatedSources() {
        return Util.narrowIdentitySet(Tap.class, this.getFlowNodeGraph().getFlowElementsFor(StreamMode.Accumulated));
    }

    /** @return the taps among the node graph's elements annotated {@code StreamMode.Streamed} */
    public Set<Tap> getAllStreamedSources() {
        return Util.narrowIdentitySet(Tap.class, this.getFlowNodeGraph().getFlowElementsFor(StreamMode.Streamed));
    }

    /**
     * Registers {@code source} under the given name; a tap may be registered under several names.
     *
     * @param name   name the source is bound to
     * @param source the source tap
     */
    public void addSource(String name, Tap source) {
        this.sources.computeIfAbsent(source, key -> new HashSet<String>()).add(name);
    }

    /**
     * Registers {@code sink} under the given name; a tap may be registered under several names.
     *
     * @param name name the sink is bound to
     * @param sink the sink tap
     */
    public void addSink(String name, Tap sink) {
        this.sinks.computeIfAbsent(sink, key -> new HashSet<String>()).add(name);
    }

    /** @return an unmodifiable snapshot of the registered source taps */
    @Override
    public Set<Tap> getSourceTaps() {
        return Collections.unmodifiableSet(new HashSet<Tap>(this.sources.keySet()));
    }

    /** @return an unmodifiable snapshot of the registered sink taps */
    @Override
    public Set<Tap> getSinkTaps() {
        return Collections.unmodifiableSet(new HashSet<Tap>(this.sinks.keySet()));
    }

    /**
     * @return the single sink of this step, or {@code null} when there is none
     * @throws IllegalStateException if the step has more than one sink
     */
    @Override
    public Tap getSink() {
        if (this.sinks.isEmpty()) {
            return null;
        }
        if (this.sinks.size() > 1) {
            throw new IllegalStateException("more than one sink");
        }
        return this.sinks.keySet().iterator().next();
    }

    /**
     * @param source a source tap
     * @return an unmodifiable view of the names the tap is registered under; empty when the tap
     *         is not a registered source
     */
    @Override
    public Set<String> getSourceName(Tap source) {
        Set<String> names = this.sources.get(source);
        return names == null ? Collections.<String>emptySet() : Collections.unmodifiableSet(names);
    }

    /**
     * @param sink a sink tap
     * @return an unmodifiable view of the names the tap is registered under; empty when the tap
     *         is not a registered sink
     */
    @Override
    public Set<String> getSinkName(Tap sink) {
        Set<String> names = this.sinks.get(sink);
        return names == null ? Collections.<String>emptySet() : Collections.unmodifiableSet(names);
    }

    /**
     * Finds a source tap by identifier, ignoring case.
     *
     * @param identifier the tap identifier to look for
     * @return the first matching source tap, or {@code null} when the identifier is empty or no
     *         source matches
     */
    @Override
    public Tap getSourceWith(String identifier) {
        if (Util.isEmpty(identifier)) {
            return null;
        }
        for (Tap tap : this.sources.keySet()) {
            if (identifier.equalsIgnoreCase(tap.getIdentifier())) {
                return tap;
            }
        }
        return null;
    }

    /**
     * Finds a sink tap by identifier, ignoring case.
     *
     * @param identifier the tap identifier to look for
     * @return the first matching sink tap, or {@code null} when the identifier is empty or no
     *         sink matches
     */
    @Override
    public Tap getSinkWith(String identifier) {
        if (Util.isEmpty(identifier)) {
            return null;
        }
        for (Tap tap : this.sinks.keySet()) {
            if (identifier.equalsIgnoreCase(tap.getIdentifier())) {
                return tap;
            }
        }
        return null;
    }

    /** @return the live internal trap map (not a copy) */
    @Override
    public Map<String, Tap> getTrapMap() {
        return this.traps;
    }

    /** @return an unmodifiable snapshot of the trap taps */
    @Override
    public Set<Tap> getTraps() {
        return Collections.unmodifiableSet(new HashSet<Tap>(this.traps.values()));
    }

    @Override
    public Tap getTrap(String name) {
        return this.getTrapMap().get(name);
    }

    /**
     * @return {@code true} when every registered source reports that its resource exists
     * @throws IOException if a tap fails while checking its resource
     */
    boolean allSourcesExist() throws IOException {
        for (Tap tap : this.sources.keySet()) {
            if (!tap.resourceExists(this.getConfig())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Compares the modification time reported by {@code Util.getSourceModified} for the
     * registered sources against the given sink modification time.
     *
     * @param sinkModified modification time of the sinks
     * @return {@code true} when the reported source modification time is greater than
     *         {@code sinkModified}
     * @throws IOException if the source modification time cannot be determined
     */
    boolean areSourcesNewer(long sinkModified) throws IOException {
        Config config = this.getConfig();
        Iterator<Tap> values = this.sources.keySet().iterator();
        long sourceModified = 0L;
        try {
            sourceModified = Util.getSourceModified(config, values, sinkModified);
            return sinkModified < sourceModified;
        }
        finally {
            // runs on the exceptional path too; sourceModified is then still 0
            if (this.isInfoEnabled()) {
                this.logInfo("source modification date at: " + new Date(sourceModified), new Object[0]);
            }
        }
    }

    /**
     * Returns the value of {@code Util.getSinkModified} for the registered sinks and, when info
     * logging is enabled, logs exactly one message describing it: {@code -1} is reported as a
     * sink marked for delete, {@code 0} as a missing sink, anything else as a date.
     *
     * @return the sink modification value
     * @throws IOException if the sink modification time cannot be determined
     */
    long getSinkModified() throws IOException {
        long sinkModified = Util.getSinkModified(this.getConfig(), this.sinks.keySet());
        if (this.isInfoEnabled()) {
            if (sinkModified == -1L) {
                this.logInfo("at least one sink is marked for delete", new Object[0]);
            } else if (sinkModified == 0L) {
                this.logInfo("at least one sink does not exist", new Object[0]);
            } else {
                this.logInfo("sink oldest modified date: " + new Date(sinkModified), new Object[0]);
            }
        }
        return sinkModified;
    }

    /**
     * Prepares sources for read, then sinks and traps for write, stopping at the first failure.
     *
     * @return the first failure, or {@code null} when every tap was prepared
     */
    protected Throwable prepareResources() {
        Throwable throwable = this.prepareResources(this.getSourceTaps(), false);
        if (throwable == null) {
            throwable = this.prepareResources(this.getSinkTaps(), true);
        }
        if (throwable == null) {
            throwable = this.prepareResources(this.getTraps(), true);
        }
        return throwable;
    }

    /**
     * Prepares the given taps in iteration order, stopping at the first failure.
     *
     * @return the first failure, or {@code null} when every tap was prepared
     */
    private Throwable prepareResources(Collection<Tap> taps, boolean forWrite) {
        for (Tap tap : taps) {
            Throwable throwable = this.prepareResource(tap, forWrite);
            if (throwable != null) {
                return throwable;
            }
        }
        return null;
    }

    /**
     * Prepares one tap for read or write. A {@code false} result from the tap and any throwable
     * it raises are both logged and converted into a {@link FlowException}.
     *
     * @return the failure, or {@code null} on success
     */
    private Throwable prepareResource(Tap tap, boolean forWrite) {
        FlowException throwable = null;
        try {
            boolean result = forWrite ? tap.prepareResourceForWrite(this.getConfig()) : tap.prepareResourceForRead(this.getConfig());
            if (!result) {
                String message = String.format("unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier(this.getConfig()));
                this.logError(message, new Object[0]);
                throwable = new FlowException(message);
            }
        }
        catch (Throwable exception) {
            String message = String.format("unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier(this.getConfig()));
            this.logError(message, exception);
            throwable = new FlowException(message, exception);
        }
        return throwable;
    }

    /**
     * Commits the sinks in iteration order. After the first commit failure, every remaining sink
     * is rolled back instead of committed; rollback failures of those sinks are not returned.
     *
     * @return the first commit failure, or {@code null} when every sink was committed
     */
    protected Throwable commitSinks() {
        Throwable throwable = null;
        for (Tap tap : this.sinks.keySet()) {
            if (throwable != null) {
                this.rollbackResource(tap);
                continue;
            }
            throwable = this.commitResource(tap);
        }
        return throwable;
    }

    /**
     * Commits one sink. A {@code false} result from the tap and any throwable it raises are both
     * logged and converted into a {@link FlowException}.
     *
     * @return the failure, or {@code null} on success
     */
    private Throwable commitResource(Tap tap) {
        FlowException throwable = null;
        try {
            if (!tap.commitResource(this.getConfig())) {
                String message = "unable to commit sink: " + tap.getFullIdentifier(this.getConfig());
                this.logError(message, new Object[0]);
                throwable = new FlowException(message);
            }
        }
        catch (Throwable exception) {
            String message = "unable to commit sink: " + tap.getFullIdentifier(this.getConfig());
            this.logError(message, exception);
            throwable = new FlowException(message, exception);
        }
        return throwable;
    }

    /**
     * Rolls back one sink. A {@code false} result from the tap and any throwable it raises are
     * both logged and converted into a {@link FlowException}.
     *
     * @return the failure, or {@code null} on success
     */
    private Throwable rollbackResource(Tap tap) {
        FlowException throwable = null;
        try {
            if (!tap.rollbackResource(this.getConfig())) {
                String message = "unable to rollback sink: " + tap.getFullIdentifier(this.getConfig());
                this.logError(message, new Object[0]);
                throwable = new FlowException(message);
            }
        }
        catch (Throwable exception) {
            String message = "unable to rollback sink: " + tap.getFullIdentifier(this.getConfig());
            this.logError(message, exception);
            throwable = new FlowException(message, exception);
        }
        return throwable;
    }

    /**
     * Rolls back every sink, continuing past failures.
     *
     * @return the first rollback failure, or {@code null} when every rollback succeeded
     */
    protected Throwable rollbackSinks() {
        Throwable first = null;
        for (Tap tap : this.sinks.keySet()) {
            Throwable current = this.rollbackResource(tap);
            if (first == null) {
                first = current;
            }
        }
        return first;
    }

    public abstract Config createInitializedConfig(FlowProcess<Config> var1, Config var2);

    /**
     * Collects the class names of the serializers that the {@link SerializableType}s found by
     * {@link #findAllSerializableTypes()} return for {@code base}; types returning {@code null}
     * are skipped.
     *
     * @param base the base class passed to {@code SerializableType.getSerializer}
     * @return the set of serializer class names
     */
    protected Set<String> getFieldDeclaredSerializations(Class base) {
        Collection<SerializableType> serializableTypes = this.findAllSerializableTypes();
        return serializableTypes.stream().map(type -> type.getSerializer(base)).filter(Objects::nonNull).map(Class::getName).collect(Collectors.toSet());
    }

    /**
     * Collects every {@link SerializableType} declared on the out-values fields of the first
     * outgoing scope of each {@link Tap} and {@link Splice} in the element graph. Elements
     * without an outgoing scope, or whose scope carries no fields or no field types, contribute
     * nothing.
     *
     * @return the distinct serializable types found
     */
    protected Collection<SerializableType> findAllSerializableTypes() {
        Set<FlowElement> elements = Util.createIdentitySet();
        elements.addAll(Util.narrowIdentitySet(Tap.class, this.elementGraph.vertexSet()));
        elements.addAll(Util.narrowIdentitySet(Splice.class, this.elementGraph.vertexSet()));
        Set<SerializableType> types = new HashSet<SerializableType>();
        for (FlowElement element : elements) {
            Set<Scope> outgoing = this.elementGraph.outgoingEdgesOf(element);
            if (outgoing.isEmpty()) {
                // nothing downstream of this element, so there are no fields to inspect
                continue;
            }
            Fields fields = outgoing.iterator().next().getOutValuesFields();
            if (fields == null) {
                continue;
            }
            Type[] fieldTypes = fields.getTypes();
            if (fieldTypes == null) {
                continue;
            }
            for (Type type : fieldTypes) {
                if (type instanceof SerializableType) {
                    types.add((SerializableType)type);
                }
            }
        }
        return types;
    }

    /** @return the incoming scopes of the element in the element graph */
    public Set<Scope> getPreviousScopes(FlowElement flowElement) {
        return this.getElementGraph().incomingEdgesOf(flowElement);
    }

    /**
     * @return the single outgoing scope of the element in the element graph
     * @throws IllegalStateException if the element does not have exactly one outgoing scope
     */
    public Scope getNextScope(FlowElement flowElement) {
        Set<Scope> outgoing = this.getElementGraph().outgoingEdgesOf(flowElement);
        if (outgoing.size() != 1) {
            throw new IllegalStateException("should only be one scope after current flow element: " + flowElement + " found: " + outgoing.size());
        }
        return outgoing.iterator().next();
    }

    /** @return the element the given scope points to in the element graph */
    public FlowElement getNextFlowElement(Scope scope) {
        return this.getElementGraph().getEdgeTarget(scope);
    }

    /** @return the operation of every {@link Operator} vertex in the element graph */
    public Collection<Operation> getAllOperations() {
        List<Operation> operations = new ArrayList<Operation>();
        for (FlowElement vertex : this.getElementGraph().vertexSet()) {
            if (vertex instanceof Operator) {
                operations.add(((Operator)vertex).getOperation());
            }
        }
        return operations;
    }

    /** @return {@code true} when some {@link Pipe} vertex in the element graph has the given name */
    @Override
    public boolean containsPipeNamed(String pipeName) {
        for (FlowElement vertex : this.getElementGraph().vertexSet()) {
            if (vertex instanceof Pipe && ((Pipe)vertex).getName().equals(pipeName)) {
                return true;
            }
        }
        return false;
    }

    /** Calls {@link #clean(Object)} with this step's current configuration. */
    public void clean() {
        this.clean(this.getConfig());
    }

    public abstract void clean(Config var1);

    /** @return the listener list, created on first access */
    List<SafeFlowStepListener> getListeners() {
        if (this.listeners == null) {
            this.listeners = new LinkedList<SafeFlowStepListener>();
        }
        return this.listeners;
    }

    @Override
    public boolean hasListeners() {
        return this.listeners != null && !this.listeners.isEmpty();
    }

    /** Registers the listener, wrapped so that throwables it raises are caught and logged. */
    @Override
    public void addListener(FlowStepListener flowStepListener) {
        this.getListeners().add(new SafeFlowStepListener(flowStepListener));
    }

    /**
     * Removes the first registered wrapper whose delegate equals the given listener.
     *
     * @return {@code true} if a listener was removed
     */
    @Override
    public boolean removeListener(FlowStepListener flowStepListener) {
        return this.getListeners().remove(new SafeFlowStepListener(flowStepListener));
    }

    /** Notifies every listener that the step completed. */
    protected void fireOnCompleted() {
        if (this.hasListeners()) {
            if (this.isDebugEnabled()) {
                this.logDebug("firing onCompleted event: " + this.getListeners().size(), new Object[0]);
            }
            for (SafeFlowStepListener flowStepListener : this.getListeners()) {
                flowStepListener.onStepCompleted(this);
            }
        }
    }

    /** Notifies every listener of the throwable; the listeners' return values are ignored. */
    protected void fireOnThrowable(Throwable throwable) {
        if (this.hasListeners()) {
            if (this.isDebugEnabled()) {
                this.logDebug("firing onThrowable event: " + this.getListeners().size(), new Object[0]);
            }
            for (SafeFlowStepListener flowStepListener : this.getListeners()) {
                flowStepListener.onStepThrowable(this, throwable);
            }
        }
    }

    /** Notifies every listener that the step is stopping. */
    protected void fireOnStopping() {
        if (this.hasListeners()) {
            if (this.isDebugEnabled()) {
                // logs the listener count, like the other fireOn* methods
                this.logDebug("firing onStopping event: " + this.getListeners().size(), new Object[0]);
            }
            for (SafeFlowStepListener flowStepListener : this.getListeners()) {
                flowStepListener.onStepStopping(this);
            }
        }
    }

    /** Notifies every listener that the step is starting. */
    protected void fireOnStarting() {
        if (this.hasListeners()) {
            if (this.isDebugEnabled()) {
                this.logDebug("firing onStarting event: " + this.getListeners().size(), new Object[0]);
            }
            for (SafeFlowStepListener flowStepListener : this.getListeners()) {
                flowStepListener.onStepStarting(this);
            }
        }
    }

    /** Notifies every listener that the step is running. */
    protected void fireOnRunning() {
        if (this.hasListeners()) {
            if (this.isDebugEnabled()) {
                this.logDebug("firing onRunning event: " + this.getListeners().size(), new Object[0]);
            }
            for (SafeFlowStepListener flowStepListener : this.getListeners()) {
                flowStepListener.onStepRunning(this);
            }
        }
    }

    /**
     * Creates the client state for this step from the current session's services.
     *
     * @return {@code ClientState.NULL} when the session has no services, otherwise the state
     *         created for this step's id
     */
    protected ClientState createClientState(FlowProcess flowProcess) {
        CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();
        if (services == null) {
            return ClientState.NULL;
        }
        return services.createClientState(this.getID());
    }

    /** @return the cached job, or {@code null} if none has been created yet */
    public FlowStepJob<Config> getFlowStepJob() {
        return this.flowStepJob;
    }

    /**
     * Returns the cached job, creating it on first call. Creation initializes and stores this
     * step's configuration, creates the client state, and delegates to
     * {@link #createFlowStepJob(ClientState, FlowProcess, Object)}.
     *
     * @param flowProcess  the flow process; when {@code null} and no job is cached, nothing is
     *                     created
     * @param parentConfig the parent configuration to derive the step configuration from
     * @return the job, or {@code null} when none is cached and {@code flowProcess} is {@code null}
     */
    public FlowStepJob<Config> getCreateFlowStepJob(FlowProcess<Config> flowProcess, Config parentConfig) {
        if (this.flowStepJob != null) {
            return this.flowStepJob;
        }
        if (flowProcess == null) {
            return null;
        }
        Config initializedConfig = this.createInitializedConfig(flowProcess, parentConfig);
        this.setConfig(initializedConfig);
        ClientState clientState = this.createClientState(flowProcess);
        this.flowStepJob = this.createFlowStepJob(clientState, flowProcess, initializedConfig);
        return this.flowStepJob;
    }

    protected abstract FlowStepJob createFlowStepJob(ClientState var1, FlowProcess<Config> var2, Config var3);

    /**
     * Applies node-level {@link ConfigDef}s to {@code setter}, once per {@link ConfigDef.Mode}.
     * For each element of the node graph in topological order, the element and then its chain of
     * parent pipes are visited; the walk up a chain stops at the first element for which
     * {@link #elementSpansDownStream} is true.
     *
     * @param nodeElementGraph element graph of the node being configured
     * @param setter           receiver of the configuration values
     */
    protected void initConfFromNodeConfigDef(ElementGraph nodeElementGraph, ConfigDef.Setter setter) {
        nodeElementGraph = ElementGraphs.asExtentMaskedSubGraph(nodeElementGraph);
        ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph(this.getElementGraph());
        for (ConfigDef.Mode mode : ConfigDef.Mode.values()) {
            Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator(nodeElementGraph);
            while (iterator.hasNext()) {
                FlowElement element = iterator.next();
                while (element != null) {
                    if (this.elementSpansDownStream(stepElementGraph, nodeElementGraph, element)) {
                        break;
                    }
                    if (element instanceof ScopedElement && ((ScopedElement)element).hasNodeConfigDef()) {
                        ((ScopedElement)element).getNodeConfigDef().apply(mode, setter);
                    }
                    // only pipes have a parent to continue with
                    element = element instanceof Pipe ? ((Pipe)element).getParent() : null;
                }
            }
        }
    }

    /**
     * @return {@code true} when the element is not a {@link SubAssembly}, has no outgoing edge in
     *         the node graph, and has at least one outgoing edge in the step graph
     */
    private boolean elementSpansDownStream(ElementGraph stepElementGraph, ElementGraph nodeElementGraph, FlowElement element) {
        if (element instanceof SubAssembly) {
            return false;
        }
        return nodeElementGraph.outDegreeOf(element) == 0 && stepElementGraph.outDegreeOf(element) > 0;
    }

    /**
     * Applies step-level {@link ConfigDef}s to {@code setter}, once per {@link ConfigDef.Mode}.
     * For each element of the step graph in topological order, the element and then its chain of
     * parent pipes are visited.
     *
     * @param setter receiver of the configuration values
     */
    protected void initConfFromStepConfigDef(ConfigDef.Setter setter) {
        ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph(this.getElementGraph());
        for (ConfigDef.Mode mode : ConfigDef.Mode.values()) {
            Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator(stepElementGraph);
            while (iterator.hasNext()) {
                FlowElement element = iterator.next();
                while (element != null) {
                    if (element instanceof ScopedElement && ((ScopedElement)element).hasStepConfigDef()) {
                        ((ScopedElement)element).getStepConfigDef().apply(mode, setter);
                    }
                    // only pipes have a parent to continue with
                    element = element instanceof Pipe ? ((Pipe)element).getParent() : null;
                }
            }
        }
    }

    /**
     * Registers each tap as a source of {@code flowStep}, once per outgoing scope of the tap in
     * {@code elementGraph}, using the scope's name.
     */
    protected static void addSources(BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sources) {
        for (Tap tap : sources) {
            for (Scope scope : elementGraph.outgoingEdgesOf(tap)) {
                flowStep.addSource(scope.getName(), tap);
            }
        }
    }

    /**
     * Registers each tap as a sink of {@code flowStep}, once per incoming scope of the tap in
     * {@code elementGraph}, using the scope's name.
     */
    protected static void addSinks(BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sinks) {
        for (Tap tap : sinks) {
            for (Scope scope : elementGraph.incomingEdgesOf(tap)) {
                flowStep.addSink(scope.getName(), tap);
            }
        }
    }

    /** Two steps are equal when they are of the same class and have equal ids. */
    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (object == null || this.getClass() != object.getClass()) {
            return false;
        }
        return Objects.equals(this.id, ((BaseFlowStep)object).id);
    }

    @Override
    public int hashCode() {
        return this.id != null ? this.id.hashCode() : 0;
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "[name: " + this.getName() + "]";
    }

    @Override
    public final boolean isInfoEnabled() {
        return this.getLogger().isInfoEnabled();
    }

    /** @return the owning flow when it is a {@link ProcessLogger}, else {@code ProcessLogger.NULL} */
    private ProcessLogger getLogger() {
        // instanceof is false for null, so an unset flow falls through to the NULL logger
        if (this.flow instanceof ProcessLogger) {
            return (ProcessLogger)this.flow;
        }
        return ProcessLogger.NULL;
    }

    @Override
    public final boolean isDebugEnabled() {
        return this.getLogger().isDebugEnabled();
    }

    @Override
    public void logDebug(String message, Object ... arguments) {
        this.getLogger().logDebug(message, arguments);
    }

    @Override
    public void logInfo(String message, Object ... arguments) {
        this.getLogger().logInfo(message, arguments);
    }

    @Override
    public void logWarn(String message) {
        this.getLogger().logWarn(message);
    }

    @Override
    public void logWarn(String message, Throwable throwable) {
        this.getLogger().logWarn(message, throwable);
    }

    @Override
    public void logWarn(String message, Object ... arguments) {
        this.getLogger().logWarn(message, arguments);
    }

    @Override
    public void logError(String message, Object ... arguments) {
        this.getLogger().logError(message, arguments);
    }

    @Override
    public void logError(String message, Throwable throwable) {
        this.getLogger().logError(message, throwable);
    }

    /**
     * Wraps a {@link FlowStepListener} so that any throwable raised by a callback is recorded in
     * {@link #throwable}, logged as a warning on the enclosing step, and not propagated.
     */
    private class SafeFlowStepListener
    implements FlowStepListener {
        /** The wrapped listener. */
        final FlowStepListener flowStepListener;
        /** The most recent throwable raised by the wrapped listener, if any. */
        Throwable throwable;

        private SafeFlowStepListener(FlowStepListener flowStepListener) {
            this.flowStepListener = flowStepListener;
        }

        @Override
        public void onStepStarting(FlowStep flowStep) {
            try {
                this.flowStepListener.onStepStarting(flowStep);
            }
            catch (Throwable throwable) {
                this.handleThrowable(throwable);
            }
        }

        @Override
        public void onStepStopping(FlowStep flowStep) {
            try {
                this.flowStepListener.onStepStopping(flowStep);
            }
            catch (Throwable throwable) {
                this.handleThrowable(throwable);
            }
        }

        @Override
        public void onStepCompleted(FlowStep flowStep) {
            try {
                this.flowStepListener.onStepCompleted(flowStep);
            }
            catch (Throwable throwable) {
                this.handleThrowable(throwable);
            }
        }

        @Override
        public void onStepRunning(FlowStep flowStep) {
            try {
                this.flowStepListener.onStepRunning(flowStep);
            }
            catch (Throwable throwable) {
                this.handleThrowable(throwable);
            }
        }

        /** @return the delegate's result, or {@code false} when the delegate itself throws */
        @Override
        public boolean onStepThrowable(FlowStep flowStep, Throwable flowStepThrowable) {
            try {
                return this.flowStepListener.onStepThrowable(flowStep, flowStepThrowable);
            }
            catch (Throwable throwable) {
                this.handleThrowable(throwable);
                return false;
            }
        }

        /** Records the throwable and logs it as a warning on the enclosing step. */
        private void handleThrowable(Throwable throwable) {
            this.throwable = throwable;
            BaseFlowStep.this.logWarn(String.format("flow step listener %s threw throwable", this.flowStepListener), throwable);
        }

        /**
         * Compares the wrapped listeners when given another wrapper; otherwise asks the wrapped
         * listener whether it equals the argument. The second case is not symmetric.
         */
        @Override
        public boolean equals(Object object) {
            if (object instanceof SafeFlowStepListener) {
                return this.flowStepListener.equals(((SafeFlowStepListener)object).flowStepListener);
            }
            return this.flowStepListener.equals(object);
        }

        @Override
        public int hashCode() {
            return this.flowStepListener.hashCode();
        }
    }
}

