/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.util;

import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionException;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.test.util.AbstractTestBase;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.util.LogUtils;
import junit.framework.Assert;
import org.apache.log4j.Level;

public abstract class FailingTestBase
extends RecordAPITestBase {
    public FailingTestBase() {
        LogUtils.initializeDefaultConsoleLogger((Level)Level.OFF);
    }

    protected abstract JobGraph getFailingJobGraph() throws Exception;

    protected String getFailingJarFilePath() {
        return null;
    }

    protected abstract int getTimeout();

    public void testJob() throws Exception {
        try {
            this.preSubmit();
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)("Pre-submit work caused an error: " + e.getMessage()));
        }
        SubmissionThread st = new SubmissionThread(Thread.currentThread(), this.executor, this.getFailingJobGraph(), this.getJobGraph());
        st.start();
        try {
            Thread.sleep(this.getTimeout() * 1000);
            Assert.fail((String)"Failing job and successful job did not fail.");
        }
        catch (InterruptedException ie) {
            // empty catch block
        }
        Exception cte = st.error;
        if (cte != null) {
            cte.printStackTrace();
            Assert.fail((String)("Task Canceling failed: " + cte.getMessage()));
        }
        try {
            this.postSubmit();
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)("Post-submit work caused an error: " + e.getMessage()));
        }
    }

    private class SubmissionThread
    extends Thread {
        private final Thread timeoutThread;
        private final NepheleMiniCluster executor;
        private final JobGraph failingJob;
        private final JobGraph job;
        private volatile Exception error;

        public SubmissionThread(Thread timeoutThread, NepheleMiniCluster executor, JobGraph failingJob, JobGraph job) {
            this.timeoutThread = timeoutThread;
            this.executor = executor;
            this.failingJob = failingJob;
            this.job = job;
        }

        @Override
        public void run() {
            JobClient client;
            try {
                client = this.executor.getJobClient(this.failingJob);
                client.setConsoleStreamForReporting(AbstractTestBase.getNullPrintStream());
                client.submitJobAndWait();
                this.error = new Exception("The job did not fail.");
            }
            catch (JobExecutionException jee) {
            }
            catch (Exception e) {
                this.error = e;
            }
            try {
                client = this.executor.getJobClient(this.job);
                client.setConsoleStreamForReporting(AbstractTestBase.getNullPrintStream());
                client.submitJobAndWait();
            }
            catch (Exception e) {
                this.error = e;
            }
            this.timeoutThread.interrupt();
        }
    }
}

