package org.apache.asterix.app.active;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveNotificationHandler;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.util.SingleThreadEventProcessor;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/app/active/ActiveNotificationHandler.class */
public class ActiveNotificationHandler extends SingleThreadEventProcessor<ActiveEvent> implements IActiveNotificationHandler, IJobLifecycleListener {
    private static final Logger LOGGER = LogManager.getLogger();
    private static final Level level = Level.DEBUG;
    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
    private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners;
    private final Map<JobId, EntityId> jobId2EntityId;
    private boolean initialized;
    private boolean suspended;

    public ActiveNotificationHandler() {
        super(ActiveNotificationHandler.class.getSimpleName());
        this.initialized = false;
        this.suspended = false;
        this.jobId2EntityId = new HashMap();
        this.entityEventListeners = new HashMap();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handle(ActiveEvent activeEvent) {
        EntityId entityId = this.jobId2EntityId.get(activeEvent.getJobId());
        if (entityId == null) {
            LOGGER.log(Level.ERROR, "Entity not found for received message for job " + activeEvent.getJobId());
            return;
        }
        IActiveEntityEventsListener iActiveEntityEventsListener = this.entityEventListeners.get(entityId);
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "Next event is of type " + activeEvent.getEventKind());
        }
        if (activeEvent.getEventKind() == ActiveEvent.Kind.JOB_FINISHED) {
            LOGGER.log(level, "Removing the job");
            this.jobId2EntityId.remove(activeEvent.getJobId());
        }
        if (iActiveEntityEventsListener != null) {
            LOGGER.log(level, "Notifying the listener");
            iActiveEntityEventsListener.notify(activeEvent);
        }
    }

    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId);
        }
        Serializable property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
        if (!(property instanceof EntityId)) {
            if (LOGGER.isEnabled(level)) {
                LOGGER.log(level, "Job is not of type active job. property found to be: " + property);
            }
        } else {
            EntityId entityId = (EntityId) property;
            monitorJob(jobId, entityId);
            LOGGER.log(level, "Job was found to be: " + (this.jobId2EntityId.get(jobId) != null ? "Active" : "Inactive"));
            add(new ActiveEvent(jobId, ActiveEvent.Kind.JOB_CREATED, entityId, jobSpecification));
        }
    }

    private synchronized void monitorJob(JobId jobId, EntityId entityId) {
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
        }
        boolean z = this.jobId2EntityId.get(jobId) != null;
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "Job was found to be: " + (z ? "Active" : "Inactive"));
        }
        if (this.entityEventListeners.containsKey(entityId)) {
            if (this.jobId2EntityId.containsKey(jobId)) {
                if (LOGGER.isErrorEnabled()) {
                    LOGGER.error("Job is already being monitored for job: " + jobId);
                    return;
                }
                return;
            } else if (LOGGER.isEnabled(level)) {
                LOGGER.log(level, "monitoring started for job id: " + jobId);
            }
        } else if (LOGGER.isEnabled(level)) {
            LOGGER.info("No listener was found for the entity: " + entityId);
        }
        this.jobId2EntityId.put(jobId, entityId);
    }

    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
        EntityId entityId = this.jobId2EntityId.get(jobId);
        if (entityId != null) {
            add(new ActiveEvent(jobId, ActiveEvent.Kind.JOB_STARTED, entityId, (Object) null));
        }
    }

    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> list) throws HyracksException {
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "Getting notified of job finish for JobId: " + jobId);
        }
        EntityId entityId = this.jobId2EntityId.get(jobId);
        if (entityId != null) {
            add(new ActiveEvent(jobId, ActiveEvent.Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, list)));
        } else if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "no need to notify job finish");
        }
    }

    public void receive(ActivePartitionMessage activePartitionMessage) {
        add(new ActiveEvent(activePartitionMessage.getJobId(), ActiveEvent.Kind.PARTITION_EVENT, activePartitionMessage.getActiveRuntimeId().getEntityId(), activePartitionMessage));
    }

    public IActiveEntityEventsListener getListener(EntityId entityId) {
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
        }
        IActiveEntityEventsListener iActiveEntityEventsListener = this.entityEventListeners.get(entityId);
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "Listener found: " + iActiveEntityEventsListener);
        }
        return this.entityEventListeners.get(entityId);
    }

    public synchronized IActiveEntityEventsListener[] getEventListeners() {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("getEventListeners() was called");
            LOGGER.trace("returning " + this.entityEventListeners.size() + " Listeners");
        }
        return (IActiveEntityEventsListener[]) this.entityEventListeners.values().toArray(new IActiveEntityEventsListener[this.entityEventListeners.size()]);
    }

    public synchronized void registerListener(IActiveEntityEventsListener iActiveEntityEventsListener) throws HyracksDataException {
        if (this.suspended) {
            throw new RuntimeDataException(3096, new Serializable[0]);
        }
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "registerListener(IActiveEntityEventsListener listener) was called for the entity " + iActiveEntityEventsListener.getEntityId());
        }
        if (this.entityEventListeners.containsKey(iActiveEntityEventsListener.getEntityId())) {
            throw new RuntimeDataException(3093, new Serializable[]{iActiveEntityEventsListener.getEntityId()});
        }
        this.entityEventListeners.put(iActiveEntityEventsListener.getEntityId(), iActiveEntityEventsListener);
    }

    public synchronized void unregisterListener(IActiveEntityEventsListener iActiveEntityEventsListener) throws HyracksDataException {
        if (this.suspended) {
            throw new RuntimeDataException(3096, new Serializable[0]);
        }
        if (LOGGER.isEnabled(level)) {
            LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity " + iActiveEntityEventsListener.getEntityId());
        }
        IActiveEntityEventsListener remove = this.entityEventListeners.remove(iActiveEntityEventsListener.getEntityId());
        if (remove == null) {
            throw new RuntimeDataException(3097, new Serializable[]{iActiveEntityEventsListener.getEntityId()});
        }
        if (remove.isActive()) {
            this.entityEventListeners.put(remove.getEntityId(), remove);
            throw new RuntimeDataException(3098, new Serializable[]{iActiveEntityEventsListener.getEntityId()});
        }
    }

    public boolean isInitialized() {
        return this.initialized;
    }

    public void setInitialized(boolean z) throws HyracksDataException {
        if (this.initialized) {
            throw new RuntimeDataException(3099, new Serializable[0]);
        }
        this.initialized = z;
    }

    public void recover() {
        LOGGER.info("Starting active recovery");
        for (IActiveEntityEventsListener iActiveEntityEventsListener : getEventListeners()) {
            synchronized (iActiveEntityEventsListener) {
                if (LOGGER.isEnabled(level)) {
                    LOGGER.log(level, "Entity " + iActiveEntityEventsListener.getEntityId() + " is " + iActiveEntityEventsListener.getState());
                }
                iActiveEntityEventsListener.notifyAll();
            }
        }
    }

    public void suspend(MetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, InterruptedException {
        synchronized (this) {
            if (this.suspended) {
                throw new RuntimeDataException(3107, new Serializable[0]);
            }
            LOGGER.log(level, "Suspending active events handler");
            this.suspended = true;
        }
        try {
            IMetadataLockManager metadataLockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
            Iterator<IActiveEntityEventsListener> it = this.entityEventListeners.values().iterator();
            while (it.hasNext()) {
                ActiveEntityEventsListener activeEntityEventsListener = (IActiveEntityEventsListener) it.next();
                String dataverse = activeEntityEventsListener.getEntityId().getDataverse();
                String entityName = activeEntityEventsListener.getEntityId().getEntityName();
                if (LOGGER.isEnabled(level)) {
                    LOGGER.log(level, "Suspending " + activeEntityEventsListener.getEntityId());
                }
                LOGGER.log(level, "Acquiring locks");
                metadataLockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverse + '.' + entityName);
                Iterator<Dataset> it2 = activeEntityEventsListener.getDatasets().iterator();
                while (it2.hasNext()) {
                    metadataLockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), DatasetUtil.getFullyQualifiedName(it2.next()));
                }
                LOGGER.log(level, "locks acquired");
                activeEntityEventsListener.suspend(metadataProvider);
                if (LOGGER.isEnabled(level)) {
                    LOGGER.log(level, activeEntityEventsListener.getEntityId() + " suspended");
                }
            }
        } catch (Throwable th) {
            LOGGER.error("Suspend active failed", th);
            ExitUtil.halt(17);
        }
    }

    public void resume(MetadataProvider metadataProvider) throws HyracksDataException {
        LOGGER.log(level, "Resuming active events handler");
        try {
            Iterator<IActiveEntityEventsListener> it = this.entityEventListeners.values().iterator();
            while (it.hasNext()) {
                ActiveEntityEventsListener activeEntityEventsListener = (IActiveEntityEventsListener) it.next();
                if (LOGGER.isEnabled(level)) {
                    LOGGER.log(level, "Resuming " + activeEntityEventsListener.getEntityId());
                }
                activeEntityEventsListener.resume(metadataProvider);
                if (LOGGER.isEnabled(level)) {
                    LOGGER.log(level, activeEntityEventsListener.getEntityId() + " resumed");
                }
            }
        } catch (Throwable th) {
            LOGGER.error("Resume active failed", th);
            ExitUtil.halt(18);
        }
        synchronized (this) {
            this.suspended = false;
        }
    }
}
