/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.tez;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.SliceCounters;
import cascading.flow.StepCounters;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.element.ElementDuct;
import cascading.flow.stream.element.InputSource;
import cascading.flow.tez.Hadoop2TezFlowProcess;
import cascading.flow.tez.stream.graph.Hadoop2TezStreamGraph;
import cascading.flow.tez.util.TezUtil;
import cascading.tap.Tap;
import cascading.util.LogUtil;
import cascading.util.Util;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tez logical IO processor that executes a single Cascading {@link FlowNode}.
 *
 * <p>{@link #initialize()} rebuilds the configuration from the processor's user payload and
 * deserializes the flow node stored under the {@code cascading.flow.node} property.
 * {@link #run(Map, Map)} builds a {@link Hadoop2TezStreamGraph} over the supplied logical
 * inputs and outputs, runs every non-streamed head to completion, then runs the streamed head,
 * and finally cleans the graph up and records timing counters. {@link #close()} logs memory
 * usage and counters.
 *
 * <p>This file was produced by a decompiler (see the file header), so a number of explicit
 * casts on call arguments are retained exactly as emitted.
 */
public class FlowProcessor
extends AbstractLogicalIOProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(FlowProcessor.class);
    /** Configuration rebuilt from the processor's user payload in {@link #initialize()}. */
    private TezConfiguration configuration;
    /** Flow process created in {@link #initialize()}; stays {@code null} if initialization fails early. */
    private Hadoop2TezFlowProcess currentProcess;
    /** Flow node deserialized in {@link #initialize()}; stays {@code null} if initialization fails early. */
    private FlowNode flowNode;
    /** Stream graph built at the start of {@link #run(Map, Map)}. */
    private Hadoop2TezStreamGraph streamGraph;

    /**
     * Creates the processor for the given Tez context.
     *
     * @param context the processor context handed to the superclass
     */
    public FlowProcessor(ProcessorContext context) {
        super(context);
    }

    /**
     * Builds the configuration from the user payload, creates the flow process and deserializes
     * the flow node this processor will execute.
     *
     * @throws CascadingException rethrown unchanged when raised during setup
     * @throws FlowException wrapping any other failure raised during setup
     * @throws Exception if the user payload cannot be turned into a configuration
     */
    public void initialize() throws Exception {
        this.configuration = new TezConfiguration(TezUtils.createConfFromUserPayload((UserPayload)this.getContext().getUserPayload()));
        TezUtil.setMRProperties(this.getContext(), (Configuration)this.configuration, true);
        try {
            HadoopUtil.initLog4j((Configuration)this.configuration);
            LOG.info("cascading version: {}", (Object)this.configuration.get("cascading.version", ""));
            this.currentProcess = new Hadoop2TezFlowProcess(new FlowSession(), this.getContext(), this.configuration);
            this.flowNode = (FlowNode)HadoopUtil.deserializeBase64((String)this.configuration.getRaw("cascading.flow.node"), (Configuration)this.configuration, BaseFlowNode.class);
            LOG.info("flow node id: {}, ordinal: {}", (Object)this.flowNode.getID(), (Object)this.flowNode.getOrdinal());
            LogUtil.logMemory((Logger)LOG, (String)("flow node id: " + this.flowNode.getID() + ", mem on start"));
        }
        catch (Throwable throwable) {
            // Cascading's own exceptions pass through untouched; everything else is wrapped.
            if (throwable instanceof CascadingException) {
                throw (CascadingException)throwable;
            }
            throw new FlowException("internal error during processor configuration", throwable);
        }
    }

    /**
     * Executes the flow node against the given logical inputs and outputs.
     *
     * <p>Steps, in order: build the stream graph and log its heads, tails and traps; prepare the
     * graph; wait for all inputs to be ready; record the begin-time counters; run every head that
     * is not the streamed head; run the streamed head; clean the graph up and record the end-time
     * and duration counters (the last two happen in {@code finally} blocks, so they run whether or
     * not execution succeeded).
     *
     * <p>Note: the decompiler flagged this method with "Removed try catching itself - possible
     * behaviour change", so the nested try/finally structure may differ from the compiled original.
     *
     * @param inputMap logical inputs, passed to the stream graph and waited on before execution
     * @param outputMap logical outputs, passed to the stream graph
     * @throws CascadingException rethrown unchanged when raised during graph setup or execution
     * @throws FlowException wrapping any other failure, except the two listed below
     * @throws IOException rethrown unwrapped when raised while running the heads
     * @throws OutOfMemoryError rethrown unwrapped when raised while running the heads
     * @throws Exception for failures raised while preparing the graph or waiting for inputs
     */
    public void run(Map<String, LogicalInput> inputMap, Map<String, LogicalOutput> outputMap) throws Exception {
        InputSource streamedHead;
        // Typed collection: iterating a raw Collection as Duct does not compile.
        Collection<Duct> allHeads;
        try {
            this.streamGraph = new Hadoop2TezStreamGraph(this.currentProcess, this.flowNode, inputMap, outputMap);
            allHeads = this.streamGraph.getHeads();
            streamedHead = this.streamGraph.getStreamedHead();
            for (Duct head : allHeads) {
                LOG.info("sourcing from: {} streamed: {}, id: {}", new Object[]{((ElementDuct)head).getFlowElement(), head == streamedHead, FlowElements.id((FlowElement)((ElementDuct)head).getFlowElement())});
            }
            for (Duct tail : this.streamGraph.getTails()) {
                LOG.info("sinking to: {}, id: {}", (Object)((ElementDuct)tail).getFlowElement(), (Object)FlowElements.id((FlowElement)((ElementDuct)tail).getFlowElement()));
            }
            for (Tap trap : this.flowNode.getTraps()) {
                LOG.info("trapping to: {}, id: {}", (Object)trap, (Object)FlowElements.id((FlowElement)trap));
            }
        }
        catch (Throwable throwable) {
            if (throwable instanceof CascadingException) {
                throw (CascadingException)throwable;
            }
            throw new FlowException("internal error during processor configuration", throwable);
        }
        this.streamGraph.prepare();
        this.waitForInputsReady(inputMap);
        long processBeginTime = System.currentTimeMillis();
        this.currentProcess.increment((Enum)SliceCounters.Process_Begin_Time, processBeginTime);
        this.currentProcess.increment((Enum)StepCounters.Process_Begin_Time, processBeginTime);
        try {
            try {
                // Every non-streamed head is run to completion before the streamed head starts.
                for (Duct head : allHeads) {
                    if (head == streamedHead) {
                        continue;
                    }
                    ((InputSource)head).run(null);
                    LogUtil.logMemory((Logger)LOG, (String)("mem after accumulating source: " + ((ElementDuct)head).getFlowElement() + ", "));
                }
                streamedHead.run(null);
            }
            catch (IOException | OutOfMemoryError error) {
                // Deliberately rethrown as-is so these two are never wrapped by the handler below.
                throw error;
            }
            catch (Throwable throwable) {
                if (throwable instanceof CascadingException) {
                    throw (CascadingException)throwable;
                }
                throw new FlowException("internal error during processor execution on node: " + this.flowNode.getOrdinal(), throwable);
            }
        }
        finally {
            try {
                this.streamGraph.cleanup();
            }
            finally {
                // Recorded even when cleanup() throws.
                long processEndTime = System.currentTimeMillis();
                this.currentProcess.increment((Enum)SliceCounters.Process_End_Time, processEndTime);
                this.currentProcess.increment((Enum)SliceCounters.Process_Duration, processEndTime - processBeginTime);
                this.currentProcess.increment((Enum)StepCounters.Process_End_Time, processEndTime);
                this.currentProcess.increment((Enum)StepCounters.Process_Duration, processEndTime - processBeginTime);
            }
        }
    }

    /**
     * Blocks until the processor context reports all given inputs ready, then logs how long the
     * wait took.
     *
     * @param inputMap logical inputs whose distinct values are waited on
     * @throws InterruptedException if the wait is interrupted
     */
    protected void waitForInputsReady(Map<String, LogicalInput> inputMap) throws InterruptedException {
        long beginInputReady = System.currentTimeMillis();
        // NOTE(review): the element type below comes from decompilation; confirm it matches the
        // collection type declared by waitForAllInputsReady in the Tez version in use.
        HashSet<LogicalInput> inputs = new HashSet<LogicalInput>(inputMap.values());
        this.getContext().waitForAllInputsReady(inputs);
        LOG.info("flow node id: {}, all {} inputs ready in: {}", new Object[]{this.flowNode.getID(), inputs.size(), Util.formatDurationHMSms((long)(System.currentTimeMillis() - beginInputReady))});
    }

    /**
     * Receives events; this implementation only emits a debug log line and ignores them.
     *
     * @param events the delivered events (unused)
     */
    public void handleEvents(List<Event> events) {
        LOG.debug("in events");
    }

    /**
     * Logs memory usage and counters for the flow node.
     *
     * <p>If {@link #initialize()} did not get far enough to set the flow node and flow process,
     * there is nothing to report; a warning is logged instead of failing with a
     * {@link NullPointerException} that could mask the original initialization error.
     *
     * @throws Exception declared for compatibility with callers; not thrown by the visible code
     */
    public void close() throws Exception {
        if (this.flowNode == null || this.currentProcess == null) {
            LOG.warn("processor closed before initialization completed, skipping memory and counter logging");
            return;
        }
        String message = "flow node id: " + this.flowNode.getID();
        LogUtil.logMemory((Logger)LOG, (String)(message + ", mem on close"));
        LogUtil.logCounters((Logger)LOG, (String)(message + ", counter:"), (FlowProcess)this.currentProcess);
    }
}

