package org.apache.asterix.external.feed.management;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveJob;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.feed.api.FeedOperationCounter;
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
import org.apache.asterix.external.feed.api.IFeedJoint;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobInfo;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/external/feed/management/FeedEventsListener.class */
public class FeedEventsListener implements IActiveEntityEventsListener {
    private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class);
    /** Identity this listener was constructed with; assigned once in the constructor. */
    private final EntityId entityId;
    /** Intake joint picked by the most recent {@code registerFeedIntakeJob} call ({@code null} until one is found). */
    private IFeedJoint sourceFeedJoint;
    /** Subscribers that receive every lifecycle event raised by this listener. */
    private final List<IActiveLifecycleEventSubscriber> subscribers = new ArrayList<>();
    /** Every registered job (intake and connect), keyed by the numeric job id. */
    private final Map<Long, ActiveJob> jobs = new HashMap<>();
    /** Per-feed pipeline state: an operation counter paired with the feed joints registered for that feed. */
    private final Map<EntityId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline = new HashMap<>();
    /** Intake job info keyed by feed id. */
    private final Map<EntityId, FeedIntakeInfo> entity2Intake = new HashMap<>();
    /** Connect (collect) job info keyed by feed connection id. */
    private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos = new HashMap<>();
    /** Intake jobs only, keyed by the numeric job id. */
    private final Map<Long, ActiveJob> intakeJobs = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.asterix.external.feed.management.FeedEventsListener$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/asterix/external/feed/management/FeedEventsListener$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$asterix$active$ActiveEvent$EventKind;

        static {
            try {
                $SwitchMap$org$apache$asterix$external$feed$api$IFeedJoint$FeedJointType[IFeedJoint.FeedJointType.COMPUTE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$asterix$external$feed$api$IFeedJoint$FeedJointType[IFeedJoint.FeedJointType.INTAKE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$org$apache$asterix$external$util$FeedUtils$JobType = new int[FeedUtils.JobType.values().length];
            try {
                $SwitchMap$org$apache$asterix$external$util$FeedUtils$JobType[FeedUtils.JobType.INTAKE.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$asterix$external$util$FeedUtils$JobType[FeedUtils.JobType.FEED_CONNECT.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            $SwitchMap$org$apache$asterix$active$ActiveEvent$EventKind = new int[ActiveEvent.EventKind.values().length];
            try {
                $SwitchMap$org$apache$asterix$active$ActiveEvent$EventKind[ActiveEvent.EventKind.JOB_START.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$asterix$active$ActiveEvent$EventKind[ActiveEvent.EventKind.JOB_FINISH.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$asterix$active$ActiveEvent$EventKind[ActiveEvent.EventKind.PARTITION_EVENT.ordinal()] = 3;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /**
     * Creates a listener with empty job, pipeline and subscriber registries.
     *
     * @param entityId the entity id later returned by {@link #getEntityId()}
     */
    public FeedEventsListener(EntityId entityId) {
        this.entityId = entityId;
    }

    /**
     * Dispatches an active event to the handler matching its kind. Kinds other than
     * JOB_START, JOB_FINISH and PARTITION_EVENT are logged as a warning. Any exception raised
     * while handling the event is logged and not propagated.
     *
     * @param activeEvent the event to handle
     */
    public void notify(ActiveEvent activeEvent) {
        try {
            switch (activeEvent.getEventKind()) {
                case JOB_START:
                    handleJobStartEvent(activeEvent);
                    return;
                case JOB_FINISH:
                    handleJobFinishEvent(activeEvent);
                    return;
                case PARTITION_EVENT:
                    handlePartitionStart(activeEvent);
                    return;
                default:
                    LOGGER.warn("Unknown Feed Event" + activeEvent);
            }
        } catch (Exception e) {
            LOGGER.error("Unhandled Exception", e);
        }
    }

    /**
     * Handles a job-start event by routing it to the intake or collect start handler,
     * depending on the job type stored on the registered job.
     *
     * Events for a job id that is not in {@code jobs} are logged and ignored instead of failing
     * with a NullPointerException (registration can be declined, see
     * {@code registerFeedCollectionJob}).
     *
     * @param activeEvent the job-start event
     * @throws Exception whatever the delegated start handler throws
     */
    private synchronized void handleJobStartEvent(ActiveEvent activeEvent) throws Exception {
        ActiveJob activeJob = this.jobs.get(Long.valueOf(activeEvent.getJobId().getId()));
        if (activeJob == null) {
            LOGGER.warn("Ignoring job start event for unregistered job [" + activeEvent.getJobId() + "]");
            return;
        }
        switch ((FeedUtils.JobType) activeJob.getJobObject()) {
            case INTAKE:
                handleIntakeJobStartMessage((FeedIntakeInfo) activeJob);
                return;
            case FEED_CONNECT:
                handleCollectJobStartMessage((FeedConnectJobInfo) activeJob);
                return;
            default:
                return;
        }
    }

    /**
     * Handles a job-finish event by routing it to the intake or collect finish handler,
     * depending on the job type stored on the registered job.
     *
     * Events for a job id that is not in {@code jobs} are logged and ignored instead of failing
     * with a NullPointerException.
     *
     * @param activeEvent the job-finish event
     * @throws Exception whatever the delegated finish handler throws
     */
    private synchronized void handleJobFinishEvent(ActiveEvent activeEvent) throws Exception {
        ActiveJob activeJob = this.jobs.get(Long.valueOf(activeEvent.getJobId().getId()));
        if (activeJob == null) {
            LOGGER.warn("Ignoring job finish event for unregistered job [" + activeEvent.getJobId() + "]");
            return;
        }
        switch ((FeedUtils.JobType) activeJob.getJobObject()) {
            case INTAKE:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Intake Job finished for feed intake " + activeJob.getJobId());
                }
                handleFeedIntakeJobFinishMessage((FeedIntakeInfo) activeJob, activeEvent);
                return;
            case FEED_CONNECT:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Collect Job finished for  " + activeJob);
                }
                handleFeedCollectJobFinishMessage((FeedConnectJobInfo) activeJob);
                return;
            default:
                return;
        }
    }

    /**
     * Handles a partition event. For an intake job the event is forwarded to
     * {@code handleIntakePartitionStarts}; for a connect job the partition start is recorded and,
     * once the job reports that collection has started, subscribers receive
     * FEED_COLLECT_STARTED.
     *
     * Events for a job id that is not in {@code jobs} are logged and ignored instead of failing
     * with a NullPointerException.
     *
     * @param activeEvent the partition event
     */
    private synchronized void handlePartitionStart(ActiveEvent activeEvent) {
        ActiveJob activeJob = this.jobs.get(Long.valueOf(activeEvent.getJobId().getId()));
        if (activeJob == null) {
            LOGGER.warn("Ignoring partition event for unregistered job [" + activeEvent.getJobId() + "]");
            return;
        }
        switch ((FeedUtils.JobType) activeJob.getJobObject()) {
            case INTAKE:
                handleIntakePartitionStarts(activeEvent, activeJob);
                return;
            case FEED_CONNECT:
                FeedConnectJobInfo connectJob = (FeedConnectJobInfo) activeJob;
                connectJob.partitionStart();
                if (connectJob.collectionStarted()) {
                    notifyFeedEventSubscribers(IActiveLifecycleEventSubscriber.ActiveLifecycleEvent.FEED_COLLECT_STARTED);
                }
                return;
            default:
                return;
        }
    }

    /**
     * Decrements the operation counter of the event's feed; when it reaches zero the intake
     * joint and the job are both marked ACTIVE and subscribers receive FEED_INTAKE_STARTED.
     *
     * @param activeEvent the partition event; its entity id selects the pipeline entry
     * @param activeJob   the intake job (cast to {@code FeedIntakeInfo} once the counter hits zero)
     */
    private void handleIntakePartitionStarts(ActiveEvent activeEvent, ActiveJob activeJob) {
        FeedOperationCounter pendingPartitions = this.feedPipeline.get(activeEvent.getEntityId()).first;
        if (pendingPartitions.decrementAndGet() != 0) {
            // still waiting for further partitions
            return;
        }
        FeedIntakeInfo intakeInfo = (FeedIntakeInfo) activeJob;
        intakeInfo.getIntakeFeedJoint().setState(IFeedJoint.State.ACTIVE);
        activeJob.setState(ActivityState.ACTIVE);
        notifyFeedEventSubscribers(IActiveLifecycleEventSubscriber.ActiveLifecycleEvent.FEED_INTAKE_STARTED);
    }

    /**
     * Adds a feed joint to the pipeline of its owning feed, creating the pipeline entry (with
     * a fresh operation counter initialised from {@code i}) when the feed has none yet.
     *
     * @param iFeedJoint the joint to register
     * @param i          initial value handed to the new {@code FeedOperationCounter}; only used
     *                   when the feed has no pipeline entry yet
     * @throws HyracksDataException (error 3020) when the joint is already registered for its feed
     */
    public synchronized void registerFeedJoint(IFeedJoint iFeedJoint, int i) throws HyracksDataException {
        EntityId ownerFeedId = iFeedJoint.getOwnerFeedId();
        Pair<FeedOperationCounter, List<IFeedJoint>> existing = this.feedPipeline.get(ownerFeedId);
        if (existing != null) {
            List<IFeedJoint> registered = existing.second;
            if (registered.contains(iFeedJoint)) {
                throw new RuntimeDataException(3020, new Serializable[]{iFeedJoint});
            }
            registered.add(iFeedJoint);
            return;
        }
        List<IFeedJoint> joints = new ArrayList<>();
        this.feedPipeline.put(ownerFeedId, new Pair<>(new FeedOperationCounter(i), joints));
        joints.add(iFeedJoint);
    }

    /**
     * Removes an intake job from all registries: {@code intakeJobs}, {@code jobs},
     * {@code entity2Intake}, and the joint list of the feed's pipeline entry.
     *
     * A job id that was never registered as an intake job is logged and otherwise ignored, and a
     * feed whose pipeline entry is already gone is tolerated; both cases previously ended in a
     * NullPointerException after the maps had been partly modified.
     *
     * @param jobId id of the intake job to deregister
     */
    public synchronized void deregisterFeedIntakeJob(JobId jobId) {
        Long jobKey = Long.valueOf(jobId.getId());
        FeedIntakeInfo feedIntakeInfo = (FeedIntakeInfo) this.intakeJobs.remove(jobKey);
        this.jobs.remove(jobKey);
        if (feedIntakeInfo == null) {
            LOGGER.warn("No feed intake job registered for [" + jobId + "]; nothing to deregister");
            return;
        }
        this.entity2Intake.remove(feedIntakeInfo.getFeedId());
        Pair<FeedOperationCounter, List<IFeedJoint>> pipeline = this.feedPipeline.get(feedIntakeInfo.getFeedId());
        if (pipeline != null) {
            pipeline.second.remove(feedIntakeInfo.getIntakeFeedJoint());
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Deregistered feed intake job [" + jobId + "]");
        }
    }

    /**
     * Resolves where the intake operators of a started intake job run and stores the result
     * on the job info. For every {@code FeedIntakeOperatorDescriptor} in the job specification,
     * the operator's location map is read from the job info and its values for keys
     * {@code 0 .. size-1} are appended in order.
     *
     * @param feedIntakeInfo the intake job whose locations are resolved and updated
     * @throws Exception if the job info cannot be fetched, or an intake operator has no
     *                   location map (NullPointerException)
     */
    private static synchronized void handleIntakeJobStartMessage(FeedIntakeInfo feedIntakeInfo) throws Exception {
        List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<>();
        for (Object candidate : feedIntakeInfo.getSpec().getOperatorMap().values()) {
            IOperatorDescriptor operator = (IOperatorDescriptor) candidate;
            if (operator instanceof FeedIntakeOperatorDescriptor) {
                intakeOperatorIds.add(operator.getOperatorId());
            }
        }
        JobInfo jobInfo = AppContextInfo.INSTANCE.getHcc().getJobInfo(feedIntakeInfo.getJobId());
        // Kept raw: the element type of the location maps is not visible from this file.
        List intakeLocations = new ArrayList();
        for (OperatorDescriptorId operatorId : intakeOperatorIds) {
            Map partitionLocations = (Map) jobInfo.getOperatorLocations().get(operatorId);
            int partitionCount = partitionLocations.size();
            for (int partition = 0; partition < partitionCount; partition++) {
                intakeLocations.add(partitionLocations.get(Integer.valueOf(partition)));
            }
        }
        feedIntakeInfo.setIntakeLocation(intakeLocations);
    }

    /**
     * Looks up the source feed joint recorded for a registered connection.
     *
     * @param feedConnectionId the connection to look up
     * @return the connection's source feed joint, or {@code null} when the connection is not registered
     */
    public IFeedJoint getSourceFeedJoint(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = this.connectJobInfos.get(feedConnectionId);
        return connectInfo == null ? null : connectInfo.getSourceFeedJoint();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Registers an intake job for a feed. The feed's pipeline is searched for the first joint
     * of type INTAKE, which becomes this listener's source feed joint; a new
     * {@code FeedIntakeInfo} in state CREATED is then recorded in {@code entity2Intake},
     * {@code jobs} and {@code intakeJobs}, and attached to the feed's operation counter.
     *
     * A feed without any pipeline entry is treated the same as a feed without an INTAKE joint
     * (error 3021) rather than failing with a NullPointerException.
     *
     * @param entityId         the feed the intake job belongs to
     * @param jobId            id of the intake job
     * @param jobSpecification specification of the intake job
     * @throws HyracksDataException error 3035 if the feed already has an intake job, 3036 if the
     *                              job id is already a known intake job, 3037 if the job id is
     *                              already registered, 3021 if no INTAKE joint is registered for
     *                              the feed
     */
    public synchronized void registerFeedIntakeJob(EntityId entityId, JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
        if (this.entity2Intake.get(entityId) != null) {
            throw new RuntimeDataException(3035, new Serializable[0]);
        }
        Long jobKey = Long.valueOf(jobId.getId());
        if (this.intakeJobs.get(jobKey) != null) {
            throw new RuntimeDataException(3036, new Serializable[0]);
        }
        if (this.jobs.get(jobKey) != null) {
            throw new RuntimeDataException(3037, new Serializable[0]);
        }
        Pair<FeedOperationCounter, List<IFeedJoint>> pair = this.feedPipeline.get(entityId);
        this.sourceFeedJoint = null;
        if (pair != null) {
            for (IFeedJoint candidate : pair.second) {
                if (candidate.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
                    this.sourceFeedJoint = candidate;
                    break;
                }
            }
        }
        if (this.sourceFeedJoint == null) {
            // Covers both "no pipeline entry for this feed" and "pipeline has no INTAKE joint".
            throw new RuntimeDataException(3021, new Serializable[]{jobId, entityId});
        }
        FeedIntakeInfo feedIntakeInfo = new FeedIntakeInfo(jobId, ActivityState.CREATED, entityId, this.sourceFeedJoint, jobSpecification);
        pair.first.setFeedJobInfo(feedIntakeInfo);
        this.entity2Intake.put(entityId, feedIntakeInfo);
        this.jobs.put(jobKey, feedIntakeInfo);
        this.intakeJobs.put(jobKey, feedIntakeInfo);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Registered feed intake [" + jobId + "] for feed " + entityId);
        }
    }

    /**
     * Registers a collect (connect) job for a feed connection. The feed's pipeline is searched
     * for the first joint that reports a receiver for the connection; that joint becomes the
     * source joint of a new {@code FeedConnectJobInfo} in state CREATED, which is recorded in
     * {@code jobs} and {@code connectJobInfos}. When no such joint exists (including when the feed
     * has no pipeline entry at all) a warning is logged and nothing is registered.
     *
     * The duplicate check on {@code connectJobInfos} uses the connection id, which is the key
     * type of that map. It was previously probed with the boxed job id and so could never match.
     *
     * @param entityId         the source feed
     * @param feedConnectionId the connection being established
     * @param jobId            id of the collect job
     * @param jobSpecification specification of the collect job
     * @param map              feed policy properties passed through to the job info
     * @throws HyracksDataException error 3037 if the job id or the connection is already registered
     */
    public synchronized void registerFeedCollectionJob(EntityId entityId, FeedConnectionId feedConnectionId, JobId jobId, JobSpecification jobSpecification, Map<String, String> map) throws HyracksDataException {
        Long jobKey = Long.valueOf(jobId.getId());
        if (this.jobs.get(jobKey) != null) {
            throw new RuntimeDataException(3037, new Serializable[0]);
        }
        if (this.connectJobInfos.containsKey(feedConnectionId)) {
            throw new RuntimeDataException(3037, new Serializable[0]);
        }
        Pair<FeedOperationCounter, List<IFeedJoint>> pipeline = this.feedPipeline.get(entityId);
        IFeedJoint sourceJoint = null;
        if (pipeline != null) {
            for (IFeedJoint candidate : pipeline.second) {
                if (candidate.getReceiver(feedConnectionId) != null) {
                    sourceJoint = candidate;
                    break;
                }
            }
        }
        if (sourceJoint == null) {
            LOGGER.warn("Could not register feed collection job [" + jobId + "] for feed connection " + feedConnectionId);
            return;
        }
        FeedConnectJobInfo feedConnectJobInfo = new FeedConnectJobInfo(entityId, jobId, ActivityState.CREATED, feedConnectionId, sourceJoint, null, jobSpecification, map);
        this.jobs.put(jobKey, feedConnectJobInfo);
        this.connectJobInfos.put(feedConnectionId, feedConnectJobInfo);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Registered feed connection [" + jobId + "] for feed " + feedConnectionId);
        }
    }

    /**
     * Inspects a newly created job and registers it as a collect job or an intake job,
     * depending on the first feed collect or feed intake operator found in its specification.
     * Jobs containing neither operator are ignored. Registration failures are logged (with
     * stack trace) and not propagated.
     *
     * @param jobId            id of the created job
     * @param jobSpecification specification of the created job
     */
    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) {
        try {
            for (IOperatorDescriptor iOperatorDescriptor : jobSpecification.getOperatorMap().values()) {
                if (iOperatorDescriptor instanceof FeedCollectOperatorDescriptor) {
                    FeedCollectOperatorDescriptor collectOp = (FeedCollectOperatorDescriptor) iOperatorDescriptor;
                    registerFeedCollectionJob(collectOp.getSourceFeedId(), collectOp.getFeedConnectionId(), jobId, jobSpecification, collectOp.getFeedPolicyProperties());
                    return;
                } else if (iOperatorDescriptor instanceof FeedIntakeOperatorDescriptor) {
                    registerFeedIntakeJob(((FeedIntakeOperatorDescriptor) iOperatorDescriptor).getFeedId(), jobId, jobSpecification);
                    return;
                }
            }
        } catch (Exception e) {
            // error(Object) alone would log only e.toString(); pass the throwable so the stack trace is kept.
            LOGGER.error("Failed to register feed job [" + jobId + "]", e);
        }
    }

    /**
     * Returns the locations associated with a feed joint: the compute locations of the
     * joint's provider connection for a COMPUTE joint, or the intake locations of the joint's
     * owner feed for an INTAKE joint.
     *
     * @param iFeedJoint            the joint whose locations are requested
     * @param feedConnectionRequest not used by this method
     * @return the locations, or {@code null} for any other joint type
     * @throws Exception declared by the signature; a missing registry entry surfaces as a
     *                   NullPointerException
     */
    public synchronized List<String> getConnectionLocations(IFeedJoint iFeedJoint, FeedConnectionRequest feedConnectionRequest) throws Exception {
        switch (iFeedJoint.getType()) {
            case COMPUTE:
                return this.connectJobInfos.get(iFeedJoint.getProvider()).getComputeLocations();
            case INTAKE:
                return this.entity2Intake.get(iFeedJoint.getOwnerFeedId()).getIntakeLocation();
            default:
                return null;
        }
    }

    /**
     * Delivers a lifecycle event to every registered subscriber, in registration order.
     * Does nothing when there are no subscribers.
     *
     * @param activeLifecycleEvent the event to deliver
     */
    private synchronized void notifyFeedEventSubscribers(IActiveLifecycleEventSubscriber.ActiveLifecycleEvent activeLifecycleEvent) {
        // subscribers is a final, eagerly initialised list, so iterating it directly also covers the empty case
        for (IActiveLifecycleEventSubscriber subscriber : this.subscribers) {
            subscriber.handleEvent(activeLifecycleEvent);
        }
    }

    /**
     * Handles completion of an intake job: records a failed ingestion when the job status is
     * FAILURE, deregisters the intake job, drops the feed's pipeline and intake entries, and
     * notifies subscribers with FEED_INTAKE_FAILURE or FEED_INTAKE_ENDED depending on the
     * counter's failed-ingestion flag.
     *
     * The status comparison is null-safe, matching the collect-job finish handler.
     *
     * @param feedIntakeInfo the finished intake job
     * @param activeEvent    the job-finish event
     * @throws Exception if the job info cannot be fetched
     */
    private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo feedIntakeInfo, ActiveEvent activeEvent) throws Exception {
        JobStatus status = AppContextInfo.INSTANCE.getHcc().getJobInfo(activeEvent.getJobId()).getStatus();
        EntityId feedId = feedIntakeInfo.getFeedId();
        // Fetch the pipeline entry before it is removed below; its counter decides the final event.
        Pair<FeedOperationCounter, List<IFeedJoint>> pair = this.feedPipeline.get(feedId);
        if (JobStatus.FAILURE.equals(status)) {
            pair.first.setFailedIngestion(true);
        }
        deregisterFeedIntakeJob(activeEvent.getJobId());
        this.feedPipeline.remove(feedId);
        this.entity2Intake.remove(feedId);
        notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? IActiveLifecycleEventSubscriber.ActiveLifecycleEvent.FEED_INTAKE_FAILURE : IActiveLifecycleEventSubscriber.ActiveLifecycleEvent.FEED_INTAKE_ENDED);
    }

    /**
     * Handles completion of a collect job. The connection's receiver is removed from the source
     * joint unless the job is UNDER_RECOVERY, or it failed and its feed policy says to continue
     * on hardware failure. The job is then dropped from {@code connectJobInfos} and
     * {@code jobs}, and subscribers receive FEED_COLLECT_FAILURE or FEED_COLLECT_ENDED.
     *
     * @param feedConnectJobInfo the finished collect job
     * @throws Exception if the job info cannot be fetched
     */
    private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo feedConnectJobInfo) throws Exception {
        FeedConnectionId connectionId = feedConnectJobInfo.getConnectionId();
        JobStatus status = AppContextInfo.INSTANCE.getHcc().getJobInfo(feedConnectJobInfo.getJobId()).getStatus();
        boolean failed = status != null && status.equals(JobStatus.FAILURE);
        // The policy accessor is only built when the job is not under recovery and has failed.
        boolean keepSubscription = feedConnectJobInfo.getState().equals(ActivityState.UNDER_RECOVERY)
                || (failed && new FeedPolicyAccessor(feedConnectJobInfo.getFeedPolicy()).continueOnHardwareFailure());
        if (!keepSubscription) {
            feedConnectJobInfo.getSourceFeedJoint().removeReceiver(connectionId);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Subscription " + feedConnectJobInfo.getConnectionId() + " completed successfully. Removed subscription");
            }
        }
        this.connectJobInfos.remove(connectionId);
        this.jobs.remove(Long.valueOf(feedConnectJobInfo.getJobId().getId()));
        if (failed) {
            notifyFeedEventSubscribers(IActiveLifecycleEventSubscriber.ActiveLifecycleEvent.FEED_COLLECT_FAILURE);
        } else {
            notifyFeedEventSubscribers(IActiveLifecycleEventSubscriber.ActiveLifecycleEvent.FEED_COLLECT_ENDED);
        }
    }

    /**
     * Returns the storage locations recorded for a registered connection.
     * Throws NullPointerException when the connection is not registered.
     *
     * @param feedConnectionId the connection to look up
     * @return the connection's storage locations
     */
    public List<String> getFeedStorageLocations(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = this.connectJobInfos.get(feedConnectionId);
        return connectInfo.getStorageLocations();
    }

    /**
     * Returns the collect locations recorded for a registered connection.
     * Throws NullPointerException when the connection is not registered.
     *
     * @param feedConnectionId the connection to look up
     * @return the connection's collect locations
     */
    public List<String> getFeedCollectLocations(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = this.connectJobInfos.get(feedConnectionId);
        return connectInfo.getCollectLocations();
    }

    /**
     * Returns the intake locations recorded for a feed with a registered intake job.
     * Throws NullPointerException when the feed has no registered intake job.
     *
     * @param entityId the feed to look up
     * @return the feed's intake locations
     */
    public List<String> getFeedIntakeLocations(EntityId entityId) {
        FeedIntakeInfo intakeInfo = this.entity2Intake.get(entityId);
        return intakeInfo.getIntakeLocation();
    }

    /**
     * Returns the id of the collect job registered for a connection.
     * Throws NullPointerException when the connection is not registered.
     *
     * @param feedConnectionId the connection to look up
     * @return the collect job's id
     */
    public JobId getFeedCollectJobId(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = this.connectJobInfos.get(feedConnectionId);
        return connectInfo.getJobId();
    }

    /**
     * Tells whether a joint with exactly the given key is registered in the pipeline of the
     * key's feed.
     *
     * @param feedJointKey the key to look for
     * @return {@code true} when a joint with an equal key is registered, {@code false} otherwise
     *         (including when the feed has no pipeline entry)
     */
    public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
        if (!this.feedPipeline.containsKey(feedJointKey.getFeedId())) {
            return false;
        }
        List<IFeedJoint> joints = this.feedPipeline.get(feedJointKey.getFeedId()).second;
        if (joints == null) {
            return false;
        }
        for (IFeedJoint joint : joints) {
            if (joint.getFeedJointKey().equals(feedJointKey)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Collects the intake feed joint of every registered intake job.
     *
     * @return a new list holding one joint per entry of {@code entity2Intake}
     */
    public Collection<IFeedJoint> getFeedIntakeJoints() {
        List<IFeedJoint> intakeJoints = new ArrayList<>();
        for (FeedIntakeInfo intakeInfo : this.entity2Intake.values()) {
            intakeJoints.add(intakeInfo.getIntakeFeedJoint());
        }
        return intakeJoints;
    }

    /**
     * Finds the registered joint whose key equals the given key.
     *
     * @param feedJointKey the key to look for
     * @return the first joint in the feed's pipeline with an equal key, or {@code null} when
     *         there is none (including when the feed has no pipeline entry)
     */
    public IFeedJoint getFeedJoint(FeedJointKey feedJointKey) {
        if (!this.feedPipeline.containsKey(feedJointKey.getFeedId())) {
            return null;
        }
        List<IFeedJoint> joints = this.feedPipeline.get(feedJointKey.getFeedId()).second;
        if (joints == null) {
            return null;
        }
        for (IFeedJoint joint : joints) {
            if (joint.getFeedJointKey().equals(feedJointKey)) {
                return joint;
            }
        }
        return null;
    }

    /**
     * Finds the best available joint for a key. An exact key match wins. Otherwise the
     * feed's joints are scanned in order, considering only joints whose key string is contained
     * in the requested key string; a candidate replaces the current pick when there is no pick
     * yet or when the candidate's key string contains the pick's key string.
     *
     * @param feedJointKey the requested key
     * @return the selected joint, or {@code null} when no joint qualifies
     */
    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
        IFeedJoint exactMatch = getFeedJoint(feedJointKey);
        if (exactMatch != null) {
            return exactMatch;
        }
        String requestedRep = feedJointKey.getStringRep();
        if (!this.feedPipeline.containsKey(feedJointKey.getFeedId())) {
            return null;
        }
        List<IFeedJoint> joints = this.feedPipeline.get(feedJointKey.getFeedId()).second;
        if (joints == null) {
            return null;
        }
        IFeedJoint best = null;
        for (IFeedJoint candidate : joints) {
            String candidateRep = candidate.getFeedJointKey().getStringRep();
            if (!requestedRep.contains(candidateRep)) {
                continue;
            }
            if (best == null || candidateRep.contains(best.getFeedJointKey().getStringRep())) {
                best = candidate;
            }
        }
        return best;
    }

    /**
     * Returns the job specification of the collect job registered for a connection.
     * Throws NullPointerException when the connection is not registered.
     *
     * @param feedConnectionId the connection to look up
     * @return the collect job's specification
     */
    public JobSpecification getCollectJobSpecification(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = this.connectJobInfos.get(feedConnectionId);
        return connectInfo.getSpec();
    }

    /**
     * Returns the first joint of the given type in a feed's pipeline.
     * Throws NullPointerException when the feed has no pipeline entry.
     *
     * @param entityId      the feed whose pipeline is searched
     * @param feedJointType the joint type to look for
     * @return the first matching joint, or {@code null} when none has that type
     */
    public IFeedJoint getFeedPoint(EntityId entityId, IFeedJoint.FeedJointType feedJointType) {
        List<IFeedJoint> joints = this.feedPipeline.get(entityId).second;
        for (IFeedJoint joint : joints) {
            if (joint.getType().equals(feedJointType)) {
                return joint;
            }
        }
        return null;
    }

    /**
     * Resolves the collect, compute and storage locations of a connect job and stores them on
     * the job info.
     *
     * Phase 1 walks the job specification's operator map and sorts operator ids into three
     * lists (arrayList = collect, arrayList2 = compute, arrayList3 = storage). Phase 2 fetches
     * the job info and, for each collected operator id, appends the values of that operator's
     * location map for keys 0 .. size-1.
     *
     * Any exception raised in phase 2 is logged and swallowed; in that case none of the three
     * setters is called. Exceptions raised in phase 1 propagate to the caller.
     *
     * @param feedConnectJobInfo the connect job whose locations are resolved and updated
     */
    private void setLocations(FeedConnectJobInfo feedConnectJobInfo) {
        JobSpecification spec = feedConnectJobInfo.getSpec();
        // arrayList: ids of collect operators; arrayList2: ids of compute operators; arrayList3: ids of storage operators
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (Map.Entry entry : spec.getOperatorMap().entrySet()) {
            IOperatorDescriptor iOperatorDescriptor = (IOperatorDescriptor) entry.getValue();
            // A feed meta operator is classified by the core operator it wraps.
            IOperatorDescriptor coreOperator = iOperatorDescriptor instanceof FeedMetaOperatorDescriptor ? ((FeedMetaOperatorDescriptor) iOperatorDescriptor).getCoreOperator() : iOperatorDescriptor;
            if (coreOperator instanceof AlgebricksMetaOperatorDescriptor) {
                AlgebricksMetaOperatorDescriptor algebricksMetaOperatorDescriptor = (AlgebricksMetaOperatorDescriptor) coreOperator;
                IPushRuntimeFactory[] runtimeFactories = algebricksMetaOperatorDescriptor.getPipeline().getRuntimeFactories();
                // z becomes true when the pipeline has an assign runtime AND the operator's first
                // input connector leads back to a feed collect operator.
                boolean z = false;
                int length = runtimeFactories.length;
                int i = 0;
                while (true) {
                    if (i >= length) {
                        break;
                    }
                    if (runtimeFactories[i] instanceof AssignRuntimeFactory) {
                        // Takes the operator's first input connector, looks it up in the connector-operator
                        // map and inspects getLeft().getLeft() of the entry.
                        // NOTE(review): presumably that is the connector's producing operator - confirm
                        // against JobSpecification.getConnectorOperatorMap().
                        if (((IOperatorDescriptor) ((org.apache.commons.lang3.tuple.Pair) ((org.apache.commons.lang3.tuple.Pair) spec.getConnectorOperatorMap().get(((IConnectorDescriptor) ((List) spec.getOperatorInputMap().get(algebricksMetaOperatorDescriptor.getOperatorId())).get(0)).getConnectorId())).getLeft()).getLeft()) instanceof FeedCollectOperatorDescriptor) {
                            z = true;
                            break;
                        }
                    }
                    i++;
                }
                if (z) {
                    arrayList2.add(entry.getKey());
                }
            } else if (coreOperator instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
                arrayList3.add(entry.getKey());
            } else if (coreOperator instanceof FeedCollectOperatorDescriptor) {
                arrayList.add(entry.getKey());
            }
        }
        try {
            JobInfo jobInfo = AppContextInfo.INSTANCE.getHcc().getJobInfo(feedConnectJobInfo.getJobId());
            // arrayList4: collect locations
            ArrayList arrayList4 = new ArrayList();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                // NOTE(review): unlike the compute and storage loops below, a missing location map is
                // not null-checked here; a null map raises a NullPointerException that the catch
                // below logs, leaving all three location lists unset.
                Map map = (Map) jobInfo.getOperatorLocations().get((OperatorDescriptorId) it.next());
                int size = map.size();
                for (int i2 = 0; i2 < size; i2++) {
                    arrayList4.add(map.get(Integer.valueOf(i2)));
                }
            }
            // arrayList5: compute locations
            ArrayList arrayList5 = new ArrayList();
            Iterator it2 = arrayList2.iterator();
            while (it2.hasNext()) {
                Map map2 = (Map) jobInfo.getOperatorLocations().get((OperatorDescriptorId) it2.next());
                if (map2 != null) {
                    int size2 = map2.size();
                    for (int i3 = 0; i3 < size2; i3++) {
                        arrayList5.add(map2.get(Integer.valueOf(i3)));
                    }
                } else {
                    // No location map for this compute operator: discard what was gathered so far and
                    // fall back to the collect locations. Later iterations may still append to the list.
                    arrayList5.clear();
                    arrayList5.addAll(arrayList4);
                }
            }
            // arrayList6: storage locations; operators without a location map are skipped
            ArrayList arrayList6 = new ArrayList();
            Iterator it3 = arrayList3.iterator();
            while (it3.hasNext()) {
                Map map3 = (Map) jobInfo.getOperatorLocations().get((OperatorDescriptorId) it3.next());
                if (map3 != null) {
                    int size3 = map3.size();
                    for (int i4 = 0; i4 < size3; i4++) {
                        arrayList6.add(map3.get(Integer.valueOf(i4)));
                    }
                }
            }
            feedConnectJobInfo.setCollectLocations(arrayList4);
            feedConnectJobInfo.setComputeLocations(arrayList5);
            feedConnectJobInfo.setStorageLocations(arrayList6);
        } catch (Exception e) {
            LOGGER.error("Error while setting feed active locations", e);
        }
    }

    /**
     * Adds a subscriber that will receive subsequent lifecycle events. The same subscriber may
     * be added more than once; no duplicate check is performed.
     *
     * @param iActiveLifecycleEventSubscriber the subscriber to add
     */
    public synchronized void registerFeedEventSubscriber(IActiveLifecycleEventSubscriber iActiveLifecycleEventSubscriber) {
        subscribers.add(iActiveLifecycleEventSubscriber);
    }

    /**
     * Removes the first occurrence of a subscriber, if present.
     *
     * Synchronized on this listener like every other access to {@code subscribers}
     * (registration and event delivery), so the list is never modified concurrently with an
     * iteration running on another thread.
     *
     * @param iActiveLifecycleEventSubscriber the subscriber to remove
     */
    public synchronized void deregisterFeedEventSubscriber(IActiveLifecycleEventSubscriber iActiveLifecycleEventSubscriber) {
        this.subscribers.remove(iActiveLifecycleEventSubscriber);
    }

    /**
     * Tells whether a connection is registered and in state ACTIVE; when it is, the given
     * subscriber is registered for lifecycle events as a side effect.
     *
     * @param feedConnectionId                the connection to check
     * @param iActiveLifecycleEventSubscriber subscriber to register when the connection is active
     * @return {@code true} when the connection is registered and ACTIVE
     */
    public synchronized boolean isFeedConnectionActive(FeedConnectionId feedConnectionId, IActiveLifecycleEventSubscriber iActiveLifecycleEventSubscriber) {
        FeedConnectJobInfo connectInfo = this.connectJobInfos.get(feedConnectionId);
        boolean active = connectInfo != null && connectInfo.getState().equals(ActivityState.ACTIVE);
        if (active) {
            registerFeedEventSubscriber(iActiveLifecycleEventSubscriber);
        }
        return active;
    }

    /**
     * Returns the connect job info registered for a connection.
     *
     * @param feedConnectionId the connection to look up
     * @return the job info, or {@code null} when the connection is not registered
     */
    public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        return connectInfo;
    }

    /**
     * Handles the start of a collect job: resolves its locations, marks every joint of the
     * connection's feed whose provider equals this connection as ACTIVE (recording a COMPUTE
     * joint on the job info), and finally marks the job itself ACTIVE.
     *
     * @param feedConnectJobInfo the started collect job
     * @throws ACIDException declared by the signature
     */
    private void handleCollectJobStartMessage(FeedConnectJobInfo feedConnectJobInfo) throws ACIDException {
        setLocations(feedConnectJobInfo);
        List<IFeedJoint> joints = this.feedPipeline.get(feedConnectJobInfo.getConnectionId().getFeedId()).second;
        for (IFeedJoint joint : joints) {
            if (!joint.getProvider().equals(feedConnectJobInfo.getConnectionId())) {
                continue;
            }
            joint.setState(IFeedJoint.State.ACTIVE);
            if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) {
                feedConnectJobInfo.setComputeFeedJoint(joint);
            }
        }
        feedConnectJobInfo.setState(ActivityState.ACTIVE);
    }

    /**
     * Tells whether any registered connection targets a dataset with the given name.
     *
     * @param str the dataset name to look for
     * @return {@code true} when at least one registered connection id reports that dataset name
     */
    private synchronized boolean isConnectedToDataset(String str) {
        for (FeedConnectionId connectionId : this.connectJobInfos.keySet()) {
            if (connectionId.getDatasetName().equals(str)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns a snapshot of the ids of all registered connections.
     *
     * @return a new array holding every key of {@code connectJobInfos}
     */
    public FeedConnectionId[] getConnections() {
        FeedConnectionId[] snapshot = new FeedConnectionId[this.connectJobInfos.size()];
        return this.connectJobInfos.keySet().toArray(snapshot);
    }

    /**
     * Alias of {@link #isFeedPointAvailable(FeedJointKey)}.
     *
     * @param feedJointKey the key to look for
     * @return {@code true} when a joint with an equal key is registered
     */
    public boolean isFeedJointAvailable(FeedJointKey feedJointKey) {
        boolean available = isFeedPointAvailable(feedJointKey);
        return available;
    }

    /**
     * Tells whether at least one job is currently registered with this listener.
     *
     * @return {@code true} when {@code jobs} is not empty
     */
    public boolean isEntityActive() {
        boolean noRegisteredJobs = jobs.isEmpty();
        return !noRegisteredJobs;
    }

    /**
     * Returns the entity id this listener was constructed with.
     *
     * @return the entity id
     */
    public EntityId getEntityId() {
        return entityId;
    }

    /**
     * Returns the intake joint selected by the most recent intake-job registration attempt.
     *
     * @return the source feed joint, or {@code null} when none has been selected
     */
    public IFeedJoint getSourceFeedJoint() {
        return sourceFeedJoint;
    }

    /**
     * Tells whether any registered connection targets the dataset named by {@code str2}.
     * Only the second argument is consulted; {@code str} is ignored.
     *
     * @param str  not used by this method
     * @param str2 the dataset name to look for
     * @return {@code true} when a registered connection reports that dataset name
     */
    public boolean isEntityUsingDataset(String str, String str2) {
        boolean connected = isConnectedToDataset(str2);
        return connected;
    }
}
