/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.tez.stream.graph;

import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.Flows;
import cascading.flow.hadoop.stream.HadoopMemoryJoinGate;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.stream.annotations.StreamMode;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Gate;
import cascading.flow.stream.element.InputSource;
import cascading.flow.stream.element.MemoryHashJoinGate;
import cascading.flow.stream.element.SinkStage;
import cascading.flow.stream.element.SourceStage;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.NodeStreamGraph;
import cascading.flow.tez.Hadoop2TezFlowProcess;
import cascading.flow.tez.stream.element.TezBoundaryStage;
import cascading.flow.tez.stream.element.TezCoGroupGate;
import cascading.flow.tez.stream.element.TezGroupByGate;
import cascading.flow.tez.stream.element.TezMergeGate;
import cascading.flow.tez.stream.element.TezSinkStage;
import cascading.flow.tez.stream.element.TezSourceStage;
import cascading.flow.tez.util.TezUtil;
import cascading.pipe.Boundary;
import cascading.pipe.CoGroup;
import cascading.pipe.Group;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.tap.Tap;
import cascading.util.SetMultiMap;
import cascading.util.SortedListMultiMap;
import cascading.util.Util;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream graph for a single {@link FlowNode} that is wired to Tez
 * {@link LogicalInput}s and {@link LogicalOutput}s.
 *
 * <p>The constructor builds the graph, sets traps and scopes, binds the ducts and prints the
 * graph before and after binding. Exactly one source element is treated as the "streamed"
 * source (see {@link #getStreamedHead()}); every other source element of the node is handled
 * as an accumulated source.
 */
public class Hadoop2TezStreamGraph extends NodeStreamGraph {
    private static final Logger LOG = LoggerFactory.getLogger(Hadoop2TezStreamGraph.class);

    /** Head duct created for the streamed source; assigned in {@link #buildGraph()}. */
    private InputSource streamedHead;
    /** Logical inputs as handed to the constructor, keyed by name. */
    private final Map<String, LogicalInput> inputMap;
    /** Logical outputs as handed to the constructor, keyed by name. */
    private final Map<String, LogicalOutput> outputMap;
    /** Configuration resolved through {@code TezUtil} for each logical input. */
    private final Map<LogicalInput, Configuration> inputConfigMap = new HashMap<LogicalInput, Configuration>();
    /** Configuration resolved through {@code TezUtil} for each logical output. */
    private final Map<LogicalOutput, Configuration> outputConfigMap = new HashMap<LogicalOutput, Configuration>();
    /** Logical inputs grouped by the edge source id reported by {@code TezUtil.getEdgeSourceID}. */
    private SetMultiMap<String, LogicalInput> inputMultiMap;
    /** Logical outputs grouped by the edge sink id reported by {@code TezUtil.getEdgeSinkID}. */
    private SetMultiMap<String, LogicalOutput> outputMultiMap;

    /**
     * Creates, binds and prints the stream graph for the given node.
     *
     * @param currentProcess flow process the graph runs in
     * @param flowNode node whose elements are turned into ducts
     * @param inputMap logical inputs available to this node, keyed by name
     * @param outputMap logical outputs available to this node, keyed by name
     */
    public Hadoop2TezStreamGraph(Hadoop2TezFlowProcess currentProcess, FlowNode flowNode, Map<String, LogicalInput> inputMap, Map<String, LogicalOutput> outputMap) {
        super(currentProcess, flowNode);
        this.inputMap = inputMap;
        this.outputMap = outputMap;
        this.buildGraph();
        this.setTraps();
        this.setScopes();
        this.printGraph(this.node.getID(), this.node.getName(), this.flowProcess.getCurrentSliceNum());
        this.bind();
        this.printBoundGraph(this.node.getID(), this.node.getName(), this.flowProcess.getCurrentSliceNum());
    }

    /**
     * @return the head duct that was created for the streamed source
     */
    public InputSource getStreamedHead() {
        return this.streamedHead;
    }

    /**
     * Indexes the logical inputs and outputs, selects the streamed source, creates its head
     * duct, and then creates a head duct for every remaining (accumulated) source element.
     *
     * @throws IllegalStateException if the streamed source cannot be narrowed down to exactly
     *         one element, or if an accumulated Tap has no serialized configuration property
     */
    protected void buildGraph() {
        this.indexTezInputs();
        this.indexTezOutputs();

        this.streamedSource = this.resolveStreamedSource();
        LOG.info("using streamed source: {}", this.streamedSource);
        this.streamedHead = this.handleHead(this.streamedSource, this.flowProcess);

        Set<FlowElement> accumulated = new HashSet<FlowElement>(this.node.getSourceElements());
        accumulated.remove(this.streamedSource);
        this.bindAccumulatedSources(accumulated);
    }

    /** Records the configuration of every logical input and groups the inputs by edge source id. */
    private void indexTezInputs() {
        this.inputMultiMap = new SetMultiMap<String, LogicalInput>();
        for (LogicalInput logicalInput : this.inputMap.values()) {
            Configuration inputConfiguration = TezUtil.getInputConfiguration(logicalInput);
            this.inputConfigMap.put(logicalInput, inputConfiguration);
            this.inputMultiMap.addAll(TezUtil.getEdgeSourceID(logicalInput, inputConfiguration), logicalInput);
        }
    }

    /** Records the configuration of every logical output and groups the outputs by edge sink id. */
    private void indexTezOutputs() {
        this.outputMultiMap = new SetMultiMap<String, LogicalOutput>();
        for (LogicalOutput logicalOutput : this.outputMap.values()) {
            Configuration outputConfiguration = TezUtil.getOutputConfiguration(logicalOutput);
            this.outputConfigMap.put(logicalOutput, outputConfiguration);
            this.outputMultiMap.addAll(TezUtil.getEdgeSinkID(logicalOutput, outputConfiguration), logicalOutput);
        }
    }

    /**
     * Picks the source element that is streamed: the element matching the only edge source id
     * when there is exactly one, otherwise the single source that is not annotated as
     * {@code StreamMode.Accumulated}.
     */
    private FlowElement resolveStreamedSource() {
        if (this.inputMultiMap.getKeys().size() == 1) {
            return Flows.getFlowElementForID(this.node.getSourceElements(), Util.getFirst(this.inputMultiMap.getKeys()));
        }
        Set<FlowElement> candidates = new HashSet<FlowElement>(this.node.getSourceElements());
        Set<? extends FlowElement> annotatedAccumulated = this.node.getSourceElements(StreamMode.Accumulated);
        candidates.removeAll(annotatedAccumulated);
        if (candidates.size() != 1) {
            throw new IllegalStateException("too many input source keys, got: " + Util.join(candidates, ", "));
        }
        return Util.getFirst(candidates);
    }

    /** Creates a head duct for each accumulated source, using a per-source flow process when a configuration is available. */
    private void bindAccumulatedSources(Set<FlowElement> accumulated) {
        Hadoop2TezFlowProcess tezProcess = (Hadoop2TezFlowProcess) this.flowProcess;
        // Base configuration for Tap sources. It is re-assigned with each Tap's merged result, so
        // later Taps are merged on top of earlier ones (unchanged from the previous behavior).
        TezConfiguration conf = tezProcess.getConfiguration();
        for (FlowElement flowElement : accumulated) {
            LOG.info("using accumulated source: {}", flowElement);
            // Kept separate from 'conf': the non-Tap branch can yield null, and storing that in
            // 'conf' made conf.getRaw(...) fail with a NullPointerException on any Tap that was
            // iterated afterwards.
            TezConfiguration elementConf;
            if (flowElement instanceof Tap) {
                Tap source = (Tap) flowElement;
                String property = conf.getRaw("cascading.node.accumulated.source.conf." + Tap.id(source));
                if (property == null) {
                    throw new IllegalStateException("accumulated source conf property missing for: " + source.getIdentifier());
                }
                conf = this.getSourceConf(tezProcess, conf, property);
                elementConf = conf;
            } else {
                // NOTE(review): inputConfigMap is keyed by LogicalInput, so this String lookup
                // cannot match and elementConf ends up null (falling back to tezProcess below).
                // The intended key is not clear from this file; confirm before changing it.
                elementConf = (TezConfiguration) this.inputConfigMap.get(FlowElements.id(flowElement));
            }
            Hadoop2TezFlowProcess elementProcess = elementConf == null ? tezProcess : new Hadoop2TezFlowProcess(tezProcess, elementConf);
            this.handleHead(flowElement, elementProcess);
        }
    }

    /**
     * Deserializes the Base64 encoded property map and merges it into the given configuration.
     *
     * @param flowProcess process used to merge the map into the configuration
     * @param conf configuration the map is merged into; also passed to the deserializer
     * @param property Base64 encoded, serialized {@link HashMap}
     * @return the result of {@code flowProcess.mergeMapIntoConfig(conf, map)}
     * @throws FlowException if the property cannot be deserialized
     */
    @SuppressWarnings("unchecked") // the payload is deserialized as a raw HashMap
    private TezConfiguration getSourceConf(FlowProcess<TezConfiguration> flowProcess, TezConfiguration conf, String property) {
        Map<String, String> priorConf;
        try {
            priorConf = (Map<String, String>) HadoopUtil.deserializeBase64(property, conf, HashMap.class, true);
        } catch (IOException exception) {
            throw new FlowException("unable to deserialize properties", exception);
        }
        return flowProcess.mergeMapIntoConfig(conf, priorConf);
    }

    /**
     * Creates the head duct for a source element, registers it as a head and continues building
     * the graph from it.
     *
     * @param source a Tap, Merge, Boundary, or Group (GroupBy/CoGroup) element
     * @param flowProcess process handed to a Tap source stage; other element kinds ignore it here
     * @return the created duct, cast to {@link InputSource}
     */
    private InputSource handleHead(FlowElement source, FlowProcess flowProcess) {
        Duct sourceDuct;
        if (source instanceof Tap) {
            sourceDuct = this.createSourceStage((Tap) source, flowProcess);
        } else if (source instanceof Merge) {
            sourceDuct = this.createMergeStage((Merge) source, IORole.source);
        } else if (source instanceof Boundary) {
            sourceDuct = this.createBoundaryStage((Boundary) source, IORole.source);
        } else if (((Group) source).isGroupBy()) {
            sourceDuct = this.createGroupByGate((GroupBy) source, IORole.source);
        } else {
            sourceDuct = this.createCoGroupGate((CoGroup) source, IORole.source);
        }
        this.addHead(sourceDuct);
        this.handleDuct(source, sourceDuct);
        return (InputSource) sourceDuct;
    }

    /**
     * Creates the source stage for a Tap. A {@link TezSourceStage} is used when a logical input
     * can be found for the Tap, otherwise a plain {@link SourceStage}.
     *
     * @param source Tap to read from
     * @param flowProcess process used for the property lookup and handed to the stage
     * @return the stage reading from {@code source}
     */
    protected SourceStage createSourceStage(Tap source, FlowProcess flowProcess) {
        LogicalInput logicalInput = this.lookupLogicalInput(Tap.id(source), flowProcess);
        if (logicalInput == null) {
            return new SourceStage(flowProcess, source);
        }
        return new TezSourceStage(flowProcess, source, logicalInput);
    }

    /**
     * Creates the sink stage for a Tap.
     *
     * @param sink Tap to write to
     * @return a {@link TezSinkStage} bound to the logical output found for {@code sink}
     * @throws IllegalStateException if no logical output can be found
     */
    protected SinkStage createSinkStage(Tap sink) {
        LogicalOutput logicalOutput = this.lookupLogicalOutput(Tap.id(sink));
        if (logicalOutput == null) {
            throw new IllegalStateException("could not find output for: " + sink);
        }
        return new TezSinkStage(this.flowProcess, sink, logicalOutput);
    }

    /**
     * Creates the duct for a Merge: the superclass stage for {@code pass}, a {@link TezMergeGate}
     * for {@code sink} and {@code source}.
     *
     * @throws UnsupportedOperationException for any other role
     */
    protected Duct createMergeStage(Merge element, IORole role) {
        if (role == IORole.pass) {
            return super.createMergeStage(element, IORole.pass);
        }
        if (role == IORole.sink) {
            return this.createSinkMergeGate(element);
        }
        if (role == IORole.source) {
            return this.createSourceMergeGate(element);
        }
        throw new UnsupportedOperationException("both role not supported with merge");
    }

    /** Merge gate reading from the logical inputs registered for the element, grouped by ordinal. */
    private Duct createSourceMergeGate(Merge element) {
        return new TezMergeGate(this.flowProcess, element, IORole.source, this.createInputMap(element));
    }

    /** Merge gate writing to every logical output grouped under the element's id. */
    private Duct createSinkMergeGate(Merge element) {
        return new TezMergeGate(this.flowProcess, element, IORole.sink, this.findLogicalOutputs(element));
    }

    /**
     * Creates the duct for a Boundary: the superclass stage for {@code pass}, a
     * {@link TezBoundaryStage} for {@code sink} and {@code source}.
     *
     * @throws UnsupportedOperationException for any other role
     */
    protected Duct createBoundaryStage(Boundary element, IORole role) {
        if (role == IORole.pass) {
            return super.createBoundaryStage(element, IORole.pass);
        }
        if (role == IORole.sink) {
            return this.createSinkBoundaryStage(element);
        }
        if (role == IORole.source) {
            return this.createSourceBoundaryStage(element);
        }
        throw new UnsupportedOperationException("both role not supported with boundary");
    }

    /** Boundary stage reading from the single logical input found for the element. */
    private Duct createSourceBoundaryStage(Boundary element) {
        return new TezBoundaryStage(this.flowProcess, element, IORole.source, this.findLogicalInput(element));
    }

    /** Boundary stage writing to every logical output grouped under the element's id. */
    private Duct createSinkBoundaryStage(Boundary element) {
        return new TezBoundaryStage(this.flowProcess, element, IORole.sink, this.findLogicalOutputs(element));
    }

    /**
     * Creates the gate for a GroupBy; the sink variant for {@code IORole.sink}, the source
     * variant for every other role.
     */
    protected Gate createGroupByGate(GroupBy element, IORole role) {
        if (role == IORole.sink) {
            return this.createSinkGroupByGate(element);
        }
        return this.createSourceGroupByGate(element);
    }

    /**
     * Creates the gate for a CoGroup; the sink variant for {@code IORole.sink}, the source
     * variant for every other role.
     */
    protected Gate createCoGroupGate(CoGroup element, IORole role) {
        if (role == IORole.sink) {
            return this.createSinkCoGroupByGate(element);
        }
        return this.createSourceCoGroupByGate(element);
    }

    /** CoGroup gate writing to the single logical output found for the element. */
    private Gate createSinkCoGroupByGate(CoGroup element) {
        return new TezCoGroupGate(this.flowProcess, element, IORole.sink, this.findLogicalOutput(element));
    }

    /** CoGroup gate reading from the logical inputs registered for the element, grouped by ordinal. */
    private Gate createSourceCoGroupByGate(CoGroup element) {
        return new TezCoGroupGate(this.flowProcess, element, IORole.source, this.createInputMap(element));
    }

    /** GroupBy gate writing to the single logical output found for the element. */
    protected Gate createSinkGroupByGate(GroupBy element) {
        return new TezGroupByGate(this.flowProcess, element, IORole.sink, this.findLogicalOutput(element));
    }

    /** GroupBy gate reading from the logical inputs registered for the element, grouped by ordinal. */
    protected Gate createSourceGroupByGate(GroupBy element) {
        return new TezGroupByGate(this.flowProcess, element, IORole.source, this.createInputMap(element));
    }

    /**
     * Finds the logical output for a pipe.
     *
     * @throws IllegalStateException if none can be found
     */
    private LogicalOutput findLogicalOutput(Pipe element) {
        LogicalOutput logicalOutput = this.lookupLogicalOutput(Pipe.id(element));
        if (logicalOutput == null) {
            throw new IllegalStateException("could not find output for: " + element);
        }
        return logicalOutput;
    }

    /** Returns whatever the output multi map holds for the pipe's id. */
    private Collection<LogicalOutput> findLogicalOutputs(Pipe element) {
        String id = Pipe.id(element);
        return this.outputMultiMap.getValues(id);
    }

    /**
     * Finds the logical input for a pipe.
     *
     * @throws IllegalStateException if none can be found
     */
    private LogicalInput findLogicalInput(Pipe element) {
        LogicalInput logicalInput = this.lookupLogicalInput(Pipe.id(element), this.flowProcess);
        if (logicalInput == null) {
            throw new IllegalStateException("could not find input for: " + element);
        }
        return logicalInput;
    }

    /**
     * Looks an input up by element id and, failing that, by the name stored in the
     * {@code cascading.node.source.<id>} property of the given process.
     *
     * @return the logical input, or {@code null} if neither lookup matches
     */
    private LogicalInput lookupLogicalInput(String id, FlowProcess process) {
        LogicalInput logicalInput = this.inputMap.get(id);
        if (logicalInput == null) {
            logicalInput = this.inputMap.get(process.getStringProperty("cascading.node.source." + id));
        }
        return logicalInput;
    }

    /**
     * Looks an output up by element id and, failing that, by the name stored in the
     * {@code cascading.node.sink.<id>} property of this graph's flow process.
     *
     * @return the logical output, or {@code null} if neither lookup matches
     */
    private LogicalOutput lookupLogicalOutput(String id) {
        LogicalOutput logicalOutput = this.outputMap.get(id);
        if (logicalOutput == null) {
            logicalOutput = this.outputMap.get(this.flowProcess.getStringProperty("cascading.node.sink." + id));
        }
        return logicalOutput;
    }

    /**
     * Collects the logical inputs whose {@code cascading.node.source} property equals the
     * element's id and registers each of them under every ordinal listed in its
     * {@code cascading.node.ordinals} property (comma separated, defaulting to empty).
     *
     * @throws IllegalStateException if any logical input lacks the {@code cascading.node.source}
     *         property, including inputs that belong to other elements
     */
    private SortedListMultiMap<Integer, LogicalInput> createInputMap(FlowElement element) {
        String id = FlowElements.id(element);
        SortedListMultiMap<Integer, LogicalInput> ordinalMap = new SortedListMultiMap<Integer, LogicalInput>();
        for (LogicalInput logicalInput : this.inputMap.values()) {
            Configuration configuration = this.inputConfigMap.get(logicalInput);
            String foundID = configuration.get("cascading.node.source");
            if (Util.isEmpty(foundID)) {
                throw new IllegalStateException("cascading.node.source property not set on source LogicalInput");
            }
            if (!foundID.equals(id)) {
                continue;
            }
            String values = configuration.get("cascading.node.ordinals", "");
            List<Integer> ordinals = Util.split(Integer.class, ",", values);
            for (Integer ordinal : ordinals) {
                ordinalMap.put(ordinal, logicalInput);
            }
        }
        return ordinalMap;
    }

    /** Creates the gate used for a {@link HashJoin}: a {@link HadoopMemoryJoinGate}. */
    protected MemoryHashJoinGate createNonBlockingJoinGate(HashJoin join) {
        return new HadoopMemoryJoinGate(this.flowProcess, join);
    }
}

