package org.apache.asterix.external.operators;

import java.io.Serializable;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
import org.apache.asterix.external.feed.runtime.IngestionRuntime;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

/* loaded from: input_file:org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.class */
public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
    private final EntityId feedId;
    private final int partition;
    private final IHyracksTaskContext ctx;
    private final IAdapterFactory adapterFactory;
    private final FeedIntakeOperatorDescriptor opDesc;

    public FeedIntakeOperatorNodePushable(IHyracksTaskContext iHyracksTaskContext, EntityId entityId, IAdapterFactory iAdapterFactory, int i, FeedPolicyAccessor feedPolicyAccessor, IRecordDescriptorProvider iRecordDescriptorProvider, FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
        this.opDesc = feedIntakeOperatorDescriptor;
        this.recordDesc = iRecordDescriptorProvider.getOutputRecordDescriptor(this.opDesc.getActivityId(), 0);
        this.ctx = iHyracksTaskContext;
        this.feedId = entityId;
        this.partition = i;
        this.adapterFactory = iAdapterFactory;
    }

    public void initialize() throws HyracksDataException {
        ActiveManager activeManager = (ActiveManager) ((IAppRuntimeContext) this.ctx.getJobletContext().getApplicationContext().getApplicationObject()).getActiveManager();
        DistributeFeedFrameWriter distributeFeedFrameWriter = null;
        IngestionRuntime ingestionRuntime = null;
        try {
            try {
                Thread.currentThread().setName("Intake Thread");
                FeedAdapter feedAdapter = (FeedAdapter) this.adapterFactory.createAdapter(this.ctx, this.partition);
                DistributeFeedFrameWriter distributeFeedFrameWriter2 = new DistributeFeedFrameWriter(this.feedId, this.writer, FeedUtils.FeedRuntimeType.INTAKE, this.partition);
                AdapterRuntimeManager adapterRuntimeManager = new AdapterRuntimeManager(this.ctx, this.feedId, feedAdapter, distributeFeedFrameWriter2, this.partition);
                ActiveRuntimeId activeRuntimeId = new ActiveRuntimeId(this.feedId, FeedUtils.FeedRuntimeType.INTAKE.toString(), this.partition);
                IngestionRuntime ingestionRuntime2 = new IngestionRuntime(this.feedId, activeRuntimeId, distributeFeedFrameWriter2, adapterRuntimeManager, this.ctx);
                activeManager.registerRuntime(ingestionRuntime2);
                this.ctx.sendApplicationMessageToCC(new ActivePartitionMessage(activeRuntimeId, this.ctx.getJobletContext().getJobId(), (byte) 0), (DeploymentId) null);
                distributeFeedFrameWriter2.open();
                synchronized (adapterRuntimeManager) {
                    while (!adapterRuntimeManager.isDone()) {
                        adapterRuntimeManager.wait();
                    }
                }
                activeManager.deregisterRuntime(ingestionRuntime2.getRuntimeId());
                if (adapterRuntimeManager.isFailed()) {
                    throw new RuntimeDataException(3008, new Serializable[0]);
                }
                if (1 != 0) {
                    distributeFeedFrameWriter2.close();
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    ingestionRuntime.terminate();
                    activeManager.deregisterRuntime(ingestionRuntime.getRuntimeId());
                }
                throw new HyracksDataException(th);
            }
        } catch (Throwable th2) {
            if (0 != 0) {
                distributeFeedFrameWriter.close();
            }
            throw th2;
        }
    }
}
