package org.apache.asterix.external.operators;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.util.TaskUtils;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;

/* loaded from: input_file:org/apache/asterix/external/operators/FeedMetaComputeNodePushable.class */
public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
    private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
    private FeedPolicyEnforcer policyEnforcer;
    private FeedConnectionId connectionId;
    private int partition;
    private ActiveManager feedManager;
    private FrameTupleAccessor fta;
    private final IHyracksTaskContext ctx;
    private final FeedUtils.FeedRuntimeType runtimeType = FeedUtils.FeedRuntimeType.COMPUTE;
    private final VSizeFrame message;
    private final FeedMetaOperatorDescriptor opDesc;
    private final IRecordDescriptorProvider recordDescProvider;
    private boolean opened;

    public FeedMetaComputeNodePushable(IHyracksTaskContext iHyracksTaskContext, IRecordDescriptorProvider iRecordDescriptorProvider, int i, int i2, IOperatorDescriptor iOperatorDescriptor, FeedConnectionId feedConnectionId, Map<String, String> map, String str, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor) throws HyracksDataException {
        this.ctx = iHyracksTaskContext;
        this.coreOperator = ((IActivity) iOperatorDescriptor).createPushRuntime(iHyracksTaskContext, iRecordDescriptorProvider, i, i2);
        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, map);
        this.partition = i;
        this.connectionId = feedConnectionId;
        this.feedManager = (ActiveManager) ((IAppRuntimeContext) iHyracksTaskContext.getJobletContext().getApplicationContext().getApplicationObject()).getActiveManager();
        this.message = new VSizeFrame(iHyracksTaskContext);
        TaskUtils.putInSharedMap("HYX:MSG", this.message, iHyracksTaskContext);
        this.opDesc = feedMetaOperatorDescriptor;
        this.recordDescProvider = iRecordDescriptorProvider;
    }

    public void open() throws HyracksDataException {
        try {
            initializeNewFeedRuntime(new ActiveRuntimeId(this.connectionId.getFeedId(), this.runtimeType.toString(), this.partition));
            this.opened = true;
            this.writer.open();
        } catch (Exception e) {
            e.printStackTrace();
            throw new HyracksDataException(e);
        }
    }

    private void initializeNewFeedRuntime(ActiveRuntimeId activeRuntimeId) throws Exception {
        this.fta = new FrameTupleAccessor(this.recordDescProvider.getInputRecordDescriptor(this.opDesc.getActivityId(), 0));
        FeedPolicyAccessor feedPolicyAccessor = this.policyEnforcer.getFeedPolicyAccessor();
        this.coreOperator.setOutputFrameWriter(0, this.writer, this.recordDesc);
        if (feedPolicyAccessor.bufferingEnabled()) {
            this.writer = new FeedRuntimeInputHandler(this.ctx, this.connectionId, activeRuntimeId, this.coreOperator, feedPolicyAccessor, this.fta, this.feedManager.getFramePool());
        } else {
            this.writer = new SyncFeedRuntimeInputHandler(this.ctx, this.coreOperator, this.fta);
        }
    }

    public void nextFrame(ByteBuffer byteBuffer) throws HyracksDataException {
        try {
            FeedUtils.processFeedMessage(byteBuffer, this.message, this.fta);
            this.writer.nextFrame(byteBuffer);
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, e.getMessage(), (Throwable) e);
            throw new HyracksDataException(e);
        }
    }

    public void fail() throws HyracksDataException {
        this.writer.fail();
    }

    public void close() throws HyracksDataException {
        if (this.opened) {
            this.writer.close();
        }
    }

    public void flush() throws HyracksDataException {
        this.writer.flush();
    }
}
