package org.apache.asterix.external.operators;

import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.class */
public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable {
    private static final Logger LOGGER = LogManager.getLogger();
    public static final int DEFAULT_ABORT_TIMEOUT = 60000;
    private final FeedIntakeOperatorDescriptor opDesc;
    private final FeedAdapter adapter;
    private boolean poisoned;

    public FeedIntakeOperatorNodePushable(IHyracksTaskContext iHyracksTaskContext, EntityId entityId, IAdapterFactory iAdapterFactory, int i, IRecordDescriptorProvider iRecordDescriptorProvider, FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) throws HyracksDataException {
        super(iHyracksTaskContext, new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), i));
        this.poisoned = false;
        this.opDesc = feedIntakeOperatorDescriptor;
        this.recordDesc = iRecordDescriptorProvider.getOutputRecordDescriptor(this.opDesc.getActivityId(), 0);
        this.adapter = (FeedAdapter) iAdapterFactory.createAdapter(iHyracksTaskContext, this.runtimeId.getPartition());
    }

    protected void start() throws HyracksDataException, InterruptedException {
        Throwable close;
        Thread.currentThread().setName("Intake Thread");
        try {
            try {
                this.writer.open();
            } catch (Throwable th) {
                CleanupUtils.fail(this.writer, th);
                LOGGER.log(Level.WARN, "Failure during data ingestion", th);
                close = CleanupUtils.close(this.writer, CleanupUtils.close(this.adapter, th));
            }
            synchronized (this) {
                if (this.poisoned) {
                    return;
                }
                VSizeFrame vSizeFrame = new VSizeFrame(this.ctx);
                TaskUtil.put("HYX:MSG", vSizeFrame, this.ctx);
                vSizeFrame.getBuffer().put((byte) 1);
                vSizeFrame.getBuffer().flip();
                run();
                close = CleanupUtils.close(this.writer, CleanupUtils.close(this.adapter, (Throwable) null));
                if (close != null) {
                    throw HyracksDataException.create(close);
                }
            }
        } finally {
            CleanupUtils.close(this.writer, CleanupUtils.close(this.adapter, (Throwable) null));
        }
    }

    private void run() throws HyracksDataException {
        LOGGER.info("Starting ingestion for partition:" + this.ctx.getTaskAttemptId().getTaskId().getPartition());
        try {
            doRun();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw HyracksDataException.create(e);
        } catch (Exception e2) {
            LOGGER.log(Level.WARN, "Unhandled Exception", e2);
            throw HyracksDataException.create(e2);
        }
    }

    private void doRun() throws HyracksDataException, InterruptedException {
        try {
            this.adapter.start(this.ctx.getTaskAttemptId().getTaskId().getPartition(), this.writer);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e2) {
            LOGGER.log(Level.WARN, "Exception during feed ingestion ", e2);
            throw HyracksDataException.create(e2);
        }
    }

    protected void abort() throws HyracksDataException, InterruptedException {
        LOGGER.info(this.runtimeId + " aborting...");
        synchronized (this) {
            this.poisoned = true;
            try {
                if (!this.adapter.stop(60000L)) {
                    LOGGER.info(this.runtimeId + " failed to stop adapter. interrupting the thread...");
                    this.taskThread.interrupt();
                }
            } catch (HyracksDataException e) {
                if (e.getComponent() != "HYR" || e.getErrorCode() != 91) {
                    LOGGER.log(Level.WARN, "Failure during attempt to stop " + this.runtimeId, e);
                    throw e;
                }
                LOGGER.log(Level.WARN, this.runtimeId + " stop adapter timed out. interrupting the thread...", e);
                this.taskThread.interrupt();
            }
        }
    }

    public String getStats() {
        return this.adapter != null ? "{\"adapter-stats\": " + this.adapter.getStats() + "}" : "\"Runtime stats is not available.\"";
    }
}
