/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.cancelling;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
import eu.stratosphere.nephele.client.AbstractJobResult;
import eu.stratosphere.nephele.client.JobCancelResult;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobProgressResult;
import eu.stratosphere.nephele.client.JobSubmissionResult;
import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.event.job.JobEvent;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobStatus;
import eu.stratosphere.util.LogUtils;
import eu.stratosphere.util.StringUtils;
import java.util.Iterator;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;

public abstract class CancellingTestBase {
    private static final Log LOG = LogFactory.getLog(CancellingTestBase.class);
    private static final int MINIMUM_HEAP_SIZE_MB = 192;
    private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10000;
    protected NepheleMiniCluster executor;

    private void verifyJvmOptions() {
        long heap = Runtime.getRuntime().maxMemory() >> 20;
        Assert.assertTrue((String)("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + 192 + "m"), (heap > 142L ? 1 : 0) != 0);
    }

    @BeforeClass
    public static void initLogging() {
        LogUtils.initializeDefaultConsoleLogger((Level)Level.ERROR);
    }

    @Before
    public void startCluster() throws Exception {
        this.verifyJvmOptions();
        this.executor = new NepheleMiniCluster();
        this.executor.setDefaultOverwriteFiles(true);
        this.executor.start();
    }

    @After
    public void stopCluster() throws Exception {
        if (this.executor != null) {
            this.executor.stop();
            this.executor = null;
            FileSystem.closeAll();
            System.gc();
        }
    }

    public void runAndCancelJob(Plan plan, int msecsTillCanceling) throws Exception {
        this.runAndCancelJob(plan, msecsTillCanceling, 10000);
    }

    public void runAndCancelJob(Plan plan, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
        try {
            JobGraph jobGraph = this.getJobGraph(plan);
            long startingTime = System.currentTimeMillis();
            long cancelTime = -1L;
            JobClient client = this.executor.getJobClient(jobGraph);
            JobSubmissionResult submissionResult = client.submitJob();
            if (submissionResult.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
                throw new IllegalStateException(submissionResult.getDescription());
            }
            int interval = client.getRecommendedPollingInterval();
            long sleep = (long)interval * 1000L;
            Thread.sleep(sleep / 2L);
            long lastProcessedEventSequenceNumber = -1L;
            while (true) {
                JobProgressResult jobProgressResult;
                if (Thread.interrupted()) {
                    throw new IllegalStateException("Job client has been interrupted");
                }
                long now = System.currentTimeMillis();
                if (cancelTime < 0L) {
                    if (startingTime + (long)msecsTillCanceling < now) {
                        LOG.info((Object)"Issuing cancel request");
                        JobCancelResult jcr = client.cancelJob();
                        if (jcr == null) {
                            throw new IllegalStateException("Return value of cancelJob is null!");
                        }
                        if (jcr.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
                            throw new IllegalStateException(jcr.getDescription());
                        }
                        cancelTime = now;
                    }
                } else if (cancelTime + (long)maxTimeTillCanceled < now) {
                    throw new IllegalStateException("Cancelling of job took " + (now - cancelTime) + " milliseconds, only " + maxTimeTillCanceled + " milliseconds are allowed");
                }
                if ((jobProgressResult = client.getJobProgress()) == null) {
                    throw new IllegalStateException("Returned job progress is unexpectedly null!");
                }
                if (jobProgressResult.getReturnCode() == AbstractJobResult.ReturnCode.ERROR) {
                    throw new IllegalStateException("Could not retrieve job progress: " + jobProgressResult.getDescription());
                }
                boolean exitLoop = false;
                Iterator it = jobProgressResult.getEvents();
                while (it.hasNext()) {
                    AbstractEvent event = (AbstractEvent)it.next();
                    if (lastProcessedEventSequenceNumber >= event.getSequenceNumber()) continue;
                    lastProcessedEventSequenceNumber = event.getSequenceNumber();
                    if (event instanceof JobEvent) {
                        JobEvent jobEvent = (JobEvent)event;
                        JobStatus jobStatus = jobEvent.getCurrentJobStatus();
                        switch (jobStatus) {
                            case FINISHED: {
                                throw new IllegalStateException("Job finished successfully");
                            }
                            case FAILED: {
                                throw new IllegalStateException("Job failed");
                            }
                            case CANCELED: {
                                exitLoop = true;
                                break;
                            }
                            case SCHEDULED: 
                            case RUNNING: {
                                break;
                            }
                            default: {
                                throw new Exception("Bug: Unrecognized Job Status.");
                            }
                        }
                    }
                    if (!exitLoop) continue;
                    break;
                }
                if (!exitLoop) {
                    Thread.sleep(sleep);
                    continue;
                }
                break;
            }
        }
        catch (Exception e) {
            LOG.error((Object)e);
            Assert.fail((String)StringUtils.stringifyException((Throwable)e));
            return;
        }
    }

    private JobGraph getJobGraph(Plan plan) throws Exception {
        PactCompiler pc = new PactCompiler(new DataStatistics());
        OptimizedPlan op = pc.compile(plan);
        NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
        return jgg.compileJobGraph(op);
    }
}

