package org.apache.asterix.test.active;

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.CountRetryPolicyFactory;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.InfiniteRetryPolicyFactory;
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.lock.MetadataLockManager;
import org.apache.asterix.om.functions.IFunctionExtensionManager;
import org.apache.asterix.runtime.functions.FunctionCollection;
import org.apache.asterix.runtime.functions.FunctionManager;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.test.active.TestEventsListener;
import org.apache.asterix.test.base.TestMethodTracer;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/asterix/test/active/ActiveEventsListenerTest.class */
public class ActiveEventsListenerTest {
    static TestClusterControllerActor clusterController;
    static TestNodeControllerActor[] nodeControllers;
    static TestUserActor[] users;
    static ActiveNotificationHandler handler;
    static Dataset firstDataset;
    static Dataset secondDataset;
    static List<Dataset> allDatasets;
    static TestEventsListener listener;
    static IClusterStateManager clusterStateManager;
    static CcApplicationContext appCtx;
    static IStatementExecutor statementExecutor;
    static IHyracksClientConnection hcc;
    static IFunctionExtensionManager functionExtensionManager;
    static MetadataProvider metadataProvider;
    static IStorageComponentProvider componentProvider;
    static JobIdFactory jobIdFactory;
    static AlgebricksAbsolutePartitionConstraint locations;
    static ExecutorService executor;

    @Rule
    public TestRule watcher = new TestMethodTracer();
    static String[] nodes = {"node1", "node2"};
    static String dataverseName = "Default";
    static String entityName = "entityName";
    static EntityId entityId = new EntityId("Feed", dataverseName, entityName);
    static IMetadataLockManager lockManager = new MetadataLockManager();

    @Before
    public void setUp() throws Exception {
        jobIdFactory = new JobIdFactory(CcId.valueOf(0));
        handler = new ActiveNotificationHandler();
        allDatasets = new ArrayList();
        firstDataset = new Dataset(dataverseName, "firstDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0);
        secondDataset = new Dataset(dataverseName, "secondDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0);
        allDatasets.add(firstDataset);
        allDatasets.add(secondDataset);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        executor = Executors.newCachedThreadPool(runnable -> {
            return new Thread(runnable, "ClusterControllerServiceExecutor[" + atomicInteger.getAndIncrement() + "]");
        });
        clusterStateManager = (IClusterStateManager) Mockito.mock(IClusterStateManager.class);
        Mockito.when(clusterStateManager.getState()).thenReturn(IClusterManagementWork.ClusterState.ACTIVE);
        ClusterControllerService clusterControllerService = (ClusterControllerService) Mockito.mock(ClusterControllerService.class);
        CCServiceContext cCServiceContext = (CCServiceContext) Mockito.mock(CCServiceContext.class);
        appCtx = (CcApplicationContext) Mockito.mock(CcApplicationContext.class);
        statementExecutor = (IStatementExecutor) Mockito.mock(IStatementExecutor.class);
        hcc = (IHyracksClientConnection) Mockito.mock(IHyracksClientConnection.class);
        Mockito.when(appCtx.getActiveNotificationHandler()).thenReturn(handler);
        Mockito.when(appCtx.getMetadataLockManager()).thenReturn(lockManager);
        Mockito.when(appCtx.getServiceContext()).thenReturn(cCServiceContext);
        Mockito.when(appCtx.getClusterStateManager()).thenReturn(clusterStateManager);
        Mockito.when(appCtx.getActiveProperties()).thenReturn(Mockito.mock(ActiveProperties.class));
        componentProvider = new StorageComponentProvider();
        Mockito.when(appCtx.getStorageComponentProvider()).thenReturn(componentProvider);
        Mockito.when(cCServiceContext.getControllerService()).thenReturn(clusterControllerService);
        Mockito.when(clusterControllerService.getExecutor()).thenReturn(executor);
        locations = new AlgebricksAbsolutePartitionConstraint(nodes);
        functionExtensionManager = (IFunctionExtensionManager) Mockito.mock(IFunctionExtensionManager.class);
        Mockito.when(functionExtensionManager.getFunctionManager()).thenReturn(new FunctionManager(FunctionCollection.createDefaultFunctionCollection()));
        Mockito.when(appCtx.getExtensionManager()).thenReturn(functionExtensionManager);
        metadataProvider = new MetadataProvider(appCtx, (Dataverse) null);
        clusterController = new TestClusterControllerActor("CC", handler, allDatasets);
        nodeControllers = new TestNodeControllerActor[2];
        nodeControllers[0] = new TestNodeControllerActor(nodes[0], clusterController);
        nodeControllers[1] = new TestNodeControllerActor(nodes[1], clusterController);
        listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, new ArrayList(allDatasets), statementExecutor, appCtx, hcc, locations, new InfiniteRetryPolicyFactory());
        users = new TestUserActor[3];
        users[0] = newUser("Till", appCtx);
        users[1] = newUser("Mike", appCtx);
        users[2] = newUser("Dmitry", appCtx);
    }

    TestUserActor newUser(String str, CcApplicationContext ccApplicationContext) {
        return new TestUserActor("User: " + str, new MetadataProvider(ccApplicationContext, (Dataverse) null), clusterController);
    }

    @After
    public void tearDown() throws Exception {
        executor.shutdownNow();
        executor.awaitTermination(5L, TimeUnit.SECONDS);
        handler.stop();
        for (TestUserActor testUserActor : users) {
            testUserActor.stop();
        }
        for (TestNodeControllerActor testNodeControllerActor : nodeControllers) {
            testNodeControllerActor.stop();
        }
        clusterController.stop();
    }

    @Test
    public void testStartWhenStartSucceed() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        startActivity.sync();
        assertSuccess(startActivity);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testStartWhenStartFailsCompile() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        Action startActivity = users[0].startActivity(listener);
        startActivity.sync();
        assertFailure(startActivity, 0);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStartWhenStartFailsRuntime() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.FAIL_RUNTIME);
        Action startActivity = users[0].startActivity(listener);
        startActivity.sync();
        assertFailure(startActivity, 0);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStartWhenStartSucceedButTimesout() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.FAIL_START_TIMEOUT_OP_SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        startActivity.sync();
        assertSuccess(startActivity);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testStartWhenStartStuckTimesout() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.FAIL_START_TIMEOUT_STUCK);
        Action startActivity = users[0].startActivity(listener);
        startActivity.sync();
        assertFailure(startActivity, 0);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStopWhenStopTimesout() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        startActivity.sync();
        assertSuccess(startActivity);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.FAIL_STOP_TIMEOUT);
        Action stopActivity = users[0].stopActivity(listener);
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStartWhenOneNodeFinishesBeforeOtherNodeStarts() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        ActionSubscriber actionSubscriber = new ActionSubscriber();
        nodeControllers[0].subscribe(actionSubscriber);
        ActionSubscriber actionSubscriber2 = new ActionSubscriber();
        actionSubscriber2.stop();
        nodeControllers[1].subscribe(actionSubscriber2);
        Action startActivity = users[0].startActivity(listener);
        RuntimeRegistration runtimeRegistration = (RuntimeRegistration) actionSubscriber.get(0);
        runtimeRegistration.sync();
        runtimeRegistration.deregister();
        actionSubscriber.get(1).sync();
        actionSubscriber2.resume();
        ((RuntimeRegistration) actionSubscriber2.get(0)).sync();
        startActivity.sync();
        assertSuccess(startActivity);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        Action stopActivity = users[0].stopActivity(listener);
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStopWhenStopSucceed() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action stopActivity = users[0].stopActivity(listener);
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testDoubleStopWhenStopSucceed() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action stopActivity = users[0].stopActivity(listener);
        Action stopActivity2 = users[1].stopActivity(listener);
        stopActivity.sync();
        stopActivity2.sync();
        if (stopActivity.hasFailed()) {
            assertFailure(stopActivity, 3090);
            assertSuccess(stopActivity2);
        } else {
            assertSuccess(stopActivity);
            assertFailure(stopActivity2, 3090);
        }
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testDoubleStartWhenStartSucceed() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        Action startActivity2 = users[1].startActivity(listener);
        startActivity.sync();
        startActivity2.sync();
        if (startActivity.hasFailed()) {
            assertFailure(startActivity, 3089);
            assertSuccess(startActivity2);
        } else {
            assertSuccess(startActivity);
            assertFailure(startActivity2, 3089);
        }
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testStopAfterDoubleStartWhenStartSucceedAndStopSucceed() throws Exception {
        testDoubleStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action stopActivity = users[2].stopActivity(listener);
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testSuspendFromStopped() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action suspendActivity = users[0].suspendActivity(listener);
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Action resumeActivity = users[0].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
    }

    @Test
    public void testStartWhileSuspend() throws Exception {
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action suspendActivity = users[0].suspendActivity(listener);
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Action startActivity = users[1].startActivity(listener);
        for (int i = 0; i < 100; i++) {
            Assert.assertFalse(startActivity.isDone());
        }
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Action resumeActivity = users[0].resumeActivity(listener);
        resumeActivity.sync();
        startActivity.sync();
        assertSuccess(resumeActivity);
        assertSuccess(startActivity);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testSuspendFromRunning() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action suspendActivity = users[1].suspendActivity(listener);
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.SUSPENDED, listener.getState());
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testSuspendFromRunningAndStopFailThenResumeSucceeds() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.RUNNING_JOB_FAIL);
        Action suspendActivity = users[1].suspendActivity(listener);
        suspendActivity.sync();
        Assert.assertFalse(suspendActivity.hasFailed());
        Assert.assertEquals(ActivityState.TEMPORARILY_FAILED, listener.getState());
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testStopFromRunningAndJobFails() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.STEP_SUCCEED);
        Action stopActivity = users[1].stopActivity(listener);
        synchronized (listener) {
            listener.wait();
        }
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        Assert.assertNull(listener.getRecoveryTask());
        listener.allowStep();
        stopActivity.sync();
        Assert.assertFalse(stopActivity.hasFailed());
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Assert.assertNull(listener.getRecoveryTask());
    }

    @Test
    public void testRecovery() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        waitForStateSubscriber.sync();
        Assert.assertNotNull(listener.getRecoveryTask());
        listener.allowStep();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)).sync();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action stopActivity = users[2].stopActivity(listener);
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testSuspendFromRunningButJobFailWhileSuspendingThenResumeSucceed() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.RUNNING_JOB_FAIL);
        Action suspendActivity = users[1].suspendActivity(listener);
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.TEMPORARILY_FAILED, listener.getState());
        Assert.assertNull(listener.getRecoveryTask());
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testSuspendFromRunningButJobFailWhileSuspendingThenResumeFailsCompileAndRecoveryStarts() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.RUNNING_JOB_FAIL);
        Action suspendActivity = users[1].suspendActivity(listener);
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.TEMPORARILY_FAILED, listener.getState());
        Assert.assertNull(listener.getRecoveryTask());
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        ActivityState state = listener.getState();
        Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED);
        Assert.assertNotNull(listener.getRecoveryTask());
        Action stopActivity = users[1].stopActivity(listener);
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testSuspendFromRunningButJobFailWhileSuspendingThenResumeFailsRuntimeAndRecoveryStarts() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.RUNNING_JOB_FAIL);
        Action suspendActivity = users[1].suspendActivity(listener);
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.TEMPORARILY_FAILED, listener.getState());
        Assert.assertNull(listener.getRecoveryTask());
        listener.onStart(TestEventsListener.Behavior.FAIL_RUNTIME);
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        ActivityState state = listener.getState();
        Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED);
        Assert.assertNotNull(listener.getRecoveryTask());
        Action stopActivity = users[1].stopActivity(listener);
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStopWhileSuspended() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.STEP_SUCCEED);
        Action suspendActivity = users[1].suspendActivity(listener);
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)).sync();
        Action stopActivity = users[0].stopActivity(listener);
        listener.allowStep();
        listener.allowStep();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        users[1].resumeActivity(listener);
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testRecoveryFailureAfterOneAttemptCompilationFailure() throws Exception {
        handler.unregisterListener(listener);
        listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, new ArrayList(allDatasets), statementExecutor, appCtx, hcc, locations, new CountRetryPolicyFactory(1));
        testStartWhenStartSucceed();
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Compilation Failure")));
        waitForStateSubscriber.sync();
        waitForStateSubscriber2.sync();
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStartAfterPermenantFailure() throws Exception {
        testRecoveryFailureAfterOneAttemptCompilationFailure();
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        users[1].startActivity(listener);
        waitForStateSubscriber.sync();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testStopAfterStartAfterPermenantFailure() throws Exception {
        testStartAfterPermenantFailure();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
        Action stopActivity = users[1].stopActivity(listener);
        waitForStateSubscriber.sync();
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testRecoveryFailureAfterOneAttemptRuntimeFailure() throws Exception {
        handler.unregisterListener(listener);
        listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, new ArrayList(allDatasets), statementExecutor, appCtx, hcc, locations, new CountRetryPolicyFactory(1));
        testStartWhenStartSucceed();
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
        listener.onStart(TestEventsListener.Behavior.FAIL_RUNTIME);
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        waitForStateSubscriber.sync();
        waitForStateSubscriber2.sync();
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testRecoveryFailure() throws Exception {
        handler.unregisterListener(listener);
        listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, new ArrayList(allDatasets), statementExecutor, appCtx, hcc, locations, NoRetryPolicyFactory.INSTANCE);
        testStartWhenStartSucceed();
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        waitForStateSubscriber.sync();
        waitForStateSubscriber2.sync();
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStopDuringRecoveryAttemptThatSucceeds() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        WaitForStateSubscriber waitForStateSubscriber3 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action stopActivity = users[0].stopActivity(listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        waitForStateSubscriber3.sync();
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStopDuringRecoveryAttemptThatFailsCompile() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_COMPILE);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        WaitForStateSubscriber waitForStateSubscriber3 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action stopActivity = users[0].stopActivity(listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        waitForStateSubscriber3.sync();
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStopDuringRecoveryAttemptThatFailsRuntime() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_RUNTIME);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        WaitForStateSubscriber waitForStateSubscriber3 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action stopActivity = users[0].stopActivity(listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        waitForStateSubscriber3.sync();
        stopActivity.sync();
        assertSuccess(stopActivity);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
    }

    @Test
    public void testStartDuringRecoveryAttemptThatSucceeds() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        startActivity.sync();
        assertFailure(startActivity, 3089);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testStartDuringRecoveryAttemptThatFailsCompile() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_COMPILE);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        startActivity.sync();
        assertFailure(startActivity, 3089);
    }

    @Test
    public void testStartDuringRecoveryAttemptThatFailsRuntime() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_RUNTIME);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        Action startActivity = users[0].startActivity(listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        startActivity.sync();
        assertFailure(startActivity, 3089);
    }

    @Test
    public void testSuspendDuringRecoveryAttemptThatSucceedsThenResumeSucceeds() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action suspendActivity = users[1].suspendActivity(listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.SUSPENDED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testSuspendDuringRecoveryAttemptThatSucceedsThenResumeFailsCompile() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action suspendActivity = users[1].suspendActivity(listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.SUSPENDED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber3 = new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.TEMPORARILY_FAILED));
        WaitForStateSubscriber waitForStateSubscriber4 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        waitForStateSubscriber3.sync();
        waitForStateSubscriber4.sync();
        ActivityState state = listener.getState();
        Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED);
        Assert.assertNotNull(listener.getRecoveryTask());
        WaitForStateSubscriber waitForStateSubscriber5 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        waitForStateSubscriber5.sync();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testSuspendDuringRecoveryAttemptThatSucceedsThenResumeFailsRuntime() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        listener.onStop(TestEventsListener.Behavior.SUCCEED);
        Action suspendActivity = users[1].suspendActivity(listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.SUSPENDED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.FAIL_RUNTIME);
        WaitForStateSubscriber waitForStateSubscriber3 = new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.TEMPORARILY_FAILED));
        WaitForStateSubscriber waitForStateSubscriber4 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        waitForStateSubscriber3.sync();
        waitForStateSubscriber4.sync();
        ActivityState state = listener.getState();
        Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED);
        Assert.assertNotNull(listener.getRecoveryTask());
        WaitForStateSubscriber waitForStateSubscriber5 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        waitForStateSubscriber5.sync();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testSuspendDuringRecoveryAttemptThatFailsCompile() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_COMPILE);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        Assert.assertEquals(ActivityState.RECOVERING, listener.getState());
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action suspendActivity = users[1].suspendActivity(listener);
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.SUSPENDED, listener.getState());
        WaitForStateSubscriber waitForStateSubscriber3 = new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.TEMPORARILY_FAILED));
        WaitForStateSubscriber waitForStateSubscriber4 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        waitForStateSubscriber3.sync();
        waitForStateSubscriber4.sync();
        ActivityState state = listener.getState();
        Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED);
        Assert.assertNotNull(listener.getRecoveryTask());
        WaitForStateSubscriber waitForStateSubscriber5 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        waitForStateSubscriber5.sync();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testSuspendDuringRecoveryAttemptThatFailsRuntime() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_RUNTIME);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_RUNTIME);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        Assert.assertEquals(ActivityState.RECOVERING, listener.getState());
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action suspendActivity = users[1].suspendActivity(listener);
        listener.onStart(TestEventsListener.Behavior.FAIL_RUNTIME);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        Assert.assertEquals(ActivityState.SUSPENDED, listener.getState());
        WaitForStateSubscriber waitForStateSubscriber3 = new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.TEMPORARILY_FAILED));
        WaitForStateSubscriber waitForStateSubscriber4 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING));
        Action resumeActivity = users[1].resumeActivity(listener);
        resumeActivity.sync();
        assertSuccess(resumeActivity);
        waitForStateSubscriber3.sync();
        waitForStateSubscriber4.sync();
        ActivityState state = listener.getState();
        Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED);
        Assert.assertNotNull(listener.getRecoveryTask());
        WaitForStateSubscriber waitForStateSubscriber5 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        listener.onStart(TestEventsListener.Behavior.SUCCEED);
        waitForStateSubscriber5.sync();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testCreateNewDatasetDuringRecoveryAttemptThatSucceeds() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        Action addDataset = users[1].addDataset(new Dataset(dataverseName, "newDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0), listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        addDataset.sync();
        assertFailure(addDataset, 3091);
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testCreateNewDatasetDuringRecoveryAttemptThatFailsCompile() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_COMPILE);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action addDataset = users[1].addDataset(new Dataset(dataverseName, "newDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0), listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        addDataset.sync();
        assertFailure(addDataset, 3091);
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testCreateNewDatasetDuringRecoveryAttemptThatFailsRuntime() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_RUNTIME);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action addDataset = users[1].addDataset(new Dataset(dataverseName, "newDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0), listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        addDataset.sync();
        assertFailure(addDataset, 3091);
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testCreateNewDatasetWhileStarting() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.STARTING)).sync();
        Action addDataset = users[1].addDataset(new Dataset(dataverseName, "newDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0), listener);
        listener.allowStep();
        startActivity.sync();
        assertSuccess(startActivity);
        addDataset.sync();
        assertFailure(addDataset, 3091);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testCreateNewDatasetWhileRunning() throws Exception {
        testStartWhenStartSucceed();
        Action addDataset = users[1].addDataset(new Dataset(dataverseName, "newDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0), listener);
        addDataset.sync();
        assertFailure(addDataset, 3091);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testCreateNewDatasetWhileSuspended() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.STEP_SUCCEED);
        Action suspendActivity = users[1].suspendActivity(listener);
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)).sync();
        Action addDataset = users[0].addDataset(new Dataset(dataverseName, "newDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0), listener);
        listener.allowStep();
        listener.allowStep();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        users[1].resumeActivity(listener);
        addDataset.sync();
        assertFailure(addDataset, 3091);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testCreateNewDatasetWhilePermanentFailure() throws Exception {
        testRecoveryFailureAfterOneAttemptCompilationFailure();
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Action addDataset = users[0].addDataset(new Dataset(dataverseName, "newDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0), listener);
        addDataset.sync();
        assertSuccess(addDataset);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Assert.assertEquals(3L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testDeleteDatasetDuringRecoveryAttemptThatSucceeds() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        Action dropDataset = users[1].dropDataset(firstDataset, listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        dropDataset.sync();
        assertFailure(dropDataset, 3092);
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testDeleteDatasetDuringRecoveryAttemptThatFailsCompile() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_COMPILE);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action dropDataset = users[1].dropDataset(firstDataset, listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        dropDataset.sync();
        assertFailure(dropDataset, 3092);
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testDeleteDatasetDuringRecoveryAttemptThatFailsRuntime() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_RUNTIME);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action dropDataset = users[1].dropDataset(firstDataset, listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        dropDataset.sync();
        assertFailure(dropDataset, 3092);
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testDeleteDatasetWhileStarting() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.STARTING)).sync();
        Action dropDataset = users[1].dropDataset(firstDataset, listener);
        listener.allowStep();
        startActivity.sync();
        assertSuccess(startActivity);
        dropDataset.sync();
        assertFailure(dropDataset, 3092);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testDeleteDatasetWhileRunning() throws Exception {
        testStartWhenStartSucceed();
        Action dropDataset = users[1].dropDataset(firstDataset, listener);
        dropDataset.sync();
        assertFailure(dropDataset, 3092);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testDeleteDatasetWhilePermanentFailure() throws Exception {
        testRecoveryFailureAfterOneAttemptCompilationFailure();
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Action dropDataset = users[0].dropDataset(secondDataset, listener);
        dropDataset.sync();
        assertSuccess(dropDataset);
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Assert.assertEquals(1L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testDeleteDatasetWhileSuspended() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.STEP_SUCCEED);
        Action suspendActivity = users[1].suspendActivity(listener);
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)).sync();
        Action dropDataset = users[0].dropDataset(secondDataset, listener);
        listener.allowStep();
        listener.allowStep();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        users[1].resumeActivity(listener);
        dropDataset.sync();
        assertFailure(dropDataset, 3092);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        Assert.assertEquals(2L, listener.getDatasets().size());
        Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size());
    }

    @Test
    public void testCreateNewIndexDuringRecoveryAttemptThatSucceeds() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        Action addIndex = users[1].addIndex(firstDataset, listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        addIndex.sync();
        assertFailure(addIndex, 3094);
    }

    @Test
    public void testCreateNewIndexDuringRecoveryAttemptThatFailsCompile() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_COMPILE);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action addIndex = users[1].addIndex(firstDataset, listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        addIndex.sync();
        assertFailure(addIndex, 3094);
    }

    @Test
    public void testCreateNewIndexDuringRecoveryAttemptThatFailsRuntime() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_RUNTIME);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_COMPILE);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action addIndex = users[1].addIndex(firstDataset, listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        addIndex.sync();
        assertFailure(addIndex, 3094);
    }

    @Test
    public void testCreateNewIndexWhileStarting() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.STARTING)).sync();
        Action addIndex = users[1].addIndex(firstDataset, listener);
        listener.allowStep();
        startActivity.sync();
        assertSuccess(startActivity);
        addIndex.sync();
        assertFailure(addIndex, 3094);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testCreateNewIndexWhileRunning() throws Exception {
        testStartWhenStartSucceed();
        Action addIndex = users[1].addIndex(firstDataset, listener);
        addIndex.sync();
        assertFailure(addIndex, 3094);
    }

    @Test
    public void testCreateNewIndexWhilePermanentFailure() throws Exception {
        testRecoveryFailureAfterOneAttemptCompilationFailure();
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Action addIndex = users[1].addIndex(firstDataset, listener);
        addIndex.sync();
        assertSuccess(addIndex);
    }

    @Test
    public void testCreateNewIndexWhileSuspended() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.STEP_SUCCEED);
        Action suspendActivity = users[1].suspendActivity(listener);
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)).sync();
        Action addIndex = users[0].addIndex(firstDataset, listener);
        listener.allowStep();
        listener.allowStep();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        users[1].resumeActivity(listener);
        addIndex.sync();
        assertFailure(addIndex, 3094);
    }

    @Test
    public void testDeleteIndexDuringRecoveryAttemptThatSucceeds() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        waitForStateSubscriber.sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING));
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        Action dropIndex = users[1].dropIndex(firstDataset, listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        dropIndex.sync();
        assertFailure(dropIndex, 3095);
    }

    @Test
    public void testDeleteIndexDuringRecoveryAttemptThatFailsCompile() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_COMPILE);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action dropIndex = users[1].dropIndex(firstDataset, listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        dropIndex.sync();
        assertFailure(dropIndex, 3095);
    }

    @Test
    public void testDeleteIndexDuringRecoveryAttemptThatFailsRuntime() throws Exception {
        testStartWhenStartSucceed();
        listener.onStart(TestEventsListener.Behavior.FAIL_COMPILE);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, Collections.singletonList(new HyracksDataException("Runtime Failure")));
        listener.onStart(TestEventsListener.Behavior.STEP_FAIL_RUNTIME);
        waitForStateSubscriber.sync();
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)).sync();
        WaitForStateSubscriber waitForStateSubscriber2 = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
        Action dropIndex = users[1].dropIndex(firstDataset, listener);
        listener.allowStep();
        waitForStateSubscriber2.sync();
        dropIndex.sync();
        assertFailure(dropIndex, 3095);
    }

    @Test
    public void testDeleteIndexwWhileStarting() throws Exception {
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        listener.onStart(TestEventsListener.Behavior.STEP_SUCCEED);
        Action startActivity = users[0].startActivity(listener);
        new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.STARTING)).sync();
        Action dropIndex = users[1].dropIndex(firstDataset, listener);
        listener.allowStep();
        startActivity.sync();
        assertSuccess(startActivity);
        dropIndex.sync();
        assertFailure(dropIndex, 3095);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testDeleteIndexWhileRunning() throws Exception {
        testStartWhenStartSucceed();
        Action dropIndex = users[1].dropIndex(firstDataset, listener);
        dropIndex.sync();
        assertFailure(dropIndex, 3095);
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
    }

    @Test
    public void testDeleteIndexWhilePermanentFailure() throws Exception {
        testRecoveryFailureAfterOneAttemptCompilationFailure();
        Assert.assertEquals(ActivityState.STOPPED, listener.getState());
        Action dropIndex = users[1].dropIndex(firstDataset, listener);
        dropIndex.sync();
        assertSuccess(dropIndex);
    }

    @Test
    public void testDeleteIndexWhileSuspended() throws Exception {
        testStartWhenStartSucceed();
        Assert.assertEquals(ActivityState.RUNNING, listener.getState());
        listener.onStop(TestEventsListener.Behavior.STEP_SUCCEED);
        Action suspendActivity = users[1].suspendActivity(listener);
        new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)).sync();
        Action dropIndex = users[0].dropIndex(firstDataset, listener);
        listener.allowStep();
        listener.allowStep();
        suspendActivity.sync();
        assertSuccess(suspendActivity);
        users[1].resumeActivity(listener);
        dropIndex.sync();
        assertFailure(dropIndex, 3095);
    }

    @Test
    public void testSuspendedAllActivities() throws Exception {
        TestEventsListener[] testEventsListenerArr = new TestEventsListener[3];
        for (int i = 0; i < testEventsListenerArr.length; i++) {
            EntityId entityId2 = new EntityId("Feed", dataverseName, "entityName" + i);
            ClusterControllerService clusterControllerService = (ClusterControllerService) Mockito.mock(ClusterControllerService.class);
            CCServiceContext cCServiceContext = (CCServiceContext) Mockito.mock(CCServiceContext.class);
            CcApplicationContext ccApplicationContext = (CcApplicationContext) Mockito.mock(CcApplicationContext.class);
            IStatementExecutor iStatementExecutor = (IStatementExecutor) Mockito.mock(IStatementExecutor.class);
            IHyracksClientConnection iHyracksClientConnection = (IHyracksClientConnection) Mockito.mock(IHyracksClientConnection.class);
            IFunctionExtensionManager iFunctionExtensionManager = (IFunctionExtensionManager) Mockito.mock(IFunctionExtensionManager.class);
            Mockito.when(iFunctionExtensionManager.getFunctionManager()).thenReturn(new FunctionManager(FunctionCollection.createDefaultFunctionCollection()));
            Mockito.when(ccApplicationContext.getExtensionManager()).thenReturn(iFunctionExtensionManager);
            Mockito.when(ccApplicationContext.getActiveNotificationHandler()).thenReturn(handler);
            Mockito.when(ccApplicationContext.getMetadataLockManager()).thenReturn(lockManager);
            Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(cCServiceContext);
            Mockito.when(ccApplicationContext.getClusterStateManager()).thenReturn(clusterStateManager);
            Mockito.when(cCServiceContext.getControllerService()).thenReturn(clusterControllerService);
            Mockito.when(clusterControllerService.getExecutor()).thenReturn(executor);
            Mockito.when(ccApplicationContext.getStorageComponentProvider()).thenReturn(componentProvider);
            TestEventsListener testEventsListener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId2, new ArrayList(allDatasets), iStatementExecutor, ccApplicationContext, iHyracksClientConnection, new AlgebricksAbsolutePartitionConstraint(nodes), new InfiniteRetryPolicyFactory());
            listener = testEventsListener;
            testEventsListenerArr[i] = testEventsListener;
        }
        Action suspendAllActivities = users[0].suspendAllActivities(handler);
        suspendAllActivities.sync();
        assertSuccess(suspendAllActivities);
        Action query = users[1].query(firstDataset, new Semaphore(1));
        query.sync();
        assertSuccess(query);
        Action addDataset = users[1].addDataset(new Dataset(dataverseName, "newDataset", (String) null, (String) null, (String) null, (String) null, (Map) null, (IDatasetDetails) null, (Map) null, (DatasetConfig.DatasetType) null, 0, 0), listener);
        Assert.assertFalse(addDataset.isDone());
        Action resumeAllActivities = users[0].resumeAllActivities(handler);
        resumeAllActivities.sync();
        assertSuccess(resumeAllActivities);
        addDataset.sync();
        assertSuccess(addDataset);
    }

    private void assertFailure(Action action, int i) throws Exception {
        HyracksDataException failure = action.getFailure();
        try {
            Assert.assertTrue(action.hasFailed());
            Assert.assertNotNull(failure);
            Assert.assertEquals(i, failure.getErrorCode());
        } catch (Exception e) {
            throw new Exception("Expected failure: " + i + ". Found failure: " + failure);
        }
    }

    private void assertSuccess(Action action) throws Exception {
        if (action.hasFailed()) {
            System.err.println("Action failed while it was expected to succeed");
            action.getFailure().printStackTrace();
            throw action.getFailure();
        }
    }
}
