package org.apache.asterix.external.feed.runtime;

import java.util.logging.Level;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveRuntime;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.util.TaskUtils;

/* loaded from: input_file:org/apache/asterix/external/feed/runtime/IngestionRuntime.class */
/**
 * Subscribable feed runtime that drives an {@link AdapterRuntimeManager} on behalf of the
 * {@link CollectionRuntime}s subscribed to it.
 *
 * <p>The adapter is started when the subscriber count goes from zero to one and stopped when it
 * returns to zero. {@link #subscribe} and {@link #unsubscribe} are synchronized on this instance.
 */
public class IngestionRuntime extends SubscribableRuntime {
    /** Key under which the frame allocated for the first subscriber is stored in the task shared maps. */
    private static final String MESSAGE_FRAME_KEY = "HYX:MSG";

    private final AdapterRuntimeManager adapterRuntimeManager;
    private final IHyracksTaskContext ctx;
    /** Number of currently counted subscribers; guarded by this instance's monitor. */
    private int numSubscribers;

    /**
     * Creates an ingestion runtime with no subscribers.
     *
     * @param entityId passed through to the superclass
     * @param activeRuntimeId passed through to the superclass
     * @param distributeFeedFrameWriter passed through to the superclass; subscribers' frame collectors are
     *        registered with the superclass's writer
     * @param adapterRuntimeManager manager that is started/stopped as subscribers come and go
     * @param iHyracksTaskContext task context of this runtime, used to allocate and share the message frame
     */
    public IngestionRuntime(EntityId entityId, ActiveRuntimeId activeRuntimeId, DistributeFeedFrameWriter distributeFeedFrameWriter, AdapterRuntimeManager adapterRuntimeManager, IHyracksTaskContext iHyracksTaskContext) {
        super(entityId, activeRuntimeId, distributeFeedFrameWriter);
        this.numSubscribers = 0;
        this.adapterRuntimeManager = adapterRuntimeManager;
        this.ctx = iHyracksTaskContext;
    }

    /**
     * Registers a collection runtime with the distribute writer and the subscriber list. The first
     * subscriber additionally triggers allocation of the shared message frame and starts the adapter.
     *
     * @param collectionRuntime the runtime to subscribe
     * @throws HyracksDataException if registering the collector or allocating the frame fails
     */
    @Override
    public synchronized void subscribe(CollectionRuntime collectionRuntime) throws HyracksDataException {
        this.dWriter.subscribe(collectionRuntime.getFrameCollector());
        this.subscribers.add(collectionRuntime);
        if (this.numSubscribers == 0) {
            // Allocate one frame in this task's shared map and expose the very same instance through the
            // first subscriber's task context.
            // NOTE(review): later subscribers do not get the frame put into their context - confirm intended.
            TaskUtils.putInSharedMap(MESSAGE_FRAME_KEY, new VSizeFrame(this.ctx), this.ctx);
            TaskUtils.putInSharedMap(MESSAGE_FRAME_KEY, TaskUtils.get(MESSAGE_FRAME_KEY, this.ctx), collectionRuntime.getCtx());
            start();
        }
        this.numSubscribers++;
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
        }
    }

    /**
     * Removes a collection runtime from the subscriber list and stops the adapter once the last
     * counted subscriber is gone. Calling this for a runtime that is not currently subscribed is a
     * no-op, so repeated calls cannot stop the adapter early or drive the count negative.
     *
     * @param collectionRuntime the runtime to unsubscribe
     * @throws InterruptedException if stopping the adapter is interrupted; the subscriber has already
     *         been removed and the count updated when this is thrown
     */
    @Override
    public synchronized void unsubscribe(CollectionRuntime collectionRuntime) throws InterruptedException {
        // Update the bookkeeping before calling stop() so a failing stop() cannot leave the list and
        // the counter out of step with each other.
        if (!this.subscribers.remove(collectionRuntime)) {
            return;
        }
        if (this.numSubscribers > 0 && --this.numSubscribers == 0) {
            stop();
        }
    }

    /**
     * @return the adapter runtime manager supplied at construction
     */
    public AdapterRuntimeManager getAdapterRuntimeManager() {
        return this.adapterRuntimeManager;
    }

    /**
     * Unsubscribes every current subscriber on a best-effort basis: a failure for one subscriber is
     * logged and the remaining ones are still processed. If any unsubscribe was interrupted, the
     * calling thread's interrupt status is restored before returning.
     */
    public void terminate() {
        // unsubscribe() removes from the subscriber list, so iterate over a snapshot rather than the
        // live collection; the snapshot is taken under the same lock subscribe/unsubscribe use.
        final IActiveRuntime[] snapshot;
        synchronized (this) {
            snapshot = this.subscribers.toArray(new IActiveRuntime[0]);
        }
        boolean interrupted = false;
        for (IActiveRuntime subscriber : snapshot) {
            try {
                unsubscribe((CollectionRuntime) subscriber);
            } catch (InterruptedException e) {
                interrupted = true;
                logUnsubscribeFailure(subscriber, e);
            } catch (RuntimeException e) {
                logUnsubscribeFailure(subscriber, e);
            }
        }
        if (interrupted) {
            // Do not swallow the interruption; let callers further up observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** Starts the underlying adapter runtime manager. */
    public void start() {
        this.adapterRuntimeManager.start();
    }

    /**
     * Stops the underlying adapter runtime manager.
     *
     * @throws InterruptedException propagated from the adapter runtime manager's stop
     */
    public void stop() throws InterruptedException {
        this.adapterRuntimeManager.stop();
    }

    /** Logs, with its stack trace, a failure to unsubscribe the given subscriber during termination. */
    private void logUnsubscribeFailure(IActiveRuntime subscriber, Exception e) {
        if (LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.log(Level.WARNING, "Exception in unsubscribing " + subscriber + " message " + e.getMessage(), e);
        }
    }
}
