/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop;

import cascading.flow.Flow;
import cascading.flow.FlowElement;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.BaseMapReduceFlow;
import cascading.flow.hadoop.HadoopFlow;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.ElementDirectedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.flow.planner.process.ProcessModel;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapred.JobConf;

/**
 * A {@link HadoopFlowStep} that wraps a caller supplied {@link JobConf} and models it as a
 * small element graph: a "Map" placeholder pipe, optionally followed by "Shuffle" and
 * "Reduce" placeholder pipes, with the job's source taps feeding the map side and the sink
 * taps hanging off the last operation.
 *
 * <p>The element graph and flow node graph are built lazily on first access and cached in
 * the {@code elementGraph} / {@code flowNodeGraph} fields (not declared in this class;
 * presumably inherited from the superclass hierarchy).
 */
public class MapReduceFlowStep
extends HadoopFlowStep {
    /** Name given to the placeholder pipe standing in for the map phase. */
    public static final String MAP = "Map";
    /** Name given to the placeholder pipe standing in for the shuffle between map and reduce. */
    public static final String SHUFFLE = "Shuffle";
    /** Name given to the placeholder pipe standing in for the reduce phase. */
    public static final String REDUCE = "Reduce";
    /** The job configuration this step wraps; returned as-is by {@link #createInitializedConfig}. */
    private final JobConf jobConf;

    /**
     * Creates a step named after the job name stored in the given configuration.
     *
     * @param flow    the owning flow, must not be {@code null}
     * @param jobConf the job configuration to wrap, must not be {@code null}
     * @throws IllegalArgumentException if {@code flow} or {@code jobConf} is {@code null}
     */
    public MapReduceFlowStep(HadoopFlow flow, JobConf jobConf) {
        if (flow == null) {
            throw new IllegalArgumentException("flow may not be null");
        }
        // Fail fast with a descriptive message; previously a null jobConf surfaced as a
        // bare NullPointerException from the getJobName() call below.
        if (jobConf == null) {
            throw new IllegalArgumentException("jobConf may not be null");
        }
        this.setName(jobConf.getJobName());
        this.setFlow((Flow)flow);
        this.jobConf = jobConf;
        // configure() is not defined in this class (presumably inherited); it is invoked
        // only after the name, flow and jobConf have been set.
        this.configure();
    }

    /**
     * Creates a step with an explicit name and registers {@code sink} under the name
     * {@code "default"}. Unlike the public constructor this one does not call
     * {@code configure()} and does not dereference {@code jobConf}.
     *
     * @param flow     the owning flow, must not be {@code null}
     * @param stepName the name to give this step
     * @param jobConf  the job configuration to wrap
     * @param sink     the tap to register as the "default" sink
     * @throws IllegalArgumentException if {@code flow} is {@code null}
     */
    protected MapReduceFlowStep(HadoopFlow flow, String stepName, JobConf jobConf, Tap sink) {
        if (flow == null) {
            throw new IllegalArgumentException("flow may not be null");
        }
        this.setName(stepName);
        this.setFlow((Flow)flow);
        this.jobConf = jobConf;
        this.addSink("default", sink);
    }

    /**
     * @return the job configuration handed to the constructor
     */
    protected JobConf getJobConf() {
        return this.jobConf;
    }

    /**
     * Returns the element graph for the whole step, building it from the flow node graph
     * on first access and caching it afterwards.
     *
     * @return the cached or newly created step element graph
     */
    public ElementGraph getElementGraph() {
        if (this.elementGraph == null) {
            this.elementGraph = this.createStepElementGraph(this.getFlowNodeGraph());
        }
        return this.elementGraph;
    }

    /**
     * Returns the flow node graph for this step, building it from the wrapped job
     * configuration on first access and caching it afterwards.
     *
     * @return the cached or newly created flow node graph
     */
    public FlowNodeGraph getFlowNodeGraph() {
        if (this.flowNodeGraph == null) {
            this.flowNodeGraph = this.createFlowNodeGraph(this.createNodeElementGraphs(this.jobConf));
        }
        return this.flowNodeGraph;
    }

    /**
     * Returns the wrapped job configuration unchanged; both arguments are ignored.
     *
     * @param flowProcess  unused
     * @param parentConfig unused
     * @return the job configuration handed to the constructor
     */
    @Override
    public JobConf createInitializedConfig(FlowProcess<JobConf> flowProcess, JobConf parentConfig) {
        return this.jobConf;
    }

    /**
     * Combines the per-node element graphs of the given flow node graph into a single
     * directed graph and binds its extents.
     */
    private ElementGraph createStepElementGraph(FlowNodeGraph flowNodeGraph) {
        return ElementGraphs.asElementDirectedGraph((Collection)flowNodeGraph.getElementGraphs()).bindExtents();
    }

    /**
     * Builds one element graph for the map side and, when the job has a reducer, a second
     * one for the reduce side. Sources are attached in front of the map operation; sinks
     * are attached behind the last operation (reduce if present, otherwise map).
     *
     * @param jobConf the job configuration to inspect for a reducer, sources and sinks
     * @return a list holding the map graph, followed by the reduce graph if there is one
     */
    private List<ElementGraph> createNodeElementGraphs(JobConf jobConf) {
        BaseMapReduceFlow baseFlow = (BaseMapReduceFlow)this.getFlow();
        boolean hasReducer = HadoopMRUtil.hasReducer(jobConf);
        ArrayList<ElementGraph> result = new ArrayList<ElementGraph>();

        ElementDirectedGraph mapElementGraph = this.createElementDirectedGraph();
        Pipe headOperation = this.createMapOperation();
        mapElementGraph.addVertex((FlowElement)headOperation);
        result.add((ElementGraph)mapElementGraph);

        // Until a reducer is found, the map graph and map operation are also the tail.
        ElementDirectedGraph tailElementGraph = mapElementGraph;
        Pipe tailOperation = headOperation;

        ElementDirectedGraph reduceElementGraph = null;
        if (hasReducer) {
            // The shuffle pipe is a vertex of both graphs, so it marks where they join.
            Pipe shuffleOperation = this.createShuffleOperation();
            mapElementGraph.addVertex((FlowElement)shuffleOperation);
            mapElementGraph.addEdge((FlowElement)headOperation, (FlowElement)shuffleOperation);
            reduceElementGraph = this.createElementDirectedGraph();
            reduceElementGraph.addVertex((FlowElement)shuffleOperation);
            Pipe reduceOperation = this.createReduceOperation();
            reduceElementGraph.addVertex((FlowElement)reduceOperation);
            reduceElementGraph.addEdge((FlowElement)shuffleOperation, (FlowElement)reduceOperation);
            tailOperation = reduceOperation;
            tailElementGraph = reduceElementGraph;
            result.add((ElementGraph)reduceElementGraph);
        }

        // Every source tap feeds the map operation; the edge scope carries the source name.
        Map<String, Tap> sources = baseFlow.createSources(jobConf);
        for (Map.Entry<String, Tap> entry : sources.entrySet()) {
            mapElementGraph.addVertex((FlowElement)entry.getValue());
            mapElementGraph.addEdge((FlowElement)entry.getValue(), (FlowElement)headOperation, new Scope(entry.getKey()));
        }

        // Every sink tap hangs off the tail operation; the edge scope carries the sink name.
        Map<String, Tap> sinks = baseFlow.createSinks(jobConf);
        for (Map.Entry<String, Tap> entry : sinks.entrySet()) {
            tailElementGraph.addVertex((FlowElement)entry.getValue());
            tailElementGraph.addEdge((FlowElement)tailOperation, (FlowElement)entry.getValue(), new Scope(entry.getKey()));
        }

        // Extents are bound only after all taps have been added.
        mapElementGraph.bindExtents();
        if (reduceElementGraph != null) {
            reduceElementGraph.bindExtents();
        }
        return result;
    }

    /**
     * @return a new, empty directed element graph; subclasses may override to supply their own
     */
    protected ElementDirectedGraph createElementDirectedGraph() {
        return new ElementDirectedGraph();
    }

    /**
     * @return a new pipe named {@link #MAP}
     */
    protected Pipe createMapOperation() {
        return new Pipe(MAP);
    }

    /**
     * @return a new pipe named {@link #SHUFFLE}
     */
    protected Pipe createShuffleOperation() {
        return new Pipe(SHUFFLE);
    }

    /**
     * @return a new pipe named {@link #REDUCE}
     */
    protected Pipe createReduceOperation() {
        return new Pipe(REDUCE);
    }

    /**
     * Wraps the given element graphs in flow nodes. The first graph always becomes the
     * mapper node, labelled {@code "(1/N)"} where N is the list size. When the list holds
     * exactly two graphs, the second becomes the reducer node, labelled {@code "(2/2)"},
     * and is connected to the mapper node by a {@link ProcessEdge}.
     *
     * @param elementGraphs the map graph, optionally followed by the reduce graph; must
     *                      contain at least one element
     * @return the flow node graph holding one or two nodes
     */
    protected FlowNodeGraph createFlowNodeGraph(List<ElementGraph> elementGraphs) {
        ElementGraph mapElementGraph = elementGraphs.get(0);
        FlowNodeGraph flowNodeGraph = new FlowNodeGraph();
        int nodes = elementGraphs.size();
        // Third constructor argument is 0 for the mapper and 1 for the reducer
        // (presumably the node ordinal - confirm against BaseFlowNode).
        BaseFlowNode mapperNode = new BaseFlowNode(mapElementGraph, String.format("(1/%s)", nodes), 0);
        flowNodeGraph.addVertex((ProcessModel)mapperNode);
        if (nodes == 2) {
            ElementGraph reduceElementGraph = elementGraphs.get(1);
            BaseFlowNode reducerNode = new BaseFlowNode(reduceElementGraph, "(2/2)", 1);
            flowNodeGraph.addVertex((ProcessModel)reducerNode);
            flowNodeGraph.addEdge((ProcessModel)mapperNode, (ProcessModel)reducerNode, new ProcessEdge((ProcessModel)mapperNode, (ProcessModel)reducerNode));
        }
        return flowNodeGraph;
    }
}

