package org.apache.asterix.external.feed.dataflow;

import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.memory.ConcurrentFramePool;
import org.apache.asterix.common.memory.FrameAction;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.class */
public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
    private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8d;
    private static final boolean DEBUG = false;
    private final FeedExceptionHandler exceptionHandler;
    private final FrameSpiller spiller;
    private final FeedPolicyAccessor fpa;
    private final FrameAction frameAction;
    private final int initialFrameSize;
    private final FrameTransporter consumer;
    private final Thread consumerThread;
    private final BlockingQueue<ByteBuffer> inbox;
    private final ConcurrentFramePool framePool;
    private FeedUtils.Mode mode = FeedUtils.Mode.PROCESS;
    private int total = 0;
    private int numDiscarded = 0;
    private int numSpilled = 0;
    private int numProcessedInMemory = 0;
    private int numStalled = 0;
    private static final Logger LOGGER = LogManager.getLogger();
    private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
    private static final ByteBuffer SPILLED = ByteBuffer.allocate(0);
    private static final ByteBuffer FAIL = ByteBuffer.allocate(0);

    /* loaded from: input_file:org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler$FrameTransporter.class */
    private class FrameTransporter implements Runnable {
        private volatile Throwable cause;
        private int consumed;

        private FrameTransporter() {
            this.consumed = 0;
        }

        public Throwable cause() {
            return this.cause;
        }

        private Throwable consume(ByteBuffer byteBuffer) {
            while (byteBuffer != null) {
                try {
                    FeedRuntimeInputHandler.this.writer.nextFrame(byteBuffer);
                    this.consumed++;
                    byteBuffer = null;
                } catch (HyracksDataException e) {
                    byteBuffer = FeedRuntimeInputHandler.this.exceptionHandler.handle(e, byteBuffer);
                    if (byteBuffer == null) {
                        this.cause = e;
                        return e;
                    }
                } catch (Throwable th) {
                    this.cause = th;
                    return th;
                }
            }
            return null;
        }

        private boolean clearLocalFrames() throws HyracksDataException {
            ByteBuffer next = FeedRuntimeInputHandler.this.spiller.next();
            while (true) {
                ByteBuffer byteBuffer = next;
                if (byteBuffer == null) {
                    return true;
                }
                if (consume(byteBuffer) != null) {
                    return false;
                }
                next = FeedRuntimeInputHandler.this.spiller.next();
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = true;
            while (z) {
                try {
                    ByteBuffer byteBuffer = (ByteBuffer) FeedRuntimeInputHandler.this.inbox.poll();
                    if (byteBuffer == null) {
                        FeedRuntimeInputHandler.this.writer.flush();
                        byteBuffer = (ByteBuffer) FeedRuntimeInputHandler.this.inbox.take();
                    }
                    if (byteBuffer == FeedRuntimeInputHandler.SPILLED) {
                        z = clearLocalFrames();
                    } else if (byteBuffer == FeedRuntimeInputHandler.POISON_PILL) {
                        z = false;
                        if (FeedRuntimeInputHandler.this.spiller != null) {
                            clearLocalFrames();
                        }
                    } else if (byteBuffer == FeedRuntimeInputHandler.FAIL) {
                        z = false;
                        FeedRuntimeInputHandler.this.writer.fail();
                    } else {
                        try {
                            z = consume(byteBuffer) == null;
                            FeedRuntimeInputHandler.this.framePool.release(byteBuffer);
                        } catch (Throwable th) {
                            FeedRuntimeInputHandler.this.framePool.release(byteBuffer);
                            throw th;
                        }
                    }
                } catch (Throwable th2) {
                    this.cause = th2;
                    return;
                }
            }
        }

        public String toString() {
            return "consumed: " + this.consumed;
        }
    }

    public FeedRuntimeInputHandler(IHyracksTaskContext iHyracksTaskContext, FeedConnectionId feedConnectionId, ActiveRuntimeId activeRuntimeId, IFrameWriter iFrameWriter, FeedPolicyAccessor feedPolicyAccessor, FrameTupleAccessor frameTupleAccessor, ConcurrentFramePool concurrentFramePool) throws HyracksDataException {
        this.writer = iFrameWriter;
        this.spiller = feedPolicyAccessor.spillToDiskOnCongestion() ? new FrameSpiller(iHyracksTaskContext, feedConnectionId.getFeedId() + "_" + feedConnectionId.getDatasetName() + "_" + activeRuntimeId.getPartition(), feedPolicyAccessor.getMaxSpillOnDisk()) : null;
        this.exceptionHandler = new FeedExceptionHandler(iHyracksTaskContext, frameTupleAccessor);
        this.fpa = feedPolicyAccessor;
        this.framePool = concurrentFramePool;
        this.inbox = new LinkedBlockingQueue();
        this.consumer = new FrameTransporter();
        this.consumerThread = new Thread(this.consumer, "FeedRuntimeInputHandler-FrameTransporter");
        this.initialFrameSize = iHyracksTaskContext.getInitialFrameSize();
        this.frameAction = new FrameAction();
    }

    public void open() throws HyracksDataException {
        this.writer.open();
        this.consumerThread.start();
    }

    public void fail() throws HyracksDataException {
        ByteBuffer poll = this.inbox.poll();
        while (true) {
            ByteBuffer byteBuffer = poll;
            if (byteBuffer == null) {
                try {
                    this.inbox.put(FAIL);
                    return;
                } catch (InterruptedException e) {
                    LOGGER.log(Level.WARN, "interrupted", e);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            if (byteBuffer != SPILLED) {
                this.framePool.release(byteBuffer);
            }
            poll = this.inbox.poll();
        }
    }

    public void close() throws HyracksDataException {
        try {
            this.inbox.put(POISON_PILL);
            this.consumerThread.join();
        } catch (InterruptedException e) {
            LOGGER.log(Level.WARN, "interrupted", e);
            Thread.currentThread().interrupt();
        }
        try {
            if (this.spiller != null) {
                this.spiller.close();
            }
        } catch (Throwable th) {
            LOGGER.log(Level.WARN, "exception closing spiller", th);
        } finally {
            this.writer.close();
        }
    }

    public void nextFrame(ByteBuffer byteBuffer) throws HyracksDataException {
        try {
            this.total++;
            if (this.consumer.cause() != null) {
                throw this.consumer.cause();
            }
            switch (this.mode) {
                case PROCESS:
                    process(byteBuffer);
                    break;
                case SPILL:
                    spill(byteBuffer);
                    break;
                case DISCARD:
                    discard(byteBuffer);
                    break;
                default:
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("Ignoring incoming tuples in " + this.mode + " mode");
                        break;
                    }
                    break;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw HyracksDataException.create(e);
        } catch (Throwable th) {
            throw HyracksDataException.create(th);
        }
    }

    public int framesOnDisk() {
        return this.spiller.remaining();
    }

    private ByteBuffer getFreeBuffer(int i) throws HyracksDataException {
        return i / this.initialFrameSize == 1 ? this.framePool.get() : this.framePool.get(i);
    }

    private void discard(ByteBuffer byteBuffer) throws HyracksDataException, InterruptedException {
        if (!this.fpa.spillToDiskOnCongestion()) {
            ByteBuffer freeBuffer = getFreeBuffer(byteBuffer.capacity());
            if (freeBuffer != null) {
                this.numProcessedInMemory++;
                freeBuffer.put(byteBuffer);
                this.inbox.put(freeBuffer);
                this.mode = FeedUtils.Mode.PROCESS;
                return;
            }
        } else if (this.spiller.spill(byteBuffer)) {
            this.numSpilled++;
            this.mode = FeedUtils.Mode.SPILL;
            return;
        }
        if ((this.numDiscarded + 1.0d) / this.total > this.fpa.getMaxFractionDiscard()) {
            stall(byteBuffer);
        } else {
            this.numDiscarded++;
        }
    }

    private void exitProcessState(ByteBuffer byteBuffer) throws HyracksDataException, InterruptedException {
        if (!this.fpa.spillToDiskOnCongestion()) {
            discardOrStall(byteBuffer);
            return;
        }
        this.mode = FeedUtils.Mode.SPILL;
        this.spiller.open();
        spill(byteBuffer);
    }

    private void discardOrStall(ByteBuffer byteBuffer) throws HyracksDataException, InterruptedException {
        if (!this.fpa.discardOnCongestion()) {
            stall(byteBuffer);
        } else {
            this.mode = FeedUtils.Mode.DISCARD;
            discard(byteBuffer);
        }
    }

    private void stall(ByteBuffer byteBuffer) throws HyracksDataException, InterruptedException {
        this.numStalled++;
        if (this.fpa.spillToDiskOnCongestion()) {
            waitforSpillSpace();
            this.spiller.spill(byteBuffer);
            this.numSpilled++;
            this.inbox.put(SPILLED);
            return;
        }
        this.frameAction.setFrame(byteBuffer);
        this.framePool.subscribe(this.frameAction);
        this.inbox.put(this.frameAction.retrieve());
        this.numProcessedInMemory++;
    }

    private void waitforSpillSpace() throws InterruptedException {
        synchronized (this.spiller) {
            while (this.spiller.usedBudget() > 0.8d) {
                this.spiller.wait();
            }
        }
    }

    private void process(ByteBuffer byteBuffer) throws HyracksDataException, InterruptedException {
        ByteBuffer freeBuffer = byteBuffer.capacity() <= this.framePool.getMaxFrameSize() ? getFreeBuffer(byteBuffer.capacity()) : null;
        if (freeBuffer == null) {
            exitProcessState(byteBuffer);
            return;
        }
        this.numProcessedInMemory++;
        freeBuffer.put(byteBuffer);
        this.inbox.put(freeBuffer);
    }

    private void spill(ByteBuffer byteBuffer) throws HyracksDataException, InterruptedException {
        if (!this.spiller.switchToMemory()) {
            if (this.spiller.spill(byteBuffer)) {
                this.inbox.put(SPILLED);
                this.numSpilled++;
                return;
            } else if (!this.fpa.discardOnCongestion()) {
                stall(byteBuffer);
                return;
            } else {
                this.mode = FeedUtils.Mode.DISCARD;
                discard(byteBuffer);
                return;
            }
        }
        ByteBuffer byteBuffer2 = null;
        if (byteBuffer.capacity() <= this.framePool.getMaxFrameSize()) {
            byteBuffer2 = getFreeBuffer(byteBuffer.capacity());
        }
        if (byteBuffer2 == null) {
            this.spiller.spill(byteBuffer);
            this.numSpilled++;
            this.inbox.put(SPILLED);
        } else {
            this.spiller.close();
            this.numProcessedInMemory++;
            byteBuffer2.put(byteBuffer);
            this.inbox.put(byteBuffer2);
            this.mode = FeedUtils.Mode.PROCESS;
        }
    }

    public int getNumDiscarded() {
        return this.numDiscarded;
    }

    public int getNumSpilled() {
        return this.numSpilled;
    }

    public int getNumProcessedInMemory() {
        return this.numProcessedInMemory;
    }

    public int getNumStalled() {
        return this.numStalled;
    }

    public int getTotal() {
        return this.total;
    }

    public BlockingQueue<ByteBuffer> getInternalBuffer() {
        return this.inbox;
    }
}
