/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.tez.planner;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.tez.Hadoop2TezFlow;
import cascading.flow.tez.Hadoop2TezFlowStep;
import cascading.management.state.ClientState;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowStepStats;
import cascading.stats.tez.TezStepStats;
import cascading.stats.tez.util.TezStatsUtil;
import cascading.util.Util;
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;

public class Hadoop2TezFlowStepJob
extends FlowStepJob<TezConfiguration> {
    /** Status options used by {@code getDagStatusWithCounters()} to request counters along with the DAG status. */
    private static final Set<StatusGetOpts> STATUS_GET_OPTS = EnumSet.of(StatusGetOpts.GET_COUNTERS);
    /** The DAG supplied to the constructor and submitted in {@code internalNonBlockingStart()}. */
    private DAG dag;
    /** Created and started in {@code internalNonBlockingStart()}; null until then. */
    private TezClient tezClient;
    /** Returned by {@code tezClient.submitDAG(dag)}; null until the DAG has been submitted. */
    private DAGClient dagClient;
    /** Value of the {@code dagId} instance field read reflectively from {@link #dagClient}; null until the DAG has been submitted. */
    private String dagId;

    /**
     * Looks up the {@code cascading.stats.store.interval} property.
     *
     * @param configuration the configuration to read from
     * @return the configured value, or {@code 60000} when the property is absent
     */
    private static long getStoreInterval(Configuration configuration) {
        final String propertyName = "cascading.stats.store.interval";
        final long fallback = 60000L;
        return configuration.getLong(propertyName, fallback);
    }

    /**
     * Looks up the {@code cascading.stats.complete_child_details.block.duration} property.
     *
     * @param configuration the configuration to read from
     * @return the configured value, or {@code 60000} when the property is absent
     */
    private static long getChildDetailsBlockingDuration(Configuration configuration) {
        final String propertyName = "cascading.stats.complete_child_details.block.duration";
        final long fallback = 60000L;
        return configuration.getLong(propertyName, fallback);
    }

    /**
     * Looks up the {@code cascading.flow.job.pollinginterval} property.
     *
     * @param configuration the configuration to read from
     * @return the configured value, or {@code 5000} when the property is absent
     */
    public static long getJobPollingInterval(Configuration configuration) {
        final String propertyName = "cascading.flow.job.pollinginterval";
        final long fallback = 5000L;
        return configuration.getLong(propertyName, fallback);
    }

    /**
     * Creates a job for the given flow step and DAG.
     *
     * <p>The polling interval, stats store interval and child-details blocking duration passed
     * to the superclass are all read from {@code currentConf}.
     *
     * @param clientState the client state handed to the superclass
     * @param flowStep the flow step this job executes
     * @param currentConf the Tez configuration for this step
     * @param dag the DAG to submit when the job is started
     */
    public Hadoop2TezFlowStepJob(ClientState clientState, BaseFlowStep<TezConfiguration> flowStep, TezConfiguration currentConf, DAG dag) {
        // The configuration is passed with its real static type; an upcast to Object would not
        // match the TezConfiguration-typed parameter of FlowStepJob<TezConfiguration>.
        super(clientState, currentConf, flowStep, Hadoop2TezFlowStepJob.getJobPollingInterval(currentConf), Hadoop2TezFlowStepJob.getStoreInterval(currentConf), Hadoop2TezFlowStepJob.getChildDetailsBlockingDuration(currentConf));
        this.dag = dag;
        if (flowStep.isDebugEnabled()) {
            flowStep.logDebug("using polling interval: " + this.pollingInterval, new Object[0]);
        }
    }

    protected FlowStepStats createStepStats(ClientState clientState) {
        return new TezStepStats((FlowStep)this.flowStep, clientState){
            DAGClient timelineClient;
            {
                this.timelineClient = null;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public DAGClient getJobStatusClient() {
                if (this.timelineClient != null) {
                    return this.timelineClient;
                }
                1 var1_1 = this;
                synchronized (var1_1) {
                    if (Hadoop2TezFlowStepJob.this.isTimelineServiceEnabled((TezConfiguration)Hadoop2TezFlowStepJob.this.jobConfiguration)) {
                        this.timelineClient = TezStatsUtil.createTimelineClient((DAGClient)Hadoop2TezFlowStepJob.this.dagClient);
                    }
                    if (this.timelineClient == null) {
                        this.timelineClient = Hadoop2TezFlowStepJob.this.dagClient;
                    }
                    return this.timelineClient;
                }
            }

            public String getProcessStatusURL() {
                return TezStatsUtil.getTrackingURL((TezClient)Hadoop2TezFlowStepJob.this.tezClient, (DAGClient)Hadoop2TezFlowStepJob.this.dagClient);
            }

            public String getProcessStepID() {
                return Hadoop2TezFlowStepJob.this.dagId;
            }
        };
    }

    /**
     * Creates and starts a Tez client, then submits the DAG without waiting for it to finish.
     *
     * <p>A copy of the job configuration is used as the working configuration; the step
     * staging directory is prepared on it before the client is created. On success the
     * DAG client and the DAG id are recorded on this instance.
     *
     * @throws IOException if preparing the staging directory or talking to Tez fails with an I/O error
     * @throws CascadingException wrapping any {@link TezException}; the cause is also stored in {@code throwable}
     */
    protected void internalNonBlockingStart() throws IOException {
        try {
            boolean timelineEnabled = isTimelineServiceEnabled((TezConfiguration) jobConfiguration);
            if (!timelineEnabled) {
                flowStep.logWarn("'yarn.timeline-service.enabled' is disabled, please enable to capture detailed metrics of completed flows, this may require starting the YARN timeline server daemon");
            }

            TezConfiguration sessionConf = new TezConfiguration((Configuration) jobConfiguration);
            boolean sessionMode = sessionConf.getBoolean("tez.am.mode.session", false);
            flowStep.logInfo("tez session mode enabled: " + sessionMode, new Object[0]);

            prepareEnsureStagingDir(sessionConf);

            Hadoop2TezFlowStep tezStep = (Hadoop2TezFlowStep) flowStep;
            tezClient = TezClient.create(flowStep.getName(), sessionConf, tezStep.getAllLocalResources(), null);
            tezClient.start();

            dagClient = tezClient.submitDAG(dag);
            // the id is read reflectively from the client's "dagId" field
            dagId = (String) Util.returnInstanceFieldIfExistsSafe((Object) dagClient, "dagId");

            Object[] logArgs = new Object[]{tezClient.getAppMasterApplicationId(), dagId};
            flowStep.logInfo("submitted tez dag to app master: {}, with dag id: {}", logArgs);
        }
        catch (TezException tezFailure) {
            throwable = tezFailure;
            throw new CascadingException((Throwable) tezFailure);
        }
    }

    /**
     * Reports whether {@code yarn.timeline-service.enabled} is set in the given configuration.
     *
     * @param workingConf the configuration to inspect
     * @return the property value, or {@code false} when the property is absent
     */
    private boolean isTimelineServiceEnabled(TezConfiguration workingConf) {
        final String propertyName = "yarn.timeline-service.enabled";
        boolean enabled = workingConf.getBoolean(propertyName, false);
        return enabled;
    }

    /**
     * Synchronises the given node stats with the state of the matching Tez vertex.
     *
     * <p>Does nothing when no DAG has been submitted yet, or when the vertex status or its
     * state is unavailable. For every terminal state the node is first marked running (if it
     * is not already) before the terminal mark is applied. FAILED and ERROR are handled
     * identically. Any other state leaves the node stats untouched.
     *
     * <p>I/O and Tez failures while querying the status are logged, not rethrown.
     *
     * @param flowNodeStats the node stats to update; its ID is used as the vertex name
     */
    protected void updateNodeStatus(FlowNodeStats flowNodeStats) {
        if (this.dagClient == null) {
            return;
        }
        try {
            VertexStatus vertexStatus = this.dagClient.getVertexStatus(flowNodeStats.getID(), null);
            if (vertexStatus == null) {
                return;
            }
            VertexStatus.State state = vertexStatus.getState();
            if (state == null) {
                return;
            }
            switch (state) {
                case RUNNING: {
                    flowNodeStats.markRunning();
                    break;
                }
                case SUCCEEDED: {
                    this.markRunningIfNotRunning(flowNodeStats);
                    flowNodeStats.markSuccessful();
                    break;
                }
                case FAILED:
                case ERROR: {
                    this.markRunningIfNotRunning(flowNodeStats);
                    this.markNodeFailedWithDiagnostics(flowNodeStats, vertexStatus.getDiagnostics());
                    break;
                }
                case KILLED: {
                    this.markRunningIfNotRunning(flowNodeStats);
                    flowNodeStats.markStopped();
                    break;
                }
                default: {
                    // NEW, INITIALIZING, INITED and any other state: nothing to record yet
                    break;
                }
            }
        }
        catch (IOException | TezException exception) {
            // log the exception that was actually caught, not the job-level throwable
            this.flowStep.logError("failed setting node status", exception);
        }
    }

    /** Marks the node as running unless it already reports itself as running. */
    private void markRunningIfNotRunning(FlowNodeStats flowNodeStats) {
        if (!flowNodeStats.isRunning()) {
            flowNodeStats.markRunning();
        }
    }

    /** Marks the node failed with the vertex diagnostics, or with the job throwable when there are none. */
    private void markNodeFailedWithDiagnostics(FlowNodeStats flowNodeStats, List<String> diagnostics) {
        if (diagnostics == null || diagnostics.isEmpty()) {
            flowNodeStats.markFailed(this.throwable);
            return;
        }
        flowNodeStats.markFailed(diagnostics.toArray(new String[diagnostics.size()]));
    }

    /**
     * Sets {@code tez.staging-dir} on the working configuration to this step's staging path
     * and makes sure that directory exists.
     *
     * <p>The path is qualified against the file system obtained from the configuration,
     * tokens are requested for it, and Tez is asked to ensure it exists. When the qualified
     * path is on the local file system ({@code file} scheme) the directory is additionally
     * created through {@link File#mkdirs()}.
     *
     * @param workingConf the configuration to update and to resolve the file system from
     * @return the qualified staging directory
     * @throws IOException if the file system cannot be obtained or the directory cannot be ensured
     */
    private Path prepareEnsureStagingDir(TezConfiguration workingConf) throws IOException {
        String stepStagingPath = this.createStepStagingPath();
        workingConf.set("tez.staging-dir", stepStagingPath);
        Path stagingDir = new Path(stepStagingPath);
        FileSystem fileSystem = FileSystem.get((Configuration)workingConf);
        stagingDir = fileSystem.makeQualified(stagingDir);
        TokenCache.obtainTokensForNamenodes((Credentials)new Credentials(), (Path[])new Path[]{stagingDir}, (Configuration)workingConf);
        TezClientUtils.ensureStagingDirExists((Configuration)workingConf, (Path)stagingDir);
        // A URI scheme never contains ":/", so compare against the bare scheme of the
        // qualified path; this also avoids depending on FileSystem#getScheme being implemented.
        if ("file".equals(stagingDir.toUri().getScheme())) {
            // best effort: the result is intentionally ignored
            new File(stagingDir.toUri()).mkdirs();
        }
        return stagingDir;
    }

    /**
     * Builds the staging path for this step as {@code <flow staging path>/<step id>}.
     *
     * <p>When {@code HadoopUtil.isLocal} reports a local configuration, the value of
     * {@code hadoop.tmp.dir} followed by {@code /} is put in front of that path.
     *
     * @return the step staging path as a plain string
     */
    String createStepStagingPath() {
        String prefix = HadoopUtil.isLocal((Configuration) jobConfiguration)
            ? ((TezConfiguration) jobConfiguration).get("hadoop.tmp.dir") + "/"
            : "";
        Hadoop2TezFlow owningFlow = (Hadoop2TezFlow) flowStep.getFlow();
        String flowStagingPath = owningFlow.getFlowStagingPath();
        return prefix + flowStagingPath + "/" + flowStep.getID();
    }

    /**
     * Fetches the current DAG status and returns its state.
     *
     * <p>A warning is logged when either the status or its state is null.
     *
     * @return the DAG state, or null when it is unavailable
     */
    private DAGStatus.State getDagStatusState() {
        DAGStatus current = getDagStatus();
        if (current != null) {
            DAGStatus.State currentState = current.getState();
            if (currentState == null) {
                flowStep.logWarn("dagStatus#getState returned null");
            }
            return currentState;
        }
        flowStep.logWarn("getDagStatus returned null");
        return null;
    }

    /**
     * Fetches the current DAG status and reports whether it is completed.
     *
     * @return {@code true} only when a status is available and reports completion;
     *         a missing status is logged as a warning and yields {@code false}
     */
    private boolean isDagStatusComplete() {
        DAGStatus current = getDagStatus();
        if (current == null) {
            flowStep.logWarn("getDagStatus returned null");
            return false;
        }
        return current.isCompleted();
    }

    /**
     * Queries the DAG client for the current status, without extra status options.
     *
     * @return the status, or null when no DAG client exists yet or the client threw a
     *         {@link NullPointerException} (logged as a known issue)
     * @throws CascadingException wrapping any {@link IOException} or {@link TezException}
     */
    private DAGStatus getDagStatus() {
        DAGStatus fetched = null;
        if (dagClient != null) {
            try {
                fetched = dagClient.getDAGStatus(null);
            }
            catch (NullPointerException npe) {
                // deliberately tolerated; the result stays null
                flowStep.logWarn("NPE thrown by getDAGStatus, known issue");
            }
            catch (IOException | TezException failure) {
                throw new CascadingException(failure);
            }
        }
        return fetched;
    }

    /**
     * Queries the DAG client for the current status, requesting counters as well.
     *
     * @return the status, or null when no DAG client exists yet
     * @throws CascadingException wrapping any {@link IOException} or {@link TezException}
     */
    private DAGStatus getDagStatusWithCounters() {
        DAGStatus fetched = null;
        if (dagClient != null) {
            try {
                fetched = dagClient.getDAGStatus(STATUS_GET_OPTS);
            }
            catch (IOException | TezException failure) {
                throw new CascadingException("unable to get counters from dag client", failure);
            }
        }
        return fetched;
    }

    /**
     * Stops a DAG that has not completed yet.
     *
     * <p>When the DAG status is not complete, a kill is attempted through the DAG client
     * (failures are logged as warnings) and both the DAG client and the Tez client are then
     * stopped. When the DAG is already complete nothing is done here.
     *
     * @throws IOException declared by the signature; not thrown directly by this body
     */
    protected void internalBlockOnStop() throws IOException {
        if (!isDagStatusComplete()) {
            try {
                if (dagClient != null) {
                    dagClient.tryKillDAG();
                }
            }
            catch (Exception killFailure) {
                flowStep.logWarn("exception during attempt to kill dag", (Throwable) killFailure);
            }
            stopDAGClient();
            stopTezClient();
        }
    }

    /**
     * Releases the clients held by this job: the DAG client first, then the Tez client.
     */
    protected void internalCleanup() {
        stopDAGClient();
        stopTezClient();
    }

    /**
     * Closes the DAG client if one exists; any exception from closing is logged as a warning.
     */
    private void stopDAGClient() {
        if (dagClient == null) {
            return;
        }
        try {
            dagClient.close();
        }
        catch (Exception closeFailure) {
            flowStep.logWarn("exception during attempt to cleanup client", (Throwable) closeFailure);
        }
    }

    /**
     * Stops the Tez client if one exists.
     *
     * <p>For remote execution the client is stopped directly. Otherwise the stop call is
     * submitted through {@code Util.submitWithTimeout} with a five minute limit, and a
     * warning is logged when that does not yield {@code true}. Any exception is logged as a
     * warning rather than propagated.
     */
    private void stopTezClient() {
        if (tezClient == null) {
            return;
        }
        try {
            if (isRemoteExecution()) {
                tezClient.stop();
                return;
            }
            Callable<Boolean> stopTask = new Callable<Boolean>(){

                @Override
                public Boolean call() throws Exception {
                    Hadoop2TezFlowStepJob.this.tezClient.stop();
                    return true;
                }
            };
            Boolean stopped = Util.submitWithTimeout(stopTask, 5, TimeUnit.MINUTES);
            // null or false both mean the stop did not complete in time
            if (!Boolean.TRUE.equals(stopped)) {
                flowStep.logWarn("tezClient#stop() timed out after 5 minutes, cancelling call, continuing");
            }
        }
        catch (Exception stopFailure) {
            flowStep.logWarn("exception during attempt to cleanup client", (Throwable) stopFailure);
        }
    }

    /**
     * Reports whether the DAG has completed in the {@code SUCCEEDED} state.
     *
     * @return {@code true} only when the DAG status is complete and its state is SUCCEEDED
     * @throws IOException declared by the signature; not thrown directly by this body
     */
    protected boolean internalNonBlockingIsSuccessful() throws IOException {
        if (!isDagStatusComplete()) {
            return false;
        }
        return DAGStatus.State.SUCCEEDED == getDagStatusState();
    }

    /**
     * Reports whether this job runs remotely, i.e. {@code HadoopUtil.isLocal} is false for
     * the job configuration.
     *
     * @return {@code true} when the configuration is not local
     */
    protected boolean isRemoteExecution() {
        boolean local = HadoopUtil.isLocal((Configuration) jobConfiguration);
        return !local;
    }

    /**
     * Returns the throwable recorded on this job, if any.
     *
     * @return the current value of the {@code throwable} field, possibly null
     */
    protected Throwable getThrowable() {
        return throwable;
    }

    /**
     * Returns the execution context reported by the DAG client, used as the job id.
     *
     * <p>Like the other accessors in this class, this tolerates being called before a DAG
     * has been submitted.
     *
     * @return the DAG client's execution context, or null when no DAG client exists yet
     */
    protected String internalJobId() {
        if (this.dagClient == null) {
            return null;
        }
        return this.dagClient.getExecutionContext();
    }

    /**
     * Reports whether the DAG status is complete, without blocking on completion.
     *
     * @return the result of the DAG completion check
     * @throws IOException declared by the signature; not thrown directly by this body
     */
    protected boolean internalNonBlockingIsComplete() throws IOException {
        boolean complete = isDagStatusComplete();
        return complete;
    }

    /**
     * Logs the DAG id, its current state and every diagnostic line as warnings.
     *
     * <p>Nothing is logged when no DAG status is available.
     */
    protected void dumpDebugInfo() {
        DAGStatus snapshot = getDagStatus();
        if (snapshot != null) {
            flowStep.logWarn("Tez DAG " + dagId + " state at " + snapshot.getState());
            flowStep.logWarn("failure info: ");
            for (String diagnostic : snapshot.getDiagnostics()) {
                flowStep.logWarn(diagnostic);
            }
        }
    }

    /**
     * Reports whether the DAG has started running.
     *
     * @return {@code true} when the DAG state is RUNNING, or otherwise when the DAG status
     *         is already complete
     */
    protected boolean internalIsStartedRunning() {
        if (DAGStatus.State.RUNNING == getDagStatusState()) {
            return true;
        }
        return isDagStatusComplete();
    }
}

