/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.ConfigurationSetter;
import cascading.flow.hadoop.FlowMapper;
import cascading.flow.hadoop.FlowReducer;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.management.state.ClientState;
import cascading.pipe.CoGroup;
import cascading.property.ConfigDef;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.KeyIndexTuple;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueIndexTuple;
import cascading.tuple.io.ValueTuple;
import cascading.util.ProcessLogger;
import cascading.util.Util;
import cascading.util.Version;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

/**
 * A {@link BaseFlowStep} whose configuration object is a Hadoop {@link JobConf}.
 * <p>
 * {@link #createInitializedConfig(FlowProcess, JobConf)} assembles the job configuration for the step
 * (mapper/reducer classes, key/value classes, partitioner, comparators and the packed node state),
 * and {@link #clean(JobConf)} removes temporary resources once the step is done.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf> {
    /** No-argument constructor for subclasses and frameworks; performs no initialization of its own. */
    protected HadoopFlowStep() {
    }

    /**
     * Creates a step with the given name and ordinal.
     *
     * @param name    the step name, forwarded to the super class
     * @param ordinal the step ordinal, forwarded to the super class
     */
    protected HadoopFlowStep(String name, int ordinal) {
        super(name, ordinal);
    }

    /**
     * Creates a step from its element graph and node graph.
     *
     * @param elementGraph  the element graph, forwarded to the super class
     * @param flowNodeGraph the node graph, forwarded to the super class
     */
    public HadoopFlowStep(ElementGraph elementGraph, FlowNodeGraph flowNodeGraph) {
        super(elementGraph, flowNodeGraph);
    }

    /**
     * Returns the current step configuration converted by {@code HadoopUtil.createProperties}.
     *
     * @return the configuration as a property map
     */
    public Map<Object, Object> getConfigAsProperties() {
        // the explicit Configuration cast pins the overload the original call resolved to
        return HadoopUtil.createProperties((Configuration) this.getConfig());
    }

    /**
     * Builds the {@link JobConf} used to run this step.
     * <p>
     * The configuration starts as a copy of {@code parentConfig} (or a fresh {@code JobConf} when it
     * is {@code null}) and is then populated, in order, from the step's sources, sink, traps and
     * step-level config definitions, followed by reducer count, grouping/sorting settings, resolved
     * field metadata, step identity and finally the packed mapper/reducer node state.
     *
     * @param flowProcess  the flow process handed to taps during their configuration callbacks
     * @param parentConfig the configuration to copy from, may be {@code null}
     * @return the initialized job configuration
     * @throws FlowException if the step has a group, the sink does not fix the number of parts, and
     *                       no reducer / gather partition count is configured
     */
    public JobConf createInitializedConfig(FlowProcess<JobConf> flowProcess, JobConf parentConfig) {
        JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf(parentConfig);

        conf.setBoolean("mapred.used.genericoptionsparser", true);
        conf.setJobName(this.getStepDisplayName(conf.getInt("cascading.display.id.truncate", Util.ID_LENGTH)));

        conf.setOutputKeyClass(KeyTuple.class);
        conf.setOutputValueClass(ValueTuple.class);
        conf.setMapRunnerClass(FlowMapper.class);
        conf.setReducerClass(FlowReducer.class);

        TupleSerialization.setSerializations(conf, this.getFieldDeclaredSerializations(Serialization.class));

        this.initFromSources(flowProcess, conf);
        this.initFromSink(flowProcess, conf);
        this.initFromTraps(flowProcess, conf);
        this.initFromStepConfigDef(conf);

        this.applyReducerCount(conf);

        // default key ordering; applyGroupingSettings may replace it
        conf.setOutputKeyComparatorClass(TupleComparator.class);

        ProcessEdge processEdge = (ProcessEdge) Util.getFirst(this.getFlowNodeGraph().edgeSet());
        this.applyGroupingSettings(conf, processEdge);
        this.applyNodeFieldSettings(conf, processEdge);

        String versionString = Version.getRelease();
        if (versionString != null) {
            conf.set("cascading.version", versionString);
        }

        conf.set("cascading.flow.step.id", this.getID());
        conf.set("cascading.flow.step.num", Integer.toString(this.getOrdinal()));
        HadoopUtil.setIsInflow(conf);

        this.applyNodeState(conf);

        return conf;
    }

    /**
     * Decides how many reduce (or map) tasks the job should use.
     * <p>
     * A non-zero part count from the sink scheme wins: it becomes the reducer count when the step has
     * a group and the map task count otherwise. When the sink does not fix the count and the step
     * has a group, the already configured reducer count is used, falling back to
     * {@code cascading.flow.runtime.gather.partitions.num}.
     *
     * @throws FlowException if a group is present and neither value is set to a non-zero number
     */
    private void applyReducerCount(JobConf conf) {
        int numSinkParts = this.getSink().getScheme().getNumSinkParts();
        boolean hasGroup = this.getGroup() != null;

        if (numSinkParts != 0) {
            if (hasGroup) {
                conf.setNumReduceTasks(numSinkParts);
            } else {
                conf.setNumMapTasks(numSinkParts);
            }
            return;
        }

        if (!hasGroup) {
            return;
        }

        int gatherPartitions = conf.getNumReduceTasks();
        if (gatherPartitions == 0) {
            gatherPartitions = conf.getInt("cascading.flow.runtime.gather.partitions.num", 0);
        }
        if (gatherPartitions == 0) {
            throw new FlowException(this.getName(), "a default number of gather partitions must be set, see FlowRuntimeProps");
        }
        conf.setNumReduceTasks(gatherPartitions);
    }

    /**
     * Configures map output types, partitioner and comparators according to the step's group.
     * <p>
     * Without a group the reducer count is forced to zero. With a group, defaults are set first and
     * then overridden for non-GroupBy groups and, last, for sorted groups; the order of these
     * overrides is significant because later settings replace earlier ones.
     *
     * @param conf        the configuration being built
     * @param processEdge the first edge of the node graph; dereferenced only when a group exists
     */
    private void applyGroupingSettings(JobConf conf, ProcessEdge processEdge) {
        if (this.getGroup() == null) {
            conf.setNumReduceTasks(0);
            return;
        }

        conf.setMapOutputKeyClass(KeyTuple.class);
        conf.setMapOutputValueClass(ValueTuple.class);
        conf.setPartitionerClass(GroupingPartitioner.class);

        if (this.getGroup().isSortReversed()) {
            conf.setOutputKeyComparatorClass(ReverseTupleComparator.class);
        }

        Integer ordinal = (Integer) Util.getFirst(processEdge.getSinkExpectedOrdinals());

        HadoopUtil.addComparators(conf, "cascading.group.comparator", this.getGroup().getKeySelectors(), (Fields) processEdge.getResolvedKeyFields().get(ordinal));

        if (this.getGroup().isGroupBy()) {
            HadoopUtil.addComparators(conf, "cascading.sort.comparator", this.getGroup().getSortingSelectors(), (Fields) processEdge.getResolvedSortFields().get(ordinal));
        } else {
            // any group that is not a GroupBy gets the index-aware key/value types and comparators
            conf.setPartitionerClass(CoGroupingPartitioner.class);
            conf.setMapOutputKeyClass(KeyIndexTuple.class);
            conf.setMapOutputValueClass(ValueIndexTuple.class);
            conf.setOutputKeyComparatorClass(IndexTupleCoGroupingComparator.class);
            conf.setOutputValueGroupingComparator(CoGroupingComparator.class);
        }

        if (this.getGroup().isSorted()) {
            conf.setPartitionerClass(GroupingSortingPartitioner.class);
            conf.setMapOutputKeyClass(TuplePair.class);
            conf.setOutputKeyComparatorClass(this.getGroup().isSortReversed()
                ? ReverseGroupingSortingComparator.class
                : GroupingSortingComparator.class);
            conf.setOutputValueGroupingComparator(GroupingComparator.class);
        }
    }

    /**
     * Publishes the edge's expected ordinals and resolved key/sort/value fields into the
     * configuration, unless the edge is absent or
     * {@link #ifCoGroupAndKeysHaveCommonTypes(ProcessLogger, FlowElement, Map)} rejects it.
     */
    private void applyNodeFieldSettings(JobConf conf, ProcessEdge processEdge) {
        if (processEdge == null) {
            return;
        }
        if (!HadoopFlowStep.ifCoGroupAndKeysHaveCommonTypes(this, processEdge.getFlowElement(), processEdge.getResolvedKeyFields())) {
            return;
        }

        conf.set("cascading.node.ordinals", Util.join(processEdge.getSinkExpectedOrdinals(), ","));
        HadoopUtil.addFields(conf, "cascading.node.key.fields", processEdge.getResolvedKeyFields());
        HadoopUtil.addFields(conf, "cascading.node.sort.fields", processEdge.getResolvedSortFields());
        HadoopUtil.addFields(conf, "cascading.node.value.fields", processEdge.getResolvedValueFields());
    }

    /**
     * Packs the mapper node and the optional reducer node and stores them for the tasks.
     * <p>
     * The first node in topological order is treated as the mapper, the second (if any) as the
     * reducer. The packed strings are written directly into the configuration when running in local
     * mode or when their combined length is below {@code Short.MAX_VALUE}; otherwise they are
     * handed to {@code HadoopMRUtil.writeStateToDistCache} and only the returned value is stored.
     */
    private void applyNodeState(JobConf conf) {
        Iterator<?> nodes = this.getFlowNodeGraph().getTopologicalIterator();
        FlowNode mapperNode = (FlowNode) nodes.next();
        FlowNode reducerNode = nodes.hasNext() ? (FlowNode) nodes.next() : null;

        if (reducerNode != null) {
            reducerNode.addProcessAnnotation("cascading.flow.runtime.gather.partitions.num", Integer.toString(conf.getNumReduceTasks()));
        }

        String mapState = HadoopUtil.pack(mapperNode, conf);
        // NOTE(review): presumably pack() yields a non-null (possibly empty) string for a null node,
        // since length() is called on the result below -- confirm against HadoopUtil
        String reduceState = HadoopUtil.pack(reducerNode, conf);
        boolean hasReduceState = !Util.isEmpty(reduceState);

        final int maxSize = Short.MAX_VALUE;
        int length = mapState.length() + reduceState.length();

        if (this.isHadoopLocalMode(conf) || length < maxSize) {
            conf.set("cascading.flow.step.node.map", mapState);
            if (hasReduceState) {
                conf.set("cascading.flow.step.node.reduce", reduceState);
            }
            return;
        }

        conf.set("cascading.flow.step.node.map.path", HadoopMRUtil.writeStateToDistCache(conf, this.getID(), "map", mapState));
        if (hasReduceState) {
            conf.set("cascading.flow.step.node.reduce.path", HadoopMRUtil.writeStateToDistCache(conf, this.getID(), "reduce", reduceState));
        }
    }

    /**
     * Checks whether the given element allows the resolved field metadata to be published.
     * <p>
     * Anything that is not a {@link CoGroup}, or that has fewer than two key field entries, passes.
     * For a CoGroup, the type classes of the first entry are compared with every other entry; the
     * first mismatch is logged as a warning and makes the check fail.
     *
     * @param processLogger     receives the warning on a type mismatch
     * @param flowElement       the element the edge leads to
     * @param resolvedKeyFields key fields per ordinal, may be {@code null}
     * @return {@code false} only for a CoGroup whose key field types differ between entries
     */
    private static boolean ifCoGroupAndKeysHaveCommonTypes(ProcessLogger processLogger, FlowElement flowElement, Map<Integer, Fields> resolvedKeyFields) {
        if (!(flowElement instanceof CoGroup)) {
            return true;
        }
        if (resolvedKeyFields == null || resolvedKeyFields.size() < 2) {
            return true;
        }

        Iterator<Fields> candidates = resolvedKeyFields.values().iterator();
        Fields fields = candidates.next();

        while (candidates.hasNext()) {
            Fields next = candidates.next();
            if (!Arrays.equals(fields.getTypesClasses(), next.getTypesClasses())) {
                processLogger.logWarn("unable to perform: {}, on mismatched join types and optimize serialization with type exclusion, fields: {} & {}", new Object[]{flowElement, fields, next});
                return false;
            }
        }
        return true;
    }

    /**
     * Reports whether the given configuration runs Hadoop in local mode, as decided by
     * {@code HadoopUtil.isLocal}.
     *
     * @param conf the configuration to inspect
     * @return the result of {@code HadoopUtil.isLocal(conf)}
     */
    public boolean isHadoopLocalMode(JobConf conf) {
        return HadoopUtil.isLocal(conf);
    }

    /**
     * Creates the job wrapper that will run this step.
     * <p>
     * A {@link NoClassDefFoundError} raised while constructing the job is logged together with the
     * detected platform information and then rethrown unchanged.
     *
     * @param clientState           the client state handed to the job
     * @param flowProcess           unused by this implementation
     * @param initializedStepConfig the configuration produced for this step
     * @return a new {@link HadoopFlowStepJob}
     */
    protected FlowStepJob<JobConf> createFlowStepJob(ClientState clientState, FlowProcess<JobConf> flowProcess, JobConf initializedStepConfig) {
        try {
            return new HadoopFlowStepJob(clientState, this, initializedStepConfig);
        } catch (NoClassDefFoundError error) {
            PlatformInfo platformInfo = HadoopUtil.getPlatformInfo(JobConf.class, "org/apache/hadoop", "Hadoop MR");
            String message = "unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";
            this.logError(String.format(message, platformInfo.toString()), error);
            throw error;
        }
    }

    /**
     * Removes resources created for this step. Every removal is best-effort: failures are logged as
     * warnings and do not abort the remaining cleanup.
     *
     * @param config the configuration the step ran with
     */
    public void clean(JobConf config) {
        // NOTE(review): createInitializedConfig in this class stores dist-cache state under
        // "cascading.flow.step.node.map.path" / "cascading.flow.step.node.reduce.path"; nothing visible
        // here sets "cascading.flow.step.path" -- verify whether those state files need removal too
        String stepStatePath = config.get("cascading.flow.step.path");
        if (stepStatePath != null) {
            try {
                HadoopUtil.removeStateFromDistCache(config, stepStatePath);
            } catch (IOException exception) {
                this.logWarn("unable to remove step state file: " + stepStatePath, exception);
            }
        }

        if (this.tempSink != null) {
            try {
                this.tempSink.deleteResource(config);
            } catch (Exception exception) {
                this.logWarn("unable to remove temporary file: " + this.tempSink, exception);
            }
        }

        for (Tap sink : this.getSinkTaps()) {
            this.cleanIntermediateData(config, sink);
        }
        for (Tap trap : this.getTraps()) {
            this.cleanTapMetaData(config, trap);
        }
    }

    /**
     * Cleans up after a single sink.
     * <p>
     * A temporary sink is deleted when the flow was successful or has no run id; in every other
     * case only the tap meta-data is cleaned.
     *
     * @param config the configuration the step ran with
     * @param sink   the sink to clean up
     */
    protected void cleanIntermediateData(JobConf config, Tap sink) {
        boolean removable = sink.isTemporary()
            && (this.getFlow().getFlowStats().isSuccessful() || this.getFlow().getRunID() == null);

        if (!removable) {
            this.cleanTapMetaData(config, sink);
            return;
        }

        try {
            sink.deleteResource(config);
        } catch (Exception exception) {
            this.logWarn("unable to remove temporary file: " + sink, exception);
        }
    }

    /**
     * Delegates to {@code Hadoop18TapUtil.cleanupTapMetaData}. The cleanup is best-effort, so an
     * {@link IOException} is logged as a warning rather than propagated.
     */
    private void cleanTapMetaData(JobConf jobConf, Tap tap) {
        try {
            Hadoop18TapUtil.cleanupTapMetaData(jobConf, tap);
        } catch (IOException exception) {
            // previously swallowed silently; surface the failure without failing the cleanup
            this.logWarn("unable to clean up tap meta-data: " + tap, exception);
        }
    }

    /**
     * Runs {@code sinkConfInit} for every trap against a copy of {@code conf}; the copy is local to
     * this method, so {@code conf} itself is not modified here.
     */
    private void initFromTraps(FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps) {
        if (traps.isEmpty()) {
            return;
        }

        JobConf trapConf = HadoopUtil.copyJobConf(conf);
        for (Tap trap : traps.values()) {
            trap.sinkConfInit(flowProcess, trapConf);
        }
    }

    /**
     * Configures the step's sources.
     * <p>
     * Each streamed source is initialized against its own copy of {@code conf}, tagged with
     * {@code cascading.step.source}, and the copies are registered through
     * {@code MultiInputFormat.addInputFormat}. Each accumulated source is initialized against a
     * copy as well; the difference to {@code conf} is packed into {@code conf} under a per-tap key,
     * and any cache files set on the copy are applied to {@code conf}.
     *
     * @param flowProcess the flow process handed to the taps
     * @param conf        the configuration being built
     * @throws IllegalStateException if a streamed source has a {@code null} identifier
     * @throws CascadingException    if reading or setting the cache files fails
     */
    protected void initFromSources(FlowProcess<JobConf> flowProcess, JobConf conf) {
        Set<Tap> uniqueSources = this.getUniqueStreamedSources();
        JobConf[] streamedJobs = new JobConf[uniqueSources.size()];
        int index = 0;

        for (Tap tap : uniqueSources) {
            if (tap.getIdentifier() == null) {
                throw new IllegalStateException("tap may not have null identifier: " + tap.toString());
            }

            JobConf streamedJob = flowProcess.copyConfig(conf);
            streamedJobs[index++] = streamedJob;
            streamedJob.set("cascading.step.source", Tap.id(tap));
            tap.sourceConfInit(flowProcess, streamedJob);
        }

        Set<Tap> accumulatedSources = this.getAllAccumulatedSources();
        for (Tap tap : accumulatedSources) {
            JobConf accumulatedJob = flowProcess.copyConfig(conf);
            tap.sourceConfInit(flowProcess, accumulatedJob);

            Map<?, ?> changes = flowProcess.diffConfigIntoMap(conf, accumulatedJob);
            conf.set("cascading.node.accumulated.source.conf." + Tap.id(tap), HadoopUtil.pack(changes, conf));

            try {
                URI[] cacheFiles = DistributedCache.getCacheFiles(accumulatedJob);
                if (cacheFiles != null) {
                    DistributedCache.setCacheFiles(cacheFiles, conf);
                }
            } catch (IOException exception) {
                throw new CascadingException(exception);
            }
        }

        MultiInputFormat.addInputFormat(conf, streamedJobs);
    }

    /** Applies the step-level config definitions to {@code conf} through a {@link ConfigurationSetter}. */
    private void initFromStepConfigDef(JobConf conf) {
        this.initConfFromStepConfigDef(new ConfigurationSetter(conf));
    }

    /**
     * Returns the declared sources minus those that are accumulated only.
     * <p>
     * A source reported as both accumulated and streamed is kept, because streamed sources are
     * removed from the accumulated set before the subtraction.
     */
    private Set<Tap> getUniqueStreamedSources() {
        // removeAll is applied directly to the set returned by getAllAccumulatedSources(), exactly as
        // before; NOTE(review): presumably that set is a fresh copy per call -- confirm in BaseFlowStep
        Set<Tap> accumulatedOnly = this.getAllAccumulatedSources();
        accumulatedOnly.removeAll(this.getAllStreamedSources());

        HashSet<Tap> streamed = new HashSet<Tap>(this.sources.keySet());
        streamed.removeAll(accumulatedOnly);
        return streamed;
    }

    /**
     * Configures the step's sink.
     * <p>
     * After the sink (if any) has configured itself, a {@link TempHfs} is created as
     * {@code tempSink} when no output path is set and the output format is either unset or a
     * {@link FileOutputFormat}. A non-null {@code tempSink} then configures itself as well.
     *
     * @param flowProcess the flow process handed to the taps
     * @param conf        the configuration being built
     */
    protected void initFromSink(FlowProcess<JobConf> flowProcess, JobConf conf) {
        if (this.getSink() != null) {
            this.getSink().sinkConfInit(flowProcess, conf);
        }

        Class<?> outputFormat = conf.getClass("mapred.output.format.class", null, OutputFormat.class);
        boolean isFileOutputFormat = outputFormat != null && FileOutputFormat.class.isAssignableFrom(outputFormat);

        Path outputPath = FileOutputFormat.getOutputPath(conf);
        if (outputPath == null && (isFileOutputFormat || outputFormat == null)) {
            // the temp sink name is derived from the path component of the sink identifier
            this.tempSink = new TempHfs(conf, "tmp:/" + new Path(this.getSink().getIdentifier()).toUri().getPath(), true);
        }

        if (this.tempSink != null) {
            this.tempSink.sinkConfInit(flowProcess, conf);
        }
    }

    /**
     * Configures the step's traps, using the trap map of this step.
     *
     * @param flowProcess the flow process handed to the taps
     * @param conf        the configuration being built
     */
    protected void initFromTraps(FlowProcess<JobConf> flowProcess, JobConf conf) {
        this.initFromTraps(flowProcess, conf, this.getTrapMap());
    }
}

