package org.apache.asterix.external.feed.test;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.memory.ConcurrentFramePool;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.test.FrameWriterTestUtils;
import org.apache.hyracks.api.test.TestControlledFrameWriter;
import org.apache.hyracks.api.test.TestFrameWriter;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.test.support.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests {@link FeedRuntimeInputHandler} under different combinations of frame-pool memory budget,
 * spill-to-disk and discard-on-congestion policy settings.
 */
public class InputHandlerTest {
    /** Frame size in bytes (32 KiB) used for the task context, the frame pools and the test buffers. */
    private static final int DEFAULT_FRAME_SIZE = 32768;
    /** Base frame count; tests push this many frames, or a multiple of it. */
    private static final int NUM_FRAMES = 128;
    /** 4 MiB (equals NUM_FRAMES * DEFAULT_FRAME_SIZE); used both as pool memory budget and as max spill size. */
    private static final long FEED_MEM_BUDGET = 4194304;
    /** Dataverse name used when building the feed {@code EntityId}. */
    private static final String DATAVERSE = "dataverse";
    /** Dataset name used when building the {@code FeedConnectionId}. */
    private static final String DATASET = "dataset";
    /** Feed name used when building the feed {@code EntityId}. */
    private static final String FEED = "feed";
    /** Node id handed to every {@code ConcurrentFramePool} created by the tests. */
    private static final String NODE_ID = "NodeId";
    /** Value the mocked policy returns from {@code getMaxFractionDiscard()}. */
    private static final float DISCARD_ALLOWANCE = 0.15f;
    /** Single-thread pool on which {@code Pusher} tasks are run. */
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);
    /** Last exception caught by a {@code Pusher}; tests assert that it is still {@code null}. */
    private static volatile HyracksDataException cause = null;

    /* loaded from: input_file:org/apache/asterix/external/feed/test/InputHandlerTest$Pusher.class */
    private class Pusher implements Runnable {
        private final ByteBuffer buffer;
        private final IFrameWriter writer;

        public Pusher(ByteBuffer byteBuffer, IFrameWriter iFrameWriter) {
            this.buffer = byteBuffer;
            this.writer = iFrameWriter;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.writer.nextFrame(this.buffer);
            } catch (HyracksDataException e) {
                e.printStackTrace();
                HyracksDataException unused = InputHandlerTest.cause = e;
            }
        }
    }

    /**
     * Builds a {@link FeedRuntimeInputHandler} for the test feed connection.
     *
     * <p>The handler is given a mocked {@link FrameTupleAccessor}, a connection id built from
     * {@code DATAVERSE}/{@code FEED}/{@code DATASET}, and a COLLECT runtime id with partition 0.
     *
     * @param iHyracksTaskContext task context passed to the handler
     * @param iFrameWriter        downstream writer the handler forwards frames to
     * @param feedPolicyAccessor  (mocked) ingestion policy
     * @param concurrentFramePool frame pool the handler draws memory from
     * @return the newly constructed handler (not yet opened)
     * @throws HyracksDataException if the handler constructor fails
     */
    private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext iHyracksTaskContext, IFrameWriter iFrameWriter, FeedPolicyAccessor feedPolicyAccessor, ConcurrentFramePool concurrentFramePool) throws HyracksDataException {
        FrameTupleAccessor mockedAccessor = Mockito.mock(FrameTupleAccessor.class);
        EntityId feedEntity = new EntityId("Feed", DATAVERSE, FEED);
        FeedConnectionId connectionId = new FeedConnectionId(feedEntity, DATASET);
        String runtimeType = FeedUtils.FeedRuntimeType.COLLECT.toString();
        ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedEntity, runtimeType, 0);
        return new FeedRuntimeInputHandler(iHyracksTaskContext, connectionId, runtimeId, iFrameWriter,
                feedPolicyAccessor, mockedAccessor, concurrentFramePool);
    }

    /**
     * Creates a mocked {@link FeedPolicyAccessor} with flow control always enabled.
     *
     * @param z  value returned by {@code spillToDiskOnCongestion()}
     * @param z2 value returned by {@code discardOnCongestion()}
     * @param j  value returned by {@code getMaxSpillOnDisk()}
     * @param f  value returned by {@code getMaxFractionDiscard()}
     * @return the stubbed policy accessor
     */
    private static FeedPolicyAccessor createFeedPolicyAccessor(boolean z, boolean z2, long j, float f) {
        FeedPolicyAccessor policy = Mockito.mock(FeedPolicyAccessor.class);
        Mockito.when(policy.flowControlEnabled()).thenReturn(true);
        Mockito.when(policy.spillToDiskOnCongestion()).thenReturn(z);
        Mockito.when(policy.getMaxSpillOnDisk()).thenReturn(j);
        Mockito.when(policy.discardOnCongestion()).thenReturn(z2);
        Mockito.when(policy.getMaxFractionDiscard()).thenReturn(f);
        return policy;
    }

    /**
     * Deletes every file in the current working directory whose name matches
     * {@code dataverse.feed(Feed)_dataset*}.
     *
     * @throws IOException if a matching file cannot be deleted
     */
    private void cleanDiskFiles() throws IOException {
        File workingDir = new File(".");
        WildcardFileFilter nameFilter = new WildcardFileFilter("dataverse.feed(Feed)_dataset*");
        // A null directory filter means sub-directories are not searched.
        for (File leftover : FileUtils.listFiles(workingDir, nameFilter, (IOFileFilter) null)) {
            Files.deleteIfExists(leftover.toPath());
        }
    }

    /**
     * Prepares a clean slate for each test: removes leftover spill files and clears the exception
     * recorded by a previous test's {@code Pusher}.
     *
     * <p>{@code cause} is static, so without the reset a single captured exception would make
     * every later test fail its {@code assertNull(cause)} check.
     *
     * @throws IOException if a leftover file cannot be deleted
     */
    @Before
    public void testCleanBefore() throws IOException {
        cause = null;
        cleanDiskFiles();
    }

    /**
     * Removes the files matched by {@code cleanDiskFiles()} after each test.
     *
     * @throws IOException if a matching file cannot be deleted
     */
    @After
    public void testCleanAfter() throws IOException {
        cleanDiskFiles();
    }

    /**
     * Zero-byte frame pool, spilling enabled, discarding disabled, variable-size frames.
     *
     * <p>Expects every pushed frame to be counted as spilled, none as discarded, and all of them
     * to have reached the writer once the handler is closed.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}; the {@code cause} check runs on every exit path.
     */
    @Test
    public void testZeroMemoryVarSizeFrameWithDiskNoDiscard() throws Exception {
        try {
            Random random = new Random();
            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
            FeedPolicyAccessor policy = createFeedPolicyAccessor(true, false, FEED_MEM_BUDGET, DISCARD_ALLOWANCE);
            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0L, DEFAULT_FRAME_SIZE);
            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
            handler.open();

            // The very first frame cannot be held in memory, so it must be spilled.
            handler.nextFrame(ByteBuffer.allocate(DEFAULT_FRAME_SIZE));
            Assert.assertEquals(0L, handler.getNumProcessedInMemory());
            Assert.assertEquals(1L, handler.getNumSpilled());

            // Push frames of 1 to 10 times the base frame size.
            int additionalFrames = NUM_FRAMES * 5;
            for (int n = 0; n < additionalFrames; n++) {
                int multiplier = random.nextInt(10) + 1;
                handler.nextFrame(ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier));
            }
            Assert.assertEquals(0L, handler.getNumDiscarded());
            Assert.assertEquals(additionalFrames + 1, handler.getNumSpilled());
            writer.validate(false);
            handler.close();
            Assert.assertEquals(additionalFrames + 1, writer.nextFrameCount());
            writer.validate(true);
        } finally {
            Assert.assertNull(cause);
        }
    }

    /**
     * Zero-byte frame pool, spilling enabled, discarding disabled, fixed-size frames.
     *
     * <p>Expects every pushed frame to be counted as spilled, none as discarded, and all of them
     * to have reached the writer once the handler is closed.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}; the {@code cause} check runs on every exit path.
     */
    @Test
    public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() throws Exception {
        try {
            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
            FeedPolicyAccessor policy = createFeedPolicyAccessor(true, false, FEED_MEM_BUDGET, DISCARD_ALLOWANCE);
            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0L, DEFAULT_FRAME_SIZE);
            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
            handler.open();
            VSizeFrame frame = new VSizeFrame(ctx);

            // The very first frame cannot be held in memory, so it must be spilled.
            handler.nextFrame(frame.getBuffer());
            Assert.assertEquals(0L, handler.getNumProcessedInMemory());
            Assert.assertEquals(1L, handler.getNumSpilled());

            int additionalFrames = NUM_FRAMES * 10;
            for (int n = 0; n < additionalFrames; n++) {
                handler.nextFrame(frame.getBuffer());
            }
            Assert.assertEquals(0L, handler.getNumDiscarded());
            Assert.assertEquals(additionalFrames + 1, handler.getNumSpilled());
            writer.validate(false);
            handler.close();
            Assert.assertEquals(additionalFrames + 1, writer.nextFrameCount());
            writer.validate(true);
        } finally {
            Assert.assertNull(cause);
        }
    }

    /**
     * 50-frame pool, 50-frame spill budget, discarding enabled, variable-size frames, frozen writer.
     *
     * <p>Phases: (1) fill memory with 1/2/3-frame buffers, (2) push 3/4/5-frame buffers until 50
     * frames are reported spilled, (3) push while the discard fraction stays within the policy
     * allowance, (4) push once more from another thread and check the call has not completed
     * before the writer is unfrozen.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}.
     */
    @Test
    public void testMemoryVarSizeFrameWithSpillWithDiscard() throws Exception {
        final int numberOfMemoryFrames = 50;
        final int numberOfSpillFrames = 50;
        int notDiscarded = 0;
        int framesInMemory = 0;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        FeedPolicyAccessor policy = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
        handler.open();
        ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
        ByteBuffer buffer2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
        ByteBuffer buffer3 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 3);
        ByteBuffer buffer4 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 4);
        ByteBuffer buffer5 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 5);

        // Phase 1: fill memory, stopping before a buffer that would not fit.
        while (framesInMemory + 1 < numberOfMemoryFrames) {
            handler.nextFrame(buffer1);
            notDiscarded++;
            framesInMemory += 1;
            if (framesInMemory + 2 >= numberOfMemoryFrames) {
                break;
            }
            notDiscarded++;
            framesInMemory += 2;
            handler.nextFrame(buffer2);
            if (framesInMemory + 3 >= numberOfMemoryFrames) {
                break;
            }
            notDiscarded++;
            framesInMemory += 3;
            handler.nextFrame(buffer3);
        }
        Assert.assertTrue(framePool.remaining() < 3);
        Assert.assertEquals(0L, handler.getNumSpilled());
        Assert.assertEquals(0L, handler.getNumStalled());
        Assert.assertEquals(0L, handler.getNumDiscarded());

        // Phase 2: buffers of 3+ frames no longer fit in memory, so they go to disk.
        while (handler.getNumSpilled() < numberOfSpillFrames) {
            notDiscarded++;
            handler.nextFrame(buffer3);
            if (handler.getNumSpilled() >= numberOfSpillFrames) {
                break;
            }
            notDiscarded++;
            handler.nextFrame(buffer4);
            if (handler.getNumSpilled() >= numberOfSpillFrames) {
                break;
            }
            notDiscarded++;
            handler.nextFrame(buffer5);
        }
        Assert.assertTrue(framePool.remaining() < 3);
        Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
        Assert.assertEquals(numberOfSpillFrames, handler.framesOnDisk());
        Assert.assertEquals(0L, handler.getNumStalled());
        Assert.assertEquals(0L, handler.getNumDiscarded());

        // Phase 3: push while one more discard keeps the ratio within the allowance.
        double numDiscarded = 0.0d;
        while ((numDiscarded + 1.0d) / (handler.getTotal() + 1.0d) <= policy.getMaxFractionDiscard()) {
            handler.nextFrame(buffer5);
            numDiscarded += 1.0d;
        }
        Assert.assertTrue(framePool.remaining() < 3);
        Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
        Assert.assertEquals(0L, handler.getNumStalled());
        Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());

        // Phase 4: this push is expected not to complete while the writer is frozen.
        Future<?> result = EXECUTOR.submit(new Pusher(buffer5, handler));
        if (result.isDone()) {
            Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
        }
        writer.unfreeze();
        result.get();
        handler.close();
        Assert.assertEquals(notDiscarded + 1, writer.nextFrameCount());
        Assert.assertNull(cause);
    }

    /**
     * 50-frame pool, 50-frame spill budget, discarding enabled, fixed-size frames, frozen writer.
     *
     * <p>Phases: (1) 50 frames fill memory, (2) 50 more are spilled, (3) further frames are
     * discarded while the discard fraction stays within the policy allowance, (4) one more frame is
     * pushed from another thread and is checked not to have completed before the writer is unfrozen.
     *
     * <p>The frame pool is kept in a named local so the {@code remaining()} assertions have
     * something to refer to; the previous code used an undefined variable. Failures propagate to
     * JUnit unchanged instead of being replaced by a message-less {@code Assert.fail()}.
     */
    @Test
    public void testMemoryFixedSizeFrameWithSpillWithDiscard() throws Exception {
        final int numberOfMemoryFrames = 50;
        final int numberOfSpillFrames = 50;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        FeedPolicyAccessor policy = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);

        // Phase 1: fill the memory budget.
        for (int n = 0; n < numberOfMemoryFrames; n++) {
            handler.nextFrame(frame.getBuffer());
        }
        Assert.assertEquals(0L, framePool.remaining());
        Assert.assertEquals(numberOfMemoryFrames, handler.getTotal());
        Assert.assertEquals(0L, handler.getNumSpilled());
        Assert.assertEquals(0L, handler.getNumStalled());
        Assert.assertEquals(0L, handler.getNumDiscarded());

        // Phase 2: fill the spill budget.
        for (int n = 0; n < numberOfSpillFrames; n++) {
            handler.nextFrame(frame.getBuffer());
        }
        Assert.assertEquals(0L, framePool.remaining());
        Assert.assertEquals(numberOfMemoryFrames + numberOfSpillFrames, handler.getTotal());
        Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
        Assert.assertEquals(0L, handler.getNumStalled());
        Assert.assertEquals(0L, handler.getNumDiscarded());

        // Phase 3: push while one more discard keeps the ratio within the allowance.
        double numDiscarded = 0.0d;
        while ((numDiscarded + 1.0d) / (handler.getTotal() + 1.0d) <= policy.getMaxFractionDiscard()) {
            handler.nextFrame(frame.getBuffer());
            numDiscarded += 1.0d;
        }
        Assert.assertEquals(0L, framePool.remaining());
        Assert.assertEquals((int) (numberOfMemoryFrames + numberOfSpillFrames + numDiscarded), handler.getTotal());
        Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
        Assert.assertEquals(0L, handler.getNumStalled());
        Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());

        // Phase 4: this push is expected not to complete while the writer is frozen.
        Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
        if (result.isDone()) {
            Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
        }
        Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
        writer.unfreeze();
        result.get();
        handler.close();
        Assert.assertTrue(result.isDone());
        Assert.assertEquals(numberOfMemoryFrames + numberOfSpillFrames + 1, writer.nextFrameCount());
        Assert.assertNull(cause);
    }

    /**
     * 100-frame pool, spilling disabled, discarding enabled, variable-size frames, frozen writer.
     *
     * <p>Fills memory with randomly sized buffers, then pushes while the discard fraction stays
     * within the policy allowance, then pushes once more from another thread and checks that call
     * has not completed before the writer is unfrozen.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}.
     */
    @Test
    public void testMemoryVariableSizeFrameNoSpillWithDiscard() throws Exception {
        final int numberOfMemoryFrames = 100;
        Random random = new Random();
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        FeedPolicyAccessor policy = createFeedPolicyAccessor(false, true, 32768L, DISCARD_ALLOWANCE);
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
        handler.open();

        // Keep pushing as long as the next (already allocated) buffer still fits in the pool.
        ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
        int multiplier = 1;
        int buffersInMemory = 0;
        while (multiplier <= framePool.remaining()) {
            buffersInMemory++;
            handler.nextFrame(buffer);
            multiplier = random.nextInt(10) + 1;
            buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
        }

        // Push the buffer that does not fit while one more discard stays within the allowance.
        double numDiscarded = 0.0d;
        while ((numDiscarded + 1.0d) / (handler.getTotal() + 1.0d) <= policy.getMaxFractionDiscard()) {
            handler.nextFrame(buffer);
            numDiscarded += 1.0d;
        }

        // This push is expected not to complete while the writer is frozen.
        Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
        if (result.isDone()) {
            Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
        }
        Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
        Assert.assertEquals(0L, handler.getNumSpilled());
        writer.unfreeze();
        result.get();
        handler.close();
        Assert.assertEquals(buffersInMemory + 1, writer.nextFrameCount());
        Assert.assertNull(cause);
    }

    /**
     * 100-frame pool, spilling disabled, discarding enabled, fixed-size frames, frozen writer.
     *
     * <p>Fills memory with 100 frames, then pushes while the discard fraction stays within the
     * policy allowance, then pushes once more from another thread and checks that call has not
     * completed before the writer is unfrozen.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}.
     */
    @Test
    public void testMemoryFixedSizeFrameNoSpillWithDiscard() throws Exception {
        final int numberOfMemoryFrames = 100;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        FeedPolicyAccessor policy = createFeedPolicyAccessor(false, true, 32768L, DISCARD_ALLOWANCE);
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);

        // Fill the memory budget.
        for (int n = 0; n < numberOfMemoryFrames; n++) {
            handler.nextFrame(frame.getBuffer());
        }

        // Push while one more discard keeps the ratio within the allowance.
        double numDiscarded = 0.0d;
        while ((numDiscarded + 1.0d) / (handler.getTotal() + 1.0d) <= policy.getMaxFractionDiscard()) {
            handler.nextFrame(frame.getBuffer());
            numDiscarded += 1.0d;
        }

        // This push is expected not to complete while the writer is frozen.
        Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
        if (result.isDone()) {
            Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
        }
        Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
        Assert.assertEquals(0L, handler.getNumSpilled());
        writer.unfreeze();
        result.get();
        handler.close();
        Assert.assertEquals(numberOfMemoryFrames + 1, writer.nextFrameCount());
        Assert.assertNull(cause);
    }

    /**
     * Full-budget pool, spilling enabled, discarding disabled, fixed-size frames, frozen writer.
     *
     * <p>After {@code NUM_FRAMES} frames fill memory, one more frame pushed from another thread
     * must complete and be counted as the single spilled frame; after unfreezing and closing,
     * no frames may remain on disk.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}.
     */
    @Test
    public void testMemoryFixedSizeFrameWithSpillNoDiscard() throws Exception {
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        FeedPolicyAccessor policy = createFeedPolicyAccessor(true, false, FEED_MEM_BUDGET, DISCARD_ALLOWANCE);
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);

        // Fill the memory budget.
        for (int n = 0; n < NUM_FRAMES; n++) {
            handler.nextFrame(frame.getBuffer());
        }

        // The extra frame has to be spilled, so this push must return even with a frozen writer.
        EXECUTOR.submit(new Pusher(frame.getBuffer(), handler)).get();
        Assert.assertEquals(0L, handler.getNumDiscarded());
        Assert.assertEquals(1L, handler.getNumSpilled());
        writer.unfreeze();
        handler.close();
        Assert.assertEquals(0L, handler.framesOnDisk());
        Assert.assertNull(cause);
    }

    /**
     * Full-budget pool, spilling and discarding disabled, fixed-size frames, unthrottled writer.
     *
     * <p>Pushes {@code NUM_FRAMES * 10} frames and expects none to be spilled or discarded and all
     * of them to have reached the writer once the handler is closed.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}.
     */
    @Test
    public void testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() throws Exception {
        final int totalFrames = NUM_FRAMES * 10;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        FeedPolicyAccessor policy = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
        TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);
        for (int n = 0; n < totalFrames; n++) {
            handler.nextFrame(frame.getBuffer());
        }
        Assert.assertEquals(0L, handler.getNumDiscarded());
        Assert.assertEquals(0L, handler.getNumSpilled());
        writer.validate(false);
        handler.close();
        Assert.assertEquals(totalFrames, writer.nextFrameCount());
        writer.validate(true);
        Assert.assertNull(cause);
    }

    /**
     * Full-budget pool, spilling and discarding disabled, fixed-size frames, throttled writer.
     *
     * <p>Same as the fast-consumer variant except that {@code setNextDuration(1L)} is applied to
     * the writer first (presumably a per-frame delay that slows the consumer; confirm against
     * {@code TestFrameWriter}). Expects no spills, no discards, and all {@code NUM_FRAMES * 10}
     * frames delivered after close.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}.
     */
    @Test
    public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() throws Exception {
        final int totalFrames = NUM_FRAMES * 10;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        FeedPolicyAccessor policy = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
        TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);
        writer.setNextDuration(1L);
        for (int n = 0; n < totalFrames; n++) {
            handler.nextFrame(frame.getBuffer());
        }
        Assert.assertEquals(0L, handler.getNumDiscarded());
        Assert.assertEquals(0L, handler.getNumSpilled());
        writer.validate(false);
        handler.close();
        Assert.assertEquals(totalFrames, writer.nextFrameCount());
        writer.validate(true);
        Assert.assertNull(cause);
    }

    /**
     * Full-budget pool, spilling and discarding disabled, variable-size frames, frozen writer.
     *
     * <p>Fills memory with randomly sized buffers, then pushes the buffer that no longer fits
     * from another thread. That push is checked not to have completed, with no spills or
     * discards and at most one stall recorded, before the writer is unfrozen.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}.
     */
    @Test
    public void testMemoryVarSizeFrameNoDiskNoDiscard() throws Exception {
        Random random = new Random();
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        FeedPolicyAccessor policy = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
        handler.open();

        // Keep pushing as long as the next (already allocated) buffer still fits in the pool.
        ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
        int multiplier = 1;
        while (multiplier <= framePool.remaining()) {
            handler.nextFrame(buffer);
            multiplier = random.nextInt(10) + 1;
            buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
        }

        // This push is expected not to complete while the writer is frozen.
        Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
        if (result.isDone()) {
            Assert.fail("The producer should not complete while the writer is frozen and neither spilling nor discarding is enabled");
        }
        Assert.assertEquals(0L, handler.getNumDiscarded());
        Assert.assertEquals(0L, handler.getNumSpilled());
        Assert.assertTrue(handler.getNumStalled() <= 1);
        writer.unfreeze();
        handler.close();
        result.get();
        Assert.assertNull(cause);
    }

    /**
     * Full-budget pool, spilling enabled, discarding disabled, variable-size frames, frozen writer.
     * The scenario is repeated 1000 times with fresh random buffer sizes.
     *
     * <p>Per round: fill memory, push the buffer that no longer fits (it must be spilled and the
     * push must return), kick the writer once for all but one of the in-memory buffers and check
     * that one frame is still on disk, then unfreeze, close and check the disk is empty.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}; as before, the first failing round ends the test.
     */
    @Test
    public void testMemoryVarSizeFrameWithSpillNoDiscard() throws Exception {
        final int rounds = 1000;
        for (int round = 0; round < rounds; round++) {
            Random random = new Random();
            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
            FeedPolicyAccessor policy = createFeedPolicyAccessor(true, false, FEED_MEM_BUDGET, DISCARD_ALLOWANCE);
            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
            writer.freeze();
            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
            handler.open();

            // Keep pushing as long as the next (already allocated) buffer still fits in the pool.
            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
            int multiplier = 1;
            int buffersInMemory = 0;
            while (multiplier <= framePool.remaining()) {
                buffersInMemory++;
                handler.nextFrame(buffer);
                multiplier = random.nextInt(10) + 1;
                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
            }

            // The buffer that does not fit has to be spilled, so this push must return.
            EXECUTOR.submit(new Pusher(buffer, handler)).get();
            Assert.assertEquals(0L, handler.getNumDiscarded());
            Assert.assertEquals(1L, handler.getNumSpilled());

            // Kick once per in-memory buffer except the last one.
            while (buffersInMemory > 1) {
                writer.kick();
                buffersInMemory--;
            }
            Assert.assertEquals(1L, handler.framesOnDisk());
            writer.unfreeze();
            handler.close();
            Assert.assertEquals(0L, handler.framesOnDisk());
        }
        Assert.assertNull(cause);
    }

    /**
     * Full-budget pool, spilling and discarding disabled, fixed-size frames, frozen writer.
     *
     * <p>After {@code NUM_FRAMES} frames fill memory, one more frame is pushed from another thread.
     * That push is checked not to have completed, with no spills or discards and at most one stall
     * recorded; a single {@code kick()} on the writer is then expected to let it finish.
     *
     * <p>Failures propagate to JUnit unchanged instead of being replaced by a message-less
     * {@code Assert.fail()}.
     */
    @Test
    public void testMemoryFixedSizeFrameNoDiskNoDiscard() throws Exception {
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        FeedPolicyAccessor policy = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, policy, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);

        // Fill the memory budget.
        for (int n = 0; n < NUM_FRAMES; n++) {
            handler.nextFrame(frame.getBuffer());
        }

        // This push is expected not to complete while the writer is frozen.
        Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
        if (result.isDone()) {
            Assert.fail("The producer should not complete while the writer is frozen and neither spilling nor discarding is enabled");
        }
        Assert.assertEquals(0L, handler.getNumDiscarded());
        Assert.assertEquals(0L, handler.getNumSpilled());
        Assert.assertTrue(handler.getNumStalled() <= 1);
        writer.kick();
        result.get();
        writer.unfreeze();
        handler.close();
        Assert.assertNull(cause);
    }
}
