/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.tez;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.ConfigurationSetter;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.flow.planner.process.ProcessModel;
import cascading.flow.stream.annotations.StreamMode;
import cascading.flow.tez.FlowProcessor;
import cascading.flow.tez.Hadoop2TezFlow;
import cascading.flow.tez.planner.Hadoop2TezFlowStepJob;
import cascading.flow.tez.util.TezUtil;
import cascading.management.state.ClientState;
import cascading.pipe.Boundary;
import cascading.pipe.CoGroup;
import cascading.pipe.Group;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.pipe.Splice;
import cascading.property.ConfigDef;
import cascading.tap.CompositeTaps;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.PartitionTap;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tuple.Fields;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueTuple;
import cascading.tuple.tez.util.GroupingSortingPartitioner;
import cascading.tuple.tez.util.TuplePartitioner;
import cascading.util.Util;
import cascading.util.Version;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Hadoop2TezFlowStep
extends BaseFlowStep<TezConfiguration> {
    private static final Logger LOG = LoggerFactory.getLogger(Hadoop2TezFlowStep.class);
    private Map<String, LocalResource> allLocalResources = new HashMap<String, LocalResource>();
    private Map<Path, Path> syncPaths = new HashMap<Path, Path>();
    private Map<String, String> environment = new HashMap<String, String>();

    public Hadoop2TezFlowStep(ElementGraph elementGraph, FlowNodeGraph flowNodeGraph) {
        super(elementGraph, flowNodeGraph);
    }

    public Map<Object, Object> getConfigAsProperties() {
        return HadoopUtil.createProperties((Configuration)((Configuration)this.getConfig()));
    }

    public TezConfiguration createInitializedConfig(FlowProcess<TezConfiguration> flowProcess, TezConfiguration parentConfig) {
        String appJarPath;
        TezConfiguration stepConf = parentConfig == null ? new TezConfiguration() : new TezConfiguration((Configuration)parentConfig);
        Set serializations = this.getFieldDeclaredSerializations(Serialization.class);
        TupleSerialization.setSerializations((Configuration)stepConf, (Collection)serializations);
        String versionString = Version.getRelease();
        if (versionString != null) {
            stepConf.set("cascading.version", versionString);
        }
        stepConf.set("cascading.flow.step.id", this.getID());
        stepConf.set("cascading.flow.step.num", Integer.toString(this.getOrdinal()));
        String flowStagingPath = ((Hadoop2TezFlow)this.getFlow()).getFlowStagingPath();
        List classPath = ((Hadoop2TezFlow)this.getFlow()).getClassPath();
        HashMap<String, LocalResource> dagResources = new HashMap<String, LocalResource>();
        if (!classPath.isEmpty()) {
            Map<Path, Path> dagClassPath = TezUtil.addToClassPath((Configuration)stepConf, flowStagingPath, null, classPath, LocalResourceType.FILE, dagResources, null);
            this.syncPaths.putAll(dagClassPath);
        }
        if ((appJarPath = stepConf.get("cascading.app.appjar.path")) != null) {
            List<String> classpath = Collections.singletonList(appJarPath);
            Map<Path, Path> pathMap = TezUtil.addToClassPath((Configuration)stepConf, flowStagingPath, null, classpath, LocalResourceType.ARCHIVE, dagResources, this.environment);
            this.syncPaths.putAll(pathMap);
            String fileName = new File(appJarPath).getName();
            stepConf.set("tez.cluster.additional.classpath.prefix", "$PWD/" + fileName + "/:$PWD/" + fileName + "/classes/:$PWD/" + fileName + "/lib/*:");
        }
        this.allLocalResources.putAll(dagResources);
        this.initFromStepConfigDef((Configuration)stepConf);
        return stepConf;
    }

    protected FlowStepJob createFlowStepJob(ClientState clientState, FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedStepConfig) {
        DAG dag = this.createDAG(flowProcess, initializedStepConfig);
        return new Hadoop2TezFlowStepJob(clientState, this, initializedStepConfig, dag);
    }

    private DAG createDAG(FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig) {
        FlowNodeGraph nodeGraph = this.getFlowNodeGraph();
        HashMap<FlowNode, Vertex> vertexMap = new HashMap<FlowNode, Vertex>();
        DAG dag = DAG.create((String)this.getStepDisplayName(initializedConfig.getInt("cascading.display.id.truncate", Util.ID_LENGTH)));
        dag.addTaskLocalFiles(this.allLocalResources);
        Iterator iterator = nodeGraph.getOrderedTopologicalIterator();
        while (iterator.hasNext()) {
            FlowNode flowNode = (FlowNode)iterator.next();
            Vertex vertex = this.createVertex(flowProcess, initializedConfig, flowNode);
            dag.addVertex(vertex);
            vertexMap.put(flowNode, vertex);
        }
        LinkedList<ProcessEdge> processedEdges = new LinkedList<ProcessEdge>();
        for (ProcessEdge processEdge : nodeGraph.edgeSet()) {
            if (processedEdges.contains(processEdge)) continue;
            FlowNode edgeTargetFlowNode = (FlowNode)nodeGraph.getEdgeTarget(processEdge);
            FlowElement flowElement = processEdge.getFlowElement();
            List sourceNodes = nodeGraph.getElementSourceProcesses(flowElement);
            EdgeProperty edgeProperty = this.createEdgeProperty(initializedConfig, processEdge);
            Vertex targetVertex = (Vertex)vertexMap.get(edgeTargetFlowNode);
            if (sourceNodes.size() == 1 || flowElement instanceof CoGroup || flowElement instanceof Boundary) {
                FlowNode edgeSourceFlowNode = (FlowNode)nodeGraph.getEdgeSource(processEdge);
                Vertex sourceVertex = (Vertex)vertexMap.get(edgeSourceFlowNode);
                LOG.debug("adding edge between: {} and {}", (Object)sourceVertex, (Object)targetVertex);
                dag.addEdge(Edge.create((Vertex)sourceVertex, (Vertex)targetVertex, (EdgeProperty)edgeProperty));
                continue;
            }
            if (flowElement instanceof GroupBy || flowElement instanceof Merge) {
                ArrayList<String> sourceVerticesIDs = new ArrayList<String>();
                ArrayList sourceVertices = new ArrayList();
                for (FlowNode edgeSourceFlowNode : sourceNodes) {
                    sourceVerticesIDs.add(edgeSourceFlowNode.getID());
                    sourceVertices.add(vertexMap.get(edgeSourceFlowNode));
                    processedEdges.add(nodeGraph.getEdge((ProcessModel)edgeSourceFlowNode, (ProcessModel)edgeTargetFlowNode));
                }
                VertexGroup vertexGroup = dag.createVertexGroup(edgeTargetFlowNode.getID(), sourceVertices.toArray(new Vertex[sourceVertices.size()]));
                String inputClassName = flowElement instanceof Group ? OrderedGroupedMergedKVInput.class.getName() : ConcatenatedMergedKeyValueInput.class.getName();
                InputDescriptor inputDescriptor = (InputDescriptor)InputDescriptor.create((String)inputClassName).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload());
                String type = ((Splice)flowElement).isMerge() ? "merged" : "grouped";
                LOG.info("adding {} edge between: {} and {}", new Object[]{type, Util.join(sourceVerticesIDs, (String)","), targetVertex.getName()});
                dag.addEdge(GroupInputEdge.create((VertexGroup)vertexGroup, (Vertex)targetVertex, (EdgeProperty)edgeProperty, (InputDescriptor)inputDescriptor));
                continue;
            }
            throw new UnsupportedOperationException("can't make edge for: " + flowElement);
        }
        return dag;
    }

    private EdgeProperty createEdgeProperty(TezConfiguration config, ProcessEdge processEdge) {
        FlowElement flowElement = processEdge.getFlowElement();
        EdgeValues edgeValues = new EdgeValues(new TezConfiguration((Configuration)config), processEdge);
        edgeValues.keyClassName = KeyTuple.class.getName();
        edgeValues.valueClassName = ValueTuple.class.getName();
        edgeValues.keyComparatorClassName = TupleComparator.class.getName();
        edgeValues.keyPartitionerClassName = TuplePartitioner.class.getName();
        edgeValues.outputClassName = null;
        edgeValues.inputClassName = null;
        edgeValues.movementType = null;
        edgeValues.sourceType = null;
        edgeValues.schedulingType = null;
        if (flowElement instanceof Group) {
            this.applyGroup(edgeValues);
        } else if ((flowElement instanceof Boundary || flowElement instanceof Merge) && processEdge.getSinkAnnotations().contains(StreamMode.Accumulated)) {
            this.applyBoundaryMergeAccumulated(edgeValues);
        } else if (flowElement instanceof Boundary || flowElement instanceof Merge) {
            this.applyBoundaryMerge(edgeValues);
        } else {
            throw new IllegalStateException("unsupported flow element: " + flowElement.getClass().getCanonicalName());
        }
        this.applyEdgeAnnotations(processEdge, edgeValues);
        return this.createEdgeProperty(edgeValues);
    }

    private void applyEdgeAnnotations(ProcessEdge processEdge, EdgeValues edgeValues) {
        processEdge.addEdgeAnnotation((Enum)edgeValues.movementType);
        processEdge.addEdgeAnnotation((Enum)edgeValues.sourceType);
        processEdge.addEdgeAnnotation((Enum)edgeValues.schedulingType);
    }

    private EdgeValues applyBoundaryMerge(EdgeValues edgeValues) {
        edgeValues.outputClassName = UnorderedPartitionedKVOutput.class.getName();
        edgeValues.inputClassName = UnorderedKVInput.class.getName();
        edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
        edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
        edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
        return edgeValues;
    }

    private EdgeValues applyBoundaryMergeAccumulated(EdgeValues edgeValues) {
        edgeValues.outputClassName = UnorderedKVOutput.class.getName();
        edgeValues.inputClassName = UnorderedKVInput.class.getName();
        edgeValues.movementType = EdgeProperty.DataMovementType.BROADCAST;
        edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
        edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
        return edgeValues;
    }

    private EdgeValues applyGroup(EdgeValues edgeValues) {
        Group group = (Group)edgeValues.flowElement;
        if (group.isSortReversed()) {
            edgeValues.keyComparatorClassName = ReverseTupleComparator.class.getName();
        }
        int ordinal = (Integer)Util.getFirst(edgeValues.ordinals);
        HadoopUtil.addComparators((Configuration)edgeValues.config, (String)"cascading.group.comparator", (Map)group.getKeySelectors(), (Fields)edgeValues.getResolvedKeyFieldsMap().get(ordinal));
        if (!group.isGroupBy()) {
            edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName();
            edgeValues.inputClassName = OrderedGroupedKVInput.class.getName();
            edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
            edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
            edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
        } else {
            HadoopUtil.addComparators((Configuration)edgeValues.config, (String)"cascading.sort.comparator", (Map)group.getSortingSelectors(), (Fields)edgeValues.getResolvedSortFieldsMap().get(ordinal));
            edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName();
            edgeValues.inputClassName = OrderedGroupedKVInput.class.getName();
            edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
            edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
            edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
        }
        if (group.isSorted()) {
            edgeValues.keyClassName = TuplePair.class.getName();
            edgeValues.keyPartitionerClassName = GroupingSortingPartitioner.class.getName();
            edgeValues.keyComparatorClassName = group.isSortReversed() ? ReverseGroupingSortingComparator.class.getName() : GroupingSortingComparator.class.getName();
        }
        return edgeValues;
    }

    private EdgeProperty createEdgeProperty(EdgeValues edgeValues) {
        TezConfiguration outputConfig = new TezConfiguration((Configuration)edgeValues.getConfig());
        outputConfig.set("cascading.node.sink", FlowElements.id((FlowElement)edgeValues.getFlowElement()));
        outputConfig.set("cascading.node.ordinals", Util.join((Collection)edgeValues.getOrdinals(), (String)","));
        HadoopUtil.addFields((Configuration)outputConfig, (String)"cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap());
        HadoopUtil.addFields((Configuration)outputConfig, (String)"cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap());
        HadoopUtil.addFields((Configuration)outputConfig, (String)"cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap());
        UserPayload outputPayload = this.createIntermediatePayloadOutput(outputConfig, edgeValues);
        TezConfiguration inputConfig = new TezConfiguration((Configuration)edgeValues.getConfig());
        inputConfig.set("cascading.node.source", FlowElements.id((FlowElement)edgeValues.getFlowElement()));
        inputConfig.set("cascading.node.ordinals", Util.join((Collection)edgeValues.getOrdinals(), (String)","));
        HadoopUtil.addFields((Configuration)inputConfig, (String)"cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap());
        HadoopUtil.addFields((Configuration)inputConfig, (String)"cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap());
        HadoopUtil.addFields((Configuration)inputConfig, (String)"cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap());
        UserPayload inputPayload = this.createIntermediatePayloadInput(inputConfig, edgeValues);
        return EdgeProperty.create((EdgeProperty.DataMovementType)edgeValues.getMovementType(), (EdgeProperty.DataSourceType)edgeValues.getSourceType(), (EdgeProperty.SchedulingType)edgeValues.getSchedulingType(), (OutputDescriptor)((OutputDescriptor)OutputDescriptor.create((String)edgeValues.getOutputClassName()).setUserPayload(outputPayload)), (InputDescriptor)((InputDescriptor)InputDescriptor.create((String)edgeValues.getInputClassName()).setUserPayload(inputPayload)));
    }

    private UserPayload createIntermediatePayloadOutput(TezConfiguration config, EdgeValues edgeValues) {
        config.set("tez.runtime.key.class", edgeValues.keyClassName);
        config.set("tez.runtime.value.class", edgeValues.valueClassName);
        config.set("tez.runtime.key.comparator.class", edgeValues.keyComparatorClassName);
        config.set("tez.runtime.partitioner.class", edgeValues.keyPartitionerClassName);
        Hadoop2TezFlowStep.setWorkingDirectory((Configuration)config);
        return this.getPayload((Configuration)config);
    }

    private UserPayload createIntermediatePayloadInput(TezConfiguration config, EdgeValues edgeValues) {
        config.set("tez.runtime.key.class", edgeValues.keyClassName);
        config.set("tez.runtime.value.class", edgeValues.valueClassName);
        config.set("tez.runtime.key.comparator.class", edgeValues.keyComparatorClassName);
        config.set("tez.runtime.partitioner.class", edgeValues.keyPartitionerClassName);
        Hadoop2TezFlowStep.setWorkingDirectory((Configuration)config);
        return this.getPayload((Configuration)config);
    }

    private static void setWorkingDirectory(Configuration conf) {
        String name = conf.get("mapreduce.job.working.dir");
        if (name != null) {
            return;
        }
        try {
            Path dir = FileSystem.get((Configuration)conf).getWorkingDirectory();
            conf.set("mapreduce.job.working.dir", dir.toString());
        }
        catch (IOException exception) {
            throw new RuntimeException(exception);
        }
    }

    public Vertex createVertex(FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig, FlowNode flowNode) {
        JobConf conf = new JobConf((Configuration)initializedConfig);
        this.addInputOutputMapping(conf, flowNode);
        conf.setBoolean("mapred.used.genericoptionsparser", true);
        HashMap<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
        Map<FlowElement, Configuration> sourceConfigs = this.initFromSources(flowNode, flowProcess, (Configuration)conf, taskLocalResources);
        Map<FlowElement, Configuration> sinkConfigs = this.initFromSinks(flowNode, flowProcess, (Configuration)conf);
        this.initFromTraps(flowNode, flowProcess, (Configuration)conf);
        this.initFromNodeConfigDef(flowNode, (Configuration)conf);
        this.setLocalMode((Configuration)initializedConfig, conf, null);
        conf.set("cascading.flow.node.num", Integer.toString(flowNode.getOrdinal()));
        HadoopUtil.setIsInflow((Configuration)conf);
        int parallelism = this.getParallelism(flowNode, conf);
        if (parallelism == 0) {
            throw new FlowException(this.getName(), "the default number of gather partitions must be set, see cascading.flow.FlowRuntimeProps");
        }
        flowNode.addProcessAnnotation("cascading.flow.runtime.gather.partitions.num", Integer.toString(parallelism));
        Vertex vertex = this.newVertex(flowNode, (Configuration)conf, parallelism);
        if (!taskLocalResources.isEmpty()) {
            vertex.addTaskLocalFiles(taskLocalResources);
        }
        for (FlowElement flowElement : sourceConfigs.keySet()) {
            if (!(flowElement instanceof Tap)) continue;
            Configuration sourceConf = sourceConfigs.get(flowElement);
            if (sourceConf.get("mapred.mapper.new-api") == null) {
                HadoopUtil.setNewApi((Configuration)sourceConf, (String)sourceConf.get("mapred.input.format.class", sourceConf.get("mapreduce.job.inputformat.class")));
            }
            MRInput.MRInputConfigBuilder configBuilder = MRInput.createConfigBuilder((Configuration)sourceConf, null);
            if (conf.get("cascading.flow.runtime.splits.combine") != null) {
                configBuilder.groupSplits(conf.getBoolean("cascading.flow.runtime.splits.combine", true));
            }
            if (!CompositeTaps.unwindNarrow(PartitionTap.class, (Tap)((Tap)flowElement)).isEmpty()) {
                configBuilder.groupSplits(false);
            }
            DataSourceDescriptor dataSourceDescriptor = configBuilder.build();
            vertex.addDataSource(FlowElements.id((FlowElement)flowElement), dataSourceDescriptor);
        }
        for (FlowElement flowElement : sinkConfigs.keySet()) {
            String outputPath;
            Class outputFormatClass;
            if (!(flowElement instanceof Tap)) continue;
            Configuration sinkConf = sinkConfigs.get(flowElement);
            String formatClassName = sinkConf.get("mapred.output.format.class", sinkConf.get("mapreduce.job.outputformat.class"));
            if (formatClassName == null) {
                outputFormatClass = TextOutputFormat.class;
                outputPath = Hfs.getTempPath((Configuration)sinkConf).toString();
            } else {
                outputFormatClass = Util.loadClass((String)formatClassName);
                outputPath = this.getOutputPath(sinkConf);
            }
            if (outputPath == null && this.getOutputPath(sinkConf) == null && this.isFileOutputFormat(outputFormatClass)) {
                outputPath = Hfs.getTempPath((Configuration)sinkConf).toString();
            }
            MROutput.MROutputConfigBuilder configBuilder = MROutput.createConfigBuilder((Configuration)sinkConf, (Class)outputFormatClass, (String)outputPath);
            DataSinkDescriptor dataSinkDescriptor = configBuilder.build();
            vertex.addDataSink(FlowElements.id((FlowElement)flowElement), dataSinkDescriptor);
        }
        this.addRemoteDebug(flowNode, vertex);
        this.addRemoteProfiling(flowNode, vertex);
        if (vertex.getTaskLaunchCmdOpts() != null) {
            flowNode.addProcessAnnotation("tez.task.launch.cmd-opts", vertex.getTaskLaunchCmdOpts());
        }
        return vertex;
    }

    protected String getOutputPath(Configuration sinkConf) {
        return sinkConf.get("mapred.output.dir", sinkConf.get("mapreduce.output.fileoutputformat.outputdir"));
    }

    protected boolean isFileOutputFormat(Class outputFormatClass) {
        return org.apache.hadoop.mapred.FileOutputFormat.class.isAssignableFrom(outputFormatClass) || FileOutputFormat.class.isAssignableFrom(outputFormatClass);
    }

    protected int getParallelism(FlowNode flowNode, JobConf conf) {
        HashSet sourceStreamedTaps = new HashSet(flowNode.getSourceTaps());
        sourceStreamedTaps.removeAll(flowNode.getSourceElements((Enum)StreamMode.Accumulated));
        if (sourceStreamedTaps.size() != 0) {
            return -1;
        }
        int parallelism = Integer.MAX_VALUE;
        for (Tap tap : flowNode.getSinkTaps()) {
            int numSinkParts = tap.getScheme().getNumSinkParts();
            if (numSinkParts == 0) continue;
            if (parallelism != Integer.MAX_VALUE) {
                LOG.info("multiple sink taps in flow node declaring numSinkParts, choosing lowest value. see cascading.flow.FlowRuntimeProps for broader control.");
            }
            parallelism = Math.min(parallelism, numSinkParts);
        }
        if (parallelism != Integer.MAX_VALUE) {
            return parallelism;
        }
        return conf.getInt("cascading.flow.runtime.gather.partitions.num", 0);
    }

    private void addInputOutputMapping(JobConf conf, FlowNode flowNode) {
        FlowNodeGraph flowNodeGraph = this.getFlowNodeGraph();
        Set incomingEdges = flowNodeGraph.incomingEdgesOf((ProcessModel)flowNode);
        for (ProcessEdge processEdge : incomingEdges) {
            conf.set("cascading.node.source." + processEdge.getFlowElementID(), processEdge.getSourceProcessID());
        }
        Set outgoingEdges = flowNodeGraph.outgoingEdgesOf((ProcessModel)flowNode);
        for (ProcessEdge processEdge : outgoingEdges) {
            conf.set("cascading.node.sink." + processEdge.getFlowElementID(), processEdge.getSinkProcessID());
        }
    }

    protected Map<FlowElement, Configuration> initFromSources(FlowNode flowNode, FlowProcess<TezConfiguration> flowProcess, Configuration conf, Map<String, LocalResource> taskLocalResources) {
        Set accumulatedSources = flowNode.getSourceElements((Enum)StreamMode.Accumulated);
        for (FlowElement element : accumulatedSources) {
            if (!(element instanceof Tap)) continue;
            JobConf current = new JobConf(conf);
            Tap tap = (Tap)element;
            if (tap.getIdentifier() == null) {
                throw new IllegalStateException("tap may not have null identifier: " + tap.toString());
            }
            tap.sourceConfInit(flowProcess, (Object)current);
            Collection paths = current.getStringCollection("cascading.resources.local." + Tap.id((Tap)tap));
            if (!paths.isEmpty()) {
                String flowStagingPath = ((Hadoop2TezFlow)this.getFlow()).getFlowStagingPath();
                String resourceSubPath = Tap.id((Tap)tap);
                Map<Path, Path> pathMap = TezUtil.addToClassPath((Configuration)current, flowStagingPath, resourceSubPath, paths, LocalResourceType.FILE, taskLocalResources, null);
                current.setStrings("cascading.resources.remote." + Tap.id((Tap)tap), taskLocalResources.keySet().toArray(new String[taskLocalResources.size()]));
                this.allLocalResources.putAll(taskLocalResources);
                this.syncPaths.putAll(pathMap);
            }
            Map map = flowProcess.diffConfigIntoMap((Object)new TezConfiguration(conf), (Object)new TezConfiguration((Configuration)current));
            conf.set("cascading.node.accumulated.source.conf." + Tap.id((Tap)tap), this.pack(map, conf));
            this.setLocalMode(conf, current, tap);
        }
        HashSet sources = new HashSet(flowNode.getSourceElements());
        sources.removeAll(accumulatedSources);
        if (sources.isEmpty()) {
            throw new IllegalStateException("all sources marked as accumulated");
        }
        HashMap<FlowElement, Configuration> configs = new HashMap<FlowElement, Configuration>();
        for (FlowElement element : sources) {
            JobConf current = new JobConf(conf);
            String id = FlowElements.id((FlowElement)element);
            current.set("cascading.node.source", id);
            if (element instanceof Tap) {
                Tap tap = (Tap)element;
                if (tap.getIdentifier() == null) {
                    throw new IllegalStateException("tap may not have null identifier: " + tap.toString());
                }
                tap.sourceConfInit(flowProcess, (Object)current);
                this.setLocalMode(conf, current, tap);
            }
            configs.put(element, (Configuration)current);
        }
        return configs;
    }

    protected Map<FlowElement, Configuration> initFromSinks(FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf) {
        Set sinks = flowNode.getSinkElements();
        HashMap<FlowElement, Configuration> configs = new HashMap<FlowElement, Configuration>();
        for (FlowElement element : sinks) {
            JobConf current = new JobConf(conf);
            if (element instanceof Tap) {
                Tap tap = (Tap)element;
                if (tap.getIdentifier() == null) {
                    throw new IllegalStateException("tap may not have null identifier: " + element.toString());
                }
                tap.sinkConfInit(flowProcess, (Object)current);
                this.setLocalMode(conf, current, tap);
            }
            String id = FlowElements.id((FlowElement)element);
            current.set("cascading.node.sink", id);
            configs.put(element, (Configuration)current);
        }
        return configs;
    }

    private void initFromNodeConfigDef(FlowNode flowNode, Configuration conf) {
        this.initConfFromNodeConfigDef(flowNode.getElementGraph(), (ConfigDef.Setter)new ConfigurationSetter(conf));
    }

    private void initFromStepConfigDef(Configuration conf) {
        this.initConfFromStepConfigDef((ConfigDef.Setter)new ConfigurationSetter(conf));
    }

    protected void initFromTraps(FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf) {
        Map traps = flowNode.getTrapMap();
        if (!traps.isEmpty()) {
            JobConf trapConf = new JobConf(conf);
            for (Tap tap : traps.values()) {
                tap.sinkConfInit(flowProcess, (Object)trapConf);
                this.setLocalMode(conf, trapConf, tap);
            }
        }
    }

    private Vertex newVertex(FlowNode flowNode, Configuration conf, int parallelism) {
        conf.set("cascading.flow.node", this.pack(flowNode, conf));
        ProcessorDescriptor descriptor = ProcessorDescriptor.create((String)FlowProcessor.class.getName());
        descriptor.setUserPayload(this.getPayload(conf));
        Vertex vertex = Vertex.create((String)flowNode.getID(), (ProcessorDescriptor)descriptor, (int)parallelism);
        if (this.environment != null) {
            vertex.setTaskEnvironment(this.environment);
        }
        return vertex;
    }

    private UserPayload getPayload(Configuration conf) {
        try {
            return TezUtils.createUserPayloadFromConf((Configuration)conf);
        }
        catch (IOException exception) {
            throw new CascadingException((Throwable)exception);
        }
    }

    private String pack(Object object, Configuration conf) {
        try {
            return HadoopUtil.serializeBase64((Object)object, (Configuration)conf, (boolean)true);
        }
        catch (IOException exception) {
            throw new FlowException("unable to pack object: " + object.getClass().getCanonicalName(), (Throwable)exception);
        }
    }

    public void clean(TezConfiguration config) {
        for (Tap sink : this.getSinkTaps()) {
            if (sink.isTemporary() && (this.getFlow().getFlowStats().isSuccessful() || this.getFlow().getRunID() == null)) {
                try {
                    sink.deleteResource((Object)config);
                }
                catch (Exception exception) {
                    this.logWarn("unable to remove temporary file: " + sink, exception);
                }
                continue;
            }
            this.cleanTapMetaData(config, sink);
        }
        for (Tap tap : this.getTraps()) {
            this.cleanTapMetaData(config, tap);
        }
    }

    private void cleanTapMetaData(TezConfiguration config, Tap tap) {
        try {
            Hadoop18TapUtil.cleanupTapMetaData((Configuration)config, (Tap)tap);
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    public void syncArtifacts() {
        Map timestamps = HadoopUtil.syncPaths((Configuration)((Configuration)this.getConfig()), this.syncPaths, (boolean)true);
        for (Map.Entry entry : timestamps.entrySet()) {
            LocalResource localResource = this.allLocalResources.get(entry.getKey());
            if (localResource == null) continue;
            localResource.setTimestamp(((Long)entry.getValue()).longValue());
        }
    }

    private void setLocalMode(Configuration parent, JobConf current, Tap tap) {
        if (!HadoopUtil.isLocal((Configuration)current)) {
            return;
        }
        if (tap != null) {
            this.logInfo("tap forcing step to tez local mode: " + tap.getIdentifier(), new Object[0]);
        }
        HadoopUtil.setLocal((Configuration)parent);
    }

    private void addRemoteDebug(FlowNode flowNode, Vertex vertex) {
        String value = System.getProperty("test.debug.node", null);
        if (Util.isEmpty((String)value)) {
            return;
        }
        if (!flowNode.getSourceElementNames().contains(value) && this.asInt(value) != flowNode.getOrdinal()) {
            return;
        }
        LOG.warn("remote debugging enabled with property: {}, on node: {}, with node id: {}", new Object[]{"test.debug.node", value, flowNode.getID()});
        String opts = vertex.getTaskLaunchCmdOpts();
        if (opts == null) {
            opts = "";
        }
        String address = System.getProperty("test.debug.address", "localhost:5005").trim();
        opts = opts + " -agentlib:jdwp=transport=dt_socket,server=n,address=" + address + ",suspend=y";
        vertex.setTaskLaunchCmdOpts(opts);
    }

    private void addRemoteProfiling(FlowNode flowNode, Vertex vertex) {
        String path;
        String value = System.getProperty("test.profile.node", null);
        if (Util.isEmpty((String)value)) {
            return;
        }
        if (!flowNode.getSourceElementNames().contains(value) && this.asInt(value) != flowNode.getOrdinal()) {
            return;
        }
        LOG.warn("remote profiling enabled with property: {}, on node: {}, with node id: {}", new Object[]{"test.profile.node", value, flowNode.getID()});
        String opts = vertex.getTaskLaunchCmdOpts();
        if (opts == null) {
            opts = "";
        }
        if (!(path = System.getProperty("test.profile.path", "/tmp/jfr/")).endsWith("/")) {
            path = path + "/";
        }
        LOG.warn("remote profiling property: {}, logging to path: {}", (Object)"test.profile.path", (Object)path);
        opts = opts + String.format(" -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=%1$s%2$s,disk=true,repository=%1$s%2$s", path, flowNode.getID());
        vertex.setTaskLaunchCmdOpts(opts);
    }

    private int asInt(String value) {
        try {
            return Integer.parseInt(value);
        }
        catch (NumberFormatException exception) {
            return -1;
        }
    }

    public Map<String, LocalResource> getAllLocalResources() {
        return this.allLocalResources;
    }

    private static class EdgeValues {
        FlowElement flowElement;
        TezConfiguration config;
        Set<Integer> ordinals;
        String keyClassName;
        String valueClassName;
        String keyComparatorClassName;
        String keyPartitionerClassName;
        String outputClassName;
        String inputClassName;
        EdgeProperty.DataMovementType movementType;
        EdgeProperty.DataSourceType sourceType;
        EdgeProperty.SchedulingType schedulingType;
        Map<Integer, Fields> resolvedKeyFieldsMap;
        Map<Integer, Fields> resolvedSortFieldsMap;
        Map<Integer, Fields> resolvedValueFieldsMap;

        private EdgeValues(TezConfiguration config, ProcessEdge processEdge) {
            this.config = config;
            this.flowElement = processEdge.getFlowElement();
            this.ordinals = processEdge.getSourceProvidedOrdinals();
            this.resolvedKeyFieldsMap = processEdge.getResolvedKeyFields();
            this.resolvedSortFieldsMap = processEdge.getResolvedSortFields();
            this.resolvedValueFieldsMap = processEdge.getResolvedValueFields();
        }

        public FlowElement getFlowElement() {
            return this.flowElement;
        }

        public TezConfiguration getConfig() {
            return this.config;
        }

        public Set getOrdinals() {
            return this.ordinals;
        }

        public String getKeyClassName() {
            return this.keyClassName;
        }

        public String getValueClassName() {
            return this.valueClassName;
        }

        public String getKeyComparatorClassName() {
            return this.keyComparatorClassName;
        }

        public String getKeyPartitionerClassName() {
            return this.keyPartitionerClassName;
        }

        public String getOutputClassName() {
            return this.outputClassName;
        }

        public String getInputClassName() {
            return this.inputClassName;
        }

        public EdgeProperty.DataMovementType getMovementType() {
            return this.movementType;
        }

        public EdgeProperty.DataSourceType getSourceType() {
            return this.sourceType;
        }

        public EdgeProperty.SchedulingType getSchedulingType() {
            return this.schedulingType;
        }

        public Map<Integer, Fields> getResolvedKeyFieldsMap() {
            return this.resolvedKeyFieldsMap;
        }

        public Map<Integer, Fields> getResolvedSortFieldsMap() {
            return this.resolvedSortFieldsMap;
        }

        public Map<Integer, Fields> getResolvedValueFieldsMap() {
            return this.resolvedValueFieldsMap;
        }
    }
}

