package org.apache.asterix.external.operators;

import java.util.Map;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.CollectionRuntime;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

/* loaded from: input_file:org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.class */
/**
 * Source-side operator that collects frames of a feed connection for one partition.
 *
 * <p>On {@link #initialize()} it wraps the downstream writer in a feed input handler, builds a
 * {@link CollectionRuntime} around it, registers that runtime with the {@link ActiveManager},
 * subscribes it to the supplied {@link ISubscribableRuntime}, notifies the cluster controller
 * and then blocks until the collection runtime reports that collection is over.
 */
public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
    /** Partition number used when building the {@link ActiveRuntimeId}. */
    private final int partition;
    /** Identifies the feed connection this collector serves. */
    private final FeedConnectionId connectionId;
    /** Raw policy properties, handed unchanged to the {@link CollectionRuntime}. */
    private final Map<String, String> feedPolicy;
    /** Accessor over {@link #feedPolicy}; consulted to decide whether buffering is enabled. */
    private final FeedPolicyAccessor policyAccessor;
    /** Manager the collection runtime is registered with and deregistered from. */
    private final ActiveManager activeManager;
    /** Runtime the collection runtime subscribes to. */
    private final ISubscribableRuntime sourceRuntime;
    /** Task context used to build handlers and to message the cluster controller. */
    private final IHyracksTaskContext ctx;
    /** Created in {@link #initialize()}; stays {@code null} until then. */
    private CollectionRuntime collectRuntime;

    /**
     * Creates the pushable and resolves the {@link ActiveManager} from the application object
     * reachable through the task's joblet context.
     *
     * @param iHyracksTaskContext task context of the running operator
     * @param feedConnectionId    connection whose data is collected
     * @param map                 feed policy properties
     * @param i                   partition number of this operator instance
     * @param iSubscribableRuntime runtime to subscribe the collector to
     */
    public FeedCollectOperatorNodePushable(IHyracksTaskContext iHyracksTaskContext, FeedConnectionId feedConnectionId, Map<String, String> map, int i, ISubscribableRuntime iSubscribableRuntime) {
        this.ctx = iHyracksTaskContext;
        this.partition = i;
        this.connectionId = feedConnectionId;
        this.sourceRuntime = iSubscribableRuntime;
        this.feedPolicy = map;
        this.policyAccessor = new FeedPolicyAccessor(map);
        this.activeManager = (ActiveManager) ((IAppRuntimeContext) iHyracksTaskContext.getJobletContext().getApplicationContext().getApplicationObject()).getActiveManager();
    }

    /**
     * Sets up the collection runtime, announces it to the cluster controller and blocks until
     * collection is over.
     *
     * <p>Once the runtime has been registered with the {@link ActiveManager} it is always
     * deregistered again, whether collection ends normally or any later step fails.
     *
     * @throws HyracksDataException wrapping any exception raised during setup, while waiting,
     *                              or during deregistration
     */
    public void initialize() throws HyracksDataException {
        try {
            ActiveRuntimeId activeRuntimeId = new ActiveRuntimeId(this.connectionId.getFeedId(), FeedUtils.FeedRuntimeType.COLLECT.toString(), this.partition);
            FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(this.recordDesc);
            // Replace the downstream writer with a feed input handler; the policy decides
            // between the buffering handler and the synchronous one.
            if (this.policyAccessor.bufferingEnabled()) {
                this.writer = new FeedRuntimeInputHandler(this.ctx, this.connectionId, activeRuntimeId, this.writer, this.policyAccessor, frameTupleAccessor, this.activeManager.getFramePool());
            } else {
                this.writer = new SyncFeedRuntimeInputHandler(this.ctx, this.writer, frameTupleAccessor);
            }
            this.collectRuntime = new CollectionRuntime(this.connectionId, activeRuntimeId, this.sourceRuntime, this.feedPolicy, this.ctx, new FeedFrameCollector(this.policyAccessor, this.writer, this.connectionId));
            this.activeManager.registerRuntime(this.collectRuntime);
            try {
                this.sourceRuntime.subscribe(this.collectRuntime);
                // NOTE(review): (byte) 0 is presumably the "runtime registered" event code of
                // ActivePartitionMessage - confirm against that class.
                this.ctx.sendApplicationMessageToCC(new ActivePartitionMessage(activeRuntimeId, this.ctx.getJobletContext().getJobId(), (byte) 0), (DeploymentId) null);
                this.collectRuntime.waitTillCollectionOver();
            } finally {
                // Deregister on every exit path so a failed or interrupted collection does not
                // leave a stale runtime behind in the ActiveManager.
                this.activeManager.deregisterRuntime(this.collectRuntime.getRuntimeId());
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new HyracksDataException(e);
        }
    }
}
