/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.Flows;
import cascading.flow.SliceCounters;
import cascading.flow.StepCounters;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.stream.graph.HadoopMapStreamGraph;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.element.ElementDuct;
import cascading.flow.stream.element.SourceStage;
import cascading.tap.Tap;
import cascading.util.LogUtil;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hadoop {@link MapRunnable} that executes the map side of a flow node.
 *
 * <p>{@link #configure(JobConf)} rebuilds the {@link FlowNode} from its serialized state and
 * constructs the {@link HadoopMapStreamGraph}; {@link #run(RecordReader, OutputCollector, Reporter)}
 * then drives every head of that graph and records begin/end/duration counters.
 */
public class FlowMapper implements MapRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(FlowMapper.class);

    /** Flow node deserialized by {@link #configure(JobConf)}. */
    private FlowNode flowNode;

    /** Stream graph built by {@link #configure(JobConf)} and executed by {@code run}. */
    private HadoopMapStreamGraph streamGraph;

    /** Process context; remains {@code null} if {@link #configure(JobConf)} fails before creating it. */
    private HadoopFlowProcess currentProcess;

    /**
     * Initializes logging, creates the flow process, deserializes the map-side flow node and builds
     * the stream graph that {@code run} will execute.
     *
     * @param jobConf the job configuration supplying the serialized node state and source tap id
     * @throws CascadingException if one is raised during configuration (rethrown unchanged)
     * @throws FlowException wrapping any other throwable raised during configuration
     */
    public void configure(JobConf jobConf) {
        try {
            HadoopUtil.initLog4j(jobConf);
            LOG.info("cascading version: {}", jobConf.get("cascading.version", ""));
            LOG.info("child jvm opts: {}", jobConf.get("mapred.child.java.opts", ""));
            this.currentProcess = new HadoopFlowProcess(new FlowSession(), jobConf, true);

            // Node state is looked up in the job configuration first, then in the distributed cache.
            String mapNodeState = jobConf.getRaw("cascading.flow.step.node.map");
            if (mapNodeState == null) {
                mapNodeState = HadoopMRUtil.readStateFromDistCache(jobConf, jobConf.get("cascading.flow.step.id"), "map");
            }
            this.flowNode = (FlowNode)HadoopUtil.deserializeBase64(mapNodeState, (Configuration)jobConf, BaseFlowNode.class);
            LOG.info("flow node id: {}, ordinal: {}", this.flowNode.getID(), this.flowNode.getOrdinal());

            Tap source = Flows.getTapForID((Set)this.flowNode.getSourceTaps(), jobConf.get("cascading.step.source"));
            this.streamGraph = new HadoopMapStreamGraph(this.currentProcess, this.flowNode, source);

            for (Duct head : this.streamGraph.getHeads()) {
                LOG.info("sourcing from: {}", ((ElementDuct)head).getFlowElement());
            }
            for (Duct tail : this.streamGraph.getTails()) {
                LOG.info("sinking to: {}", ((ElementDuct)tail).getFlowElement());
            }
            for (Tap trap : this.flowNode.getTraps()) {
                LOG.info("trapping to: {}", trap);
            }
            LogUtil.logMemory(LOG, "flow node id: " + this.flowNode.getID() + ", mem on start");
        }
        catch (Throwable throwable) {
            // Safe even when the failure happened before currentProcess was assigned.
            this.reportIfLocal(throwable);
            if (throwable instanceof CascadingException) {
                throw (CascadingException)throwable;
            }
            throw new FlowException("internal error during mapper configuration", throwable);
        }
    }

    /**
     * Runs the stream graph: every head other than the streamed head is run first with a
     * {@code null} input, then the streamed head is run with {@code input}. The graph is cleaned
     * up and end-time/duration counters are recorded whether or not execution succeeds.
     *
     * @param input the record reader handed to the streamed head
     * @param output the collector registered on the current process
     * @param reporter the reporter registered on the current process
     * @throws IOException if raised while running the heads (rethrown unchanged)
     * @throws CascadingException if one is raised while running the heads (rethrown unchanged)
     * @throws FlowException wrapping any other non-{@link OutOfMemoryError} throwable
     */
    public void run(RecordReader input, OutputCollector output, Reporter reporter) throws IOException {
        this.currentProcess.setReporter(reporter);
        this.currentProcess.setOutputCollector(output);
        this.streamGraph.prepare();

        long processBeginTime = System.currentTimeMillis();
        this.currentProcess.increment((Enum)SliceCounters.Process_Begin_Time, processBeginTime);
        this.currentProcess.increment((Enum)StepCounters.Process_Begin_Time, processBeginTime);

        SourceStage streamedHead = this.streamGraph.getStreamedHead();
        Iterator<? extends Duct> heads = this.streamGraph.getHeads().iterator();
        try {
            try {
                while (heads.hasNext()) {
                    Duct head = heads.next();
                    if (head != streamedHead) {
                        ((SourceStage)head).run(null);
                    }
                }
                streamedHead.run((Object)input);
            }
            catch (OutOfMemoryError error) {
                // Deliberately neither reported nor wrapped: propagate the OOME untouched.
                throw error;
            }
            catch (IOException exception) {
                this.reportIfLocal(exception);
                throw exception;
            }
            catch (Throwable throwable) {
                this.reportIfLocal(throwable);
                if (throwable instanceof CascadingException) {
                    throw (CascadingException)throwable;
                }
                throw new FlowException("internal error during mapper execution", throwable);
            }
        }
        finally {
            try {
                this.streamGraph.cleanup();
            }
            finally {
                // Counters and diagnostics are emitted even if cleanup itself throws.
                long processEndTime = System.currentTimeMillis();
                long processDuration = processEndTime - processBeginTime;
                this.currentProcess.increment((Enum)SliceCounters.Process_End_Time, processEndTime);
                this.currentProcess.increment((Enum)SliceCounters.Process_Duration, processDuration);
                this.currentProcess.increment((Enum)StepCounters.Process_End_Time, processEndTime);
                this.currentProcess.increment((Enum)StepCounters.Process_Duration, processDuration);
                String message = "flow node id: " + this.flowNode.getID();
                LogUtil.logMemory(LOG, message + ", mem on close");
                LogUtil.logCounters(LOG, message + ", counter:", (FlowProcess)this.currentProcess);
            }
        }
    }

    /**
     * Forwards {@code throwable} to {@link HadoopFlowStepJob#reportLocalError} when
     * {@link HadoopUtil#isLocal} says the current job configuration is local.
     *
     * <p>Does nothing when {@code currentProcess} has not been created yet, so that a failure early
     * in {@link #configure(JobConf)} is not replaced by a {@link NullPointerException} raised here.
     *
     * @param throwable the failure to report
     */
    private void reportIfLocal(Throwable throwable) {
        if (this.currentProcess == null) {
            return;
        }
        if (HadoopUtil.isLocal((Configuration)this.currentProcess.getJobConf())) {
            HadoopFlowStepJob.reportLocalError(throwable);
        }
    }
}

