package org.apache.asterix.app.active;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.active.message.StopRuntimeParameters;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.metadata.api.IActiveEntityController;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.Span;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/app/active/ActiveEntityEventsListener.class */
public abstract class ActiveEntityEventsListener implements IActiveEntityController {
    private static final String DEFAULT_ACTIVE_STATS = "{\"Stats\":\"N/A\"}";
    protected final IClusterStateManager clusterStateManager;
    protected final ActiveNotificationHandler handler;
    protected final IStatementExecutor statementExecutor;
    protected final ICcApplicationContext appCtx;
    protected final MetadataProvider metadataProvider;
    protected final IHyracksClientConnection hcc;
    protected final EntityId entityId;
    private final List<Dataset> datasets;
    protected final ActiveEvent statsUpdatedEvent;
    protected final String runtimeName;
    protected final IRetryPolicyFactory retryPolicyFactory;
    private AlgebricksAbsolutePartitionConstraint locations;
    protected ActivityState prevState;
    protected JobId jobId;
    protected volatile RecoveryTask rt;
    protected Exception jobFailure;
    protected Exception resumeFailure;
    protected Exception startFailure;
    protected Exception stopFailure;
    protected Exception recoverFailure;
    private static final Logger LOGGER = LogManager.getLogger();
    private static final Level level = Level.DEBUG;
    private static final ActiveEvent STATE_CHANGED = new ActiveEvent((JobId) null, ActiveEvent.Kind.STATE_CHANGED, (EntityId) null, (Object) null);
    private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING, ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING, ActivityState.CANCELLING);
    protected static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS;
    protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList();
    protected volatile boolean suspended = false;
    protected volatile ActivityState state = ActivityState.STOPPED;
    protected volatile long statsTimestamp = -1;
    protected volatile boolean isFetchingStats = false;
    protected String stats = DEFAULT_ACTIVE_STATS;
    protected int numRegistered = 0;
    protected int numDeRegistered = 0;

    /* renamed from: org.apache.asterix.app.active.ActiveEntityEventsListener$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/asterix/app/active/ActiveEntityEventsListener$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$asterix$active$ActiveEvent$Kind = new int[ActiveEvent.Kind.values().length];

        static {
            try {
                $SwitchMap$org$apache$asterix$active$ActiveEvent$Kind[ActiveEvent.Kind.JOB_CREATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$asterix$active$ActiveEvent$Kind[ActiveEvent.Kind.JOB_STARTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$asterix$active$ActiveEvent$Kind[ActiveEvent.Kind.JOB_FINISHED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$asterix$active$ActiveEvent$Kind[ActiveEvent.Kind.PARTITION_EVENT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public ActiveEntityEventsListener(IStatementExecutor iStatementExecutor, ICcApplicationContext iCcApplicationContext, IHyracksClientConnection iHyracksClientConnection, EntityId entityId, List<Dataset> list, AlgebricksAbsolutePartitionConstraint algebricksAbsolutePartitionConstraint, String str, IRetryPolicyFactory iRetryPolicyFactory) throws HyracksDataException {
        this.statementExecutor = iStatementExecutor;
        this.appCtx = iCcApplicationContext;
        this.clusterStateManager = iCcApplicationContext.getClusterStateManager();
        this.metadataProvider = new MetadataProvider(iCcApplicationContext, (Dataverse) null);
        this.hcc = iHyracksClientConnection;
        this.entityId = entityId;
        this.datasets = list;
        this.retryPolicyFactory = iRetryPolicyFactory;
        this.statsUpdatedEvent = new ActiveEvent((JobId) null, ActiveEvent.Kind.STATS_UPDATED, entityId, (Object) null);
        this.runtimeName = str;
        this.locations = algebricksAbsolutePartitionConstraint;
        this.handler = (ActiveNotificationHandler) this.metadataProvider.getApplicationContext().getActiveNotificationHandler();
        this.handler.registerListener(this);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void setState(ActivityState activityState) {
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "State of " + getEntityId() + "is being set to " + activityState + " from " + this.state);
        }
        this.prevState = this.state;
        this.state = activityState;
        if (activityState == ActivityState.STARTING || activityState == ActivityState.RECOVERING || activityState == ActivityState.RESUMING) {
            this.jobFailure = null;
        } else if (activityState == ActivityState.SUSPENDED) {
            this.suspended = true;
        }
        notifySubscribers(STATE_CHANGED);
    }

    public synchronized void notify(ActiveEvent activeEvent) {
        try {
            if (LOGGER.isEnabled(level)) {
                LOGGER.log(level, "EventListener is notified.");
            }
            switch (AnonymousClass1.$SwitchMap$org$apache$asterix$active$ActiveEvent$Kind[activeEvent.getEventKind().ordinal()]) {
                case 1:
                case 2:
                    break;
                case 3:
                    finish(activeEvent);
                    break;
                case 4:
                    handle((ActivePartitionMessage) activeEvent.getEventObject());
                    break;
                default:
                    LOGGER.log(Level.DEBUG, "Unhandled feed event notification: " + activeEvent);
                    break;
            }
            notifySubscribers(activeEvent);
        } catch (Exception e) {
            LOGGER.log(Level.ERROR, "Unhandled Exception", e);
        }
    }

    protected synchronized void handle(ActivePartitionMessage activePartitionMessage) {
        if (activePartitionMessage.getEvent() != ActivePartitionMessage.Event.RUNTIME_REGISTERED) {
            if (activePartitionMessage.getEvent() == ActivePartitionMessage.Event.RUNTIME_DEREGISTERED) {
                this.numDeRegistered++;
            }
        } else {
            this.numRegistered++;
            if (allPartitionsRegisteredAndNotCancelling()) {
                setState(ActivityState.RUNNING);
            }
        }
    }

    private boolean allPartitionsRegisteredAndNotCancelling() {
        return this.numRegistered == this.locations.getLocations().length && this.state != ActivityState.CANCELLING;
    }

    protected void finish(ActiveEvent activeEvent) throws HyracksDataException {
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "the job " + this.jobId + " finished");
        }
        JobId jobId = this.jobId;
        if (this.numRegistered != this.numDeRegistered) {
            LOGGER.log(Level.WARN, "the job {} finished with reported runtime registrations = {} and deregistrations = {}", this.jobId, Integer.valueOf(this.numRegistered), Integer.valueOf(this.numDeRegistered));
        }
        this.jobId = null;
        Pair pair = (Pair) activeEvent.getEventObject();
        JobStatus jobStatus = (JobStatus) pair.getLeft();
        List list = (List) pair.getRight();
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "The job finished with status: " + jobStatus);
        }
        if (jobSuccessfullyTerminated(jobStatus)) {
            setState(this.state == ActivityState.SUSPENDING ? ActivityState.SUSPENDED : ActivityState.STOPPED);
            return;
        }
        this.jobFailure = list.isEmpty() ? new RuntimeDataException(3102, new Serializable[0]) : (Exception) list.get(0);
        LOGGER.error("Active Job {} failed", jobId, this.jobFailure);
        setState((this.state == ActivityState.STOPPING || this.state == ActivityState.CANCELLING) ? ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED);
        if (this.prevState == ActivityState.RUNNING) {
            recover();
        }
    }

    private boolean jobSuccessfullyTerminated(JobStatus jobStatus) {
        return jobStatus.equals(JobStatus.TERMINATED);
    }

    public synchronized void subscribe(IActiveEntityEventSubscriber iActiveEntityEventSubscriber) {
        iActiveEntityEventSubscriber.subscribed(this);
        if (iActiveEntityEventSubscriber.isDone()) {
            return;
        }
        this.subscribers.add(iActiveEntityEventSubscriber);
    }

    public EntityId getEntityId() {
        return this.entityId;
    }

    public ActivityState getState() {
        return this.state;
    }

    public synchronized boolean isEntityUsingDataset(IDataset iDataset) {
        return isActive() && getDatasets().contains(iDataset);
    }

    public synchronized void remove(Dataset dataset) throws HyracksDataException {
        if (isActive()) {
            throw new RuntimeDataException(3092, new Serializable[]{this.entityId, this.state});
        }
        getDatasets().remove(dataset);
    }

    public synchronized void add(Dataset dataset) throws HyracksDataException {
        if (isActive()) {
            throw new RuntimeDataException(3091, new Serializable[]{this.entityId, this.state});
        }
        getDatasets().add(dataset);
    }

    public JobId getJobId() {
        return this.jobId;
    }

    public String getStats() {
        return this.stats;
    }

    public long getStatsTimeStamp() {
        return this.statsTimestamp;
    }

    public String formatStats(List<String> list) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"Stats\": [").append(list.get(0));
        for (int i = 1; i < list.size(); i++) {
            sb.append(", ").append(list.get(i));
        }
        sb.append("]}");
        return sb.toString();
    }

    public void refreshStats(long j) throws HyracksDataException {
        LOGGER.log(level, "refreshStats called");
        synchronized (this) {
            if (this.state != ActivityState.RUNNING) {
                LOGGER.log(level, "returning immediately since state = " + this.state);
                notifySubscribers(this.statsUpdatedEvent);
                return;
            }
            if (this.isFetchingStats) {
                LOGGER.log(level, "returning immediately since fetchingStats = " + this.isFetchingStats);
                return;
            }
            this.isFetchingStats = true;
            ICCMessageBroker messageBroker = this.metadataProvider.getApplicationContext().getServiceContext().getMessageBroker();
            long newRequestId = messageBroker.newRequestId();
            ArrayList arrayList = new ArrayList();
            List asList = Arrays.asList(this.locations.getLocations());
            for (int i = 0; i < asList.size(); i++) {
                arrayList.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(this.entityId, this.runtimeName, i), newRequestId));
            }
            try {
                this.stats = formatStats((List) messageBroker.sendSyncRequestToNCs(newRequestId, asList, arrayList, j));
                this.statsTimestamp = System.currentTimeMillis();
                notifySubscribers(this.statsUpdatedEvent);
                this.isFetchingStats = false;
            } catch (Exception e) {
                throw HyracksDataException.create(e);
            }
        }
    }

    protected synchronized void notifySubscribers(ActiveEvent activeEvent) {
        notifyAll();
        Iterator<IActiveEntityEventSubscriber> it = this.subscribers.iterator();
        while (it.hasNext()) {
            IActiveEntityEventSubscriber next = it.next();
            if (next.isDone()) {
                it.remove();
            } else {
                next.notify(activeEvent);
                if (next.isDone()) {
                    it.remove();
                }
            }
        }
    }

    public AlgebricksAbsolutePartitionConstraint getLocations() {
        return this.locations;
    }

    protected synchronized void waitForNonTransitionState() throws InterruptedException {
        while (true) {
            if (!TRANSITION_STATES.contains(this.state) && !this.suspended) {
                return;
            } else {
                wait();
            }
        }
    }

    public synchronized void recover() {
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "Recover is called on " + this.entityId);
        }
        if (this.retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
            LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
            setState(ActivityState.STOPPED);
            return;
        }
        ExecutorService executor = this.appCtx.getServiceContext().getControllerService().getExecutor();
        setState(ActivityState.TEMPORARILY_FAILED);
        LOGGER.log(level, "Recovery task has been submitted");
        this.rt = new RecoveryTask(this.appCtx, this, this.retryPolicyFactory);
        executor.submit(this.rt.recover());
    }

    public synchronized void start(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
        waitForNonTransitionState();
        if (this.state != ActivityState.STOPPED) {
            throw new RuntimeDataException(3089, new Serializable[]{this.entityId, this.state});
        }
        try {
            setState(ActivityState.STARTING);
            doStart(metadataProvider);
            setRunning(metadataProvider, true);
        } catch (Exception e) {
            setState(ActivityState.STOPPED);
            LOGGER.log(Level.ERROR, "Failed to start the entity " + this.entityId, e);
            throw HyracksDataException.create(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void doStart(MetadataProvider metadataProvider) throws HyracksDataException {
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED, ActivityState.STOPPED));
        this.jobId = compileAndStartJob(metadataProvider);
        this.numRegistered = 0;
        this.numDeRegistered = 0;
        try {
            waitForStateSubscriber.sync();
            if (waitForStateSubscriber.getFailure() != null) {
                throw waitForStateSubscriber.getFailure();
            }
        } catch (InterruptedException e) {
            if (!waitForStateSubscriber.isDone()) {
                setState(ActivityState.CANCELLING);
                cancelJob(e);
                throw HyracksDataException.create(e);
            }
            if (waitForStateSubscriber.getFailure() != null) {
                throw HyracksDataException.create(waitForStateSubscriber.getFailure());
            }
            Thread.currentThread().interrupt();
        } catch (Throwable th) {
            throw HyracksDataException.create(th);
        }
    }

    private void cancelJob(Throwable th) {
        cancelJobSafely(this.metadataProvider, th);
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
        Span start = Span.start(2L, TimeUnit.MINUTES);
        InvokeUtil.doUninterruptibly(() -> {
            if (waitForStateSubscriber.sync(start)) {
                return;
            }
            ExitUtil.halt(22);
        });
    }

    protected void cancelJobSafely(MetadataProvider metadataProvider, Throwable th) {
        try {
            metadataProvider.getApplicationContext().getHcc().cancelJob(this.jobId);
        } catch (Throwable th2) {
            LOGGER.warn("Failed to cancel active job", th2);
            th.addSuppressed(th2);
        }
    }

    protected abstract JobId compileAndStartJob(MetadataProvider metadataProvider) throws HyracksDataException;

    protected synchronized void doStop(MetadataProvider metadataProvider, long j, TimeUnit timeUnit) throws HyracksDataException {
        EnumSet of;
        ActivityState activityState = this.state;
        if (activityState == ActivityState.STOPPING) {
            of = EnumSet.of(ActivityState.STOPPED);
        } else {
            if (activityState != ActivityState.SUSPENDING) {
                throw new IllegalStateException("stop with what intention?? Current state is " + activityState);
            }
            of = EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED);
        }
        WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(this, of);
        String name = Thread.currentThread().getName();
        try {
            try {
                Thread.currentThread().setName(name + " : WaitForCompletionForJobId: " + this.jobId);
                sendStopMessages(metadataProvider, j, timeUnit);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Waiting for its state to become " + of);
                }
                waitForStateSubscriber.sync();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Disconnect has been completed " + of);
                }
                Thread.currentThread().setName(name);
            } catch (InterruptedException e) {
                forceStop(waitForStateSubscriber, e);
                Thread.currentThread().interrupt();
                Thread.currentThread().setName(name);
            } catch (Throwable th) {
                forceStop(waitForStateSubscriber, th);
                Thread.currentThread().setName(name);
            }
        } catch (Throwable th2) {
            Thread.currentThread().setName(name);
            throw th2;
        }
    }

    private void forceStop(WaitForStateSubscriber waitForStateSubscriber, Throwable th) {
        if (!waitForStateSubscriber.isDone()) {
            cancelJob(th);
        }
        LOGGER.warn("Failure encountered while stopping {}", this, th);
    }

    protected void sendStopMessages(MetadataProvider metadataProvider, long j, TimeUnit timeUnit) throws Exception {
        ICCMessageBroker messageBroker = metadataProvider.getApplicationContext().getServiceContext().getMessageBroker();
        AlgebricksAbsolutePartitionConstraint locations = getLocations();
        int i = 0;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.log(Level.INFO, "Sending stop messages to " + locations);
        }
        for (String str : locations.getLocations()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.log(Level.INFO, "Sending to " + str);
            }
            int i2 = i;
            i++;
            messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, new StopRuntimeParameters(getActiveRuntimeId(i2), j, timeUnit)), str);
        }
    }

    protected abstract ActiveRuntimeId getActiveRuntimeId(int i);

    protected abstract void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException;

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void doResume(MetadataProvider metadataProvider) throws HyracksDataException;

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void setRunning(MetadataProvider metadataProvider, boolean z);

    public final synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
        waitForNonTransitionState();
        if (this.state != ActivityState.RUNNING && this.state != ActivityState.TEMPORARILY_FAILED) {
            throw new RuntimeDataException(3090, new Serializable[]{this.entityId, this.state});
        }
        if (this.state == ActivityState.TEMPORARILY_FAILED) {
            if (this.rt != null) {
                setState(ActivityState.STOPPING);
                this.rt.cancel();
                this.rt = null;
            }
            setState(ActivityState.STOPPED);
            try {
                setRunning(metadataProvider, false);
            } catch (Exception e) {
                LOGGER.log(Level.ERROR, "Failed to set the entity state as not running " + this.entityId, e);
                throw HyracksDataException.create(e);
            }
        } else {
            if (this.state != ActivityState.RUNNING) {
                throw new RuntimeDataException(3090, new Serializable[]{this.entityId, this.state});
            }
            setState(ActivityState.STOPPING);
            try {
                try {
                    doStop(metadataProvider, this.appCtx.getActiveProperties().getActiveStopTimeout(), TIMEOUT_UNIT);
                    setRunning(metadataProvider, false);
                } catch (Exception e2) {
                    setState(ActivityState.STOPPED);
                    LOGGER.log(Level.ERROR, "Failed to stop the entity " + this.entityId, e2);
                    throw HyracksDataException.create(e2);
                }
            } catch (Throwable th) {
                setRunning(metadataProvider, false);
                throw th;
            }
        }
        this.stats = DEFAULT_ACTIVE_STATS;
        notifySubscribers(this.statsUpdatedEvent);
    }

    public RecoveryTask getRecoveryTask() {
        return this.rt;
    }

    public void suspend(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException {
        synchronized (this) {
            if (LOGGER.isEnabled(level)) {
                LOGGER.log(level, "suspending entity " + this.entityId);
                LOGGER.log(level, "Waiting for ongoing activities");
            }
            waitForNonTransitionState();
            if (LOGGER.isEnabled(level)) {
                LOGGER.log(level, "Proceeding with suspension. Current state is " + this.state);
            }
            if (this.state == ActivityState.STOPPED) {
                this.suspended = true;
                return;
            }
            if (this.state == ActivityState.SUSPENDED) {
                throw new RuntimeDataException(3103, new Serializable[]{this.entityId, this.state});
            }
            if (this.state == ActivityState.TEMPORARILY_FAILED) {
                this.suspended = true;
                setState(ActivityState.SUSPENDED);
                return;
            }
            setState(ActivityState.SUSPENDING);
            WaitForStateSubscriber waitForStateSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED));
            Future submit = metadataProvider.getApplicationContext().getServiceContext().getControllerService().getExecutor().submit(() -> {
                doSuspend(metadataProvider);
                return null;
            });
            LOGGER.log(level, "Suspension task has been submitted");
            try {
                LOGGER.log(level, "Waiting for suspension task to complete");
                submit.get();
                LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED");
                waitForStateSubscriber.sync();
            } catch (Exception e) {
                synchronized (this) {
                    if (LOGGER.isErrorEnabled()) {
                        LOGGER.log(Level.ERROR, "Failure while waiting for " + this.entityId + " to become suspended", e);
                    }
                    if (this.state == ActivityState.SUSPENDING) {
                        if (this.jobId != null) {
                            setState(this.prevState);
                        } else {
                            setState(ActivityState.STOPPED);
                        }
                    }
                    throw HyracksDataException.create(e);
                }
            }
        }
    }

    public synchronized void resume(MetadataProvider metadataProvider) throws HyracksDataException {
        if (this.state == ActivityState.STOPPED) {
            return;
        }
        if (this.state != ActivityState.SUSPENDED && this.state != ActivityState.TEMPORARILY_FAILED) {
            throw new RuntimeDataException(3104, new Serializable[]{this.entityId, this.state});
        }
        try {
            if (this.prevState == ActivityState.TEMPORARILY_FAILED) {
                setState(ActivityState.TEMPORARILY_FAILED);
                return;
            }
            setState(ActivityState.RESUMING);
            this.rt = new RecoveryTask(this.appCtx, this, this.retryPolicyFactory);
            try {
                this.rt.resumeOrRecover(metadataProvider);
            } catch (Exception e) {
                LOGGER.log(Level.WARN, "Failure while attempting to resume " + this.entityId, e);
            }
        } finally {
            this.suspended = false;
            notifyAll();
        }
    }

    public boolean isActive() {
        return (this.state == ActivityState.STOPPED || this.state == ActivityState.CANCELLING) ? false : true;
    }

    public void unregister() throws HyracksDataException {
        this.handler.unregisterListener(this);
    }

    public void setLocations(AlgebricksAbsolutePartitionConstraint algebricksAbsolutePartitionConstraint) {
        this.locations = algebricksAbsolutePartitionConstraint;
    }

    public Exception getJobFailure() {
        return this.jobFailure;
    }

    public List<Dataset> getDatasets() {
        return this.datasets;
    }

    public synchronized void replace(Dataset dataset) {
        if (getDatasets().contains(dataset)) {
            getDatasets().remove(dataset);
            getDatasets().add(dataset);
        }
    }

    public String getDisplayName() throws HyracksDataException {
        return getEntityId().toString();
    }

    public String toString() {
        return "{\"class\":\"" + getClass().getSimpleName() + "\"\"entityId\":\"" + this.entityId + "\"\"state\":\"" + this.state + "\"}";
    }
}
