package org.apache.streams.local.queues;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import java.lang.management.ManagementFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.class */
public class ThroughputQueueMulitThreadTest extends RandomizedTest {
    /** Id passed to the queue in the multi-thread test and used as the {@code name} key when unregistering its MBean. */
    private static final String MBEAN_ID = "testQueue";

    /** Value of the {@code identifier} key in the MBean name that is unregistered after each test. */
    private static final String STREAM_ID = "test_stream";

    private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMulitThreadTest.class);

    /**
     * Epoch millis captured once at class initialization; value of the {@code startedAt} key in the
     * MBean name that is unregistered after each test. Never reassigned, hence {@code final}.
     */
    private static final long STREAM_START_TIME = new DateTime().getMillis();

    /* loaded from: input_file:org/apache/streams/local/queues/ThroughputQueueMulitThreadTest$BlocksOnEmptyQueue.class */
    private class BlocksOnEmptyQueue implements Runnable {
        private CountDownLatch full;
        private volatile boolean complete = false;
        private int queueSize;
        private CountDownLatch finished;
        private BlockingQueue queue;

        public BlocksOnEmptyQueue(CountDownLatch countDownLatch, CountDownLatch countDownLatch2, int i, BlockingQueue blockingQueue) {
            this.full = countDownLatch;
            this.finished = countDownLatch2;
            this.queueSize = i;
            this.queue = blockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            for (int i = 0; i < this.queueSize; i++) {
                try {
                    this.queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            this.full.countDown();
            this.queue.take();
            this.complete = true;
            this.finished.countDown();
        }

        public boolean isComplete() {
            return this.complete;
        }
    }

    /* loaded from: input_file:org/apache/streams/local/queues/ThroughputQueueMulitThreadTest$BlocksOnFullQueue.class */
    private class BlocksOnFullQueue implements Runnable {
        private CountDownLatch full;
        private volatile boolean complete = false;
        private int queueSize;
        private CountDownLatch finished;
        private BlockingQueue queue;

        public BlocksOnFullQueue(CountDownLatch countDownLatch, CountDownLatch countDownLatch2, BlockingQueue blockingQueue, int i) {
            this.full = countDownLatch;
            this.queueSize = i;
            this.finished = countDownLatch2;
            this.queue = blockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            for (int i = 0; i < this.queueSize; i++) {
                try {
                    this.queue.put(Integer.valueOf(i));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            this.full.countDown();
            this.queue.put(0);
            this.complete = true;
            this.finished.countDown();
        }

        public boolean isComplete() {
            return this.complete;
        }
    }

    /* loaded from: input_file:org/apache/streams/local/queues/ThroughputQueueMulitThreadTest$PutData.class */
    private class PutData implements Runnable {
        private BlockingQueue queue;
        private int dataCount;
        private CountDownLatch finished;

        public PutData(CountDownLatch countDownLatch, BlockingQueue blockingQueue, int i) {
            this.queue = blockingQueue;
            this.dataCount = i;
            this.finished = countDownLatch;
        }

        @Override // java.lang.Runnable
        public void run() {
            for (int i = 0; i < this.dataCount; i++) {
                try {
                    this.queue.put(Integer.valueOf(i));
                } catch (InterruptedException e) {
                    ThroughputQueueMulitThreadTest.LOGGER.error("PUT DATA interupted !");
                    Thread.currentThread().interrupt();
                }
            }
            this.finished.countDown();
        }
    }

    /* loaded from: input_file:org/apache/streams/local/queues/ThroughputQueueMulitThreadTest$TakeData.class */
    private class TakeData implements Runnable {
        private BlockingQueue queue;

        public TakeData(BlockingQueue blockingQueue) {
            this.queue = blockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    this.queue.take();
                } catch (InterruptedException e) {
                    ThroughputQueueMulitThreadTest.LOGGER.error("PUT DATA interupted !");
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Unregisters, after each test, the MBean whose name is built from {@code MBEAN_ID},
     * {@code STREAM_ID} and {@code STREAM_START_TIME}. A missing MBean is not treated as an error.
     *
     * @throws Exception if the object name is malformed or unregistration fails for a reason
     *                   other than the MBean not being registered
     */
    @After
    public void unregisterMXBean() throws Exception {
        String mbeanName = String.format(
                "org.apache.streams.local:type=ThroughputQueue,name=%s,identifier=%s,startedAt=%s",
                MBEAN_ID, STREAM_ID, STREAM_START_TIME);
        ObjectName objectName = new ObjectName(mbeanName);
        try {
            ManagementFactory.getPlatformMBeanServer().unregisterMBean(objectName);
        } catch (InstanceNotFoundException ignored) {
            // Nothing is registered under this name; there is nothing to clean up.
        }
    }

    /**
     * Best-effort removal, after each test, of every MBean in the
     * {@code org.apache.streams.local} domain. A failure never fails the test, but it is logged
     * so that MBeans left behind can be diagnosed.
     */
    @After
    public void removeLocalMBeans() {
        try {
            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
        } catch (Exception e) {
            // Cleanup only: report the problem instead of hiding it, but do not propagate.
            LOGGER.warn("Unable to remove MBeans of domain org.apache.streams.local", e);
        }
    }

    /**
     * Checks that a producer blocks when the queue is full and resumes once space is freed.
     *
     * <p>A {@link BlocksOnFullQueue} task puts a random number of elements into a queue
     * constructed with that same number, then attempts one more put. The test asserts that the
     * extra put has not completed after a one second pause, takes a single element, and then
     * asserts that the producer finished and the reported sizes are back at the initial count.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testBlockOnFullQueue() throws InterruptedException {
        int queueSize = randomIntBetween(1, 3000);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch full = new CountDownLatch(1);
            CountDownLatch finished = new CountDownLatch(1);
            ThroughputQueue queue = new ThroughputQueue(queueSize);
            BlocksOnFullQueue producer = new BlocksOnFullQueue(full, finished, queue, queueSize);
            executor.submit(producer);

            // Wait until the producer has put queueSize elements.
            full.await();
            assertEquals(queueSize, queue.size());
            assertEquals(queueSize, queue.getCurrentSize());
            assertFalse(producer.isComplete());
            // Give the extra put time to (wrongly) succeed before checking again.
            safeSleep(1000L);
            assertFalse(producer.isComplete());

            // Freeing one slot must let the pending put go through.
            queue.take();
            finished.await();
            assertEquals(queueSize, queue.size());
            assertEquals(queueSize, queue.getCurrentSize());
            assertTrue(producer.isComplete());
        } finally {
            // Always interrupt the worker: after a failed assertion it may still be parked
            // in put() and would otherwise outlive the test.
            executor.shutdownNow();
            executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Checks that a consumer blocks when the queue is empty and resumes once an element arrives.
     *
     * <p>The queue is pre-filled with a random number of elements. A {@link BlocksOnEmptyQueue}
     * task takes that many elements and then attempts one more take. The test asserts that the
     * extra take has not completed after a one second pause, puts a single element, and then
     * asserts that the consumer finished and the reported sizes are zero again.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testBlockOnEmptyQueue() throws InterruptedException {
        int elementCount = randomIntBetween(1, 3000);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch drained = new CountDownLatch(1);
            CountDownLatch finished = new CountDownLatch(1);
            ThroughputQueue queue = new ThroughputQueue();
            BlocksOnEmptyQueue consumer = new BlocksOnEmptyQueue(drained, finished, elementCount, queue);
            for (int i = 0; i < elementCount; i++) {
                queue.put(Integer.valueOf(i));
            }
            executor.submit(consumer);

            // Wait until the consumer has taken every pre-filled element.
            drained.await();
            assertEquals(0L, queue.size());
            assertEquals(0L, queue.getCurrentSize());
            assertFalse(consumer.isComplete());
            // Give the extra take time to (wrongly) succeed before checking again.
            safeSleep(1000L);
            assertFalse(consumer.isComplete());

            // Supplying one element must let the pending take go through.
            queue.put(1);
            finished.await();
            assertEquals(0L, queue.size());
            assertEquals(0L, queue.getCurrentSize());
            assertTrue(consumer.isComplete());
        } finally {
            // Always interrupt the worker: after a failed assertion it may still be parked
            // in take() and would otherwise outlive the test.
            executor.shutdownNow();
            executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Exercises the queue from several producer and consumer threads while the test thread
     * repeatedly reads the queue's metric getters, then checks the added/removed totals.
     *
     * <p>For each of a random number of pairs, one {@link PutData} producer and one
     * {@link TakeData} consumer are submitted. After all producers have finished and the queue
     * reports empty, both {@code getAdded()} and {@code getRemoved()} must equal the number of
     * producers times the number of elements each one put. The consumers loop until interrupted,
     * so the pool is finally stopped with {@code shutdownNow()}.
     *
     * @throws Exception if the test thread is interrupted or a queue call fails
     */
    @Test
    @Repeat(iterations = 3)
    public void testMultiThreadAccessAndInteruptResponse() throws Exception {
        // The order of these draws is kept so a given seed reproduces the same scenario.
        int pairCount = randomIntBetween(1, 10);
        int putsPerProducer = randomIntBetween(1, 2000000);
        int metricReads = randomIntBetween(1, 2000000);
        int maxQueueSize = randomIntBetween(1, 1000);
        CountDownLatch producersDone = new CountDownLatch(pairCount);
        ThroughputQueue queue = new ThroughputQueue(maxQueueSize, MBEAN_ID);
        ExecutorService executor = Executors.newFixedThreadPool(pairCount * 2);
        try {
            for (int i = 0; i < pairCount; i++) {
                executor.submit(new PutData(producersDone, queue, putsPerProducer));
                executor.submit(new TakeData(queue));
            }
            // Hit the metric getters concurrently with the producers and consumers.
            for (int read = 0; read < metricReads; read++) {
                queue.getAvgWait();
                queue.getAdded();
                queue.getCurrentSize();
                queue.getMaxWait();
                queue.getRemoved();
                queue.getThroughput();
            }
            producersDone.await();
            while (!queue.isEmpty()) {
                LOGGER.info("Waiting for queue to be emptied...");
                safeSleep(500L);
            }
            // Widen before multiplying so the product is computed in long arithmetic.
            long expectedTotal = (long) putsPerProducer * pairCount;
            assertEquals(expectedTotal, queue.getAdded());
            assertEquals(expectedTotal, queue.getRemoved());
        } finally {
            // Runs on failure too: the consumers never end on their own and would otherwise
            // keep running after the test.
            executor.shutdown();
            executor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
            executor.shutdownNow();
            executor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Sleeps for the given number of milliseconds without propagating interruption: if the
     * sleep is interrupted, the thread's interrupt flag is set again and the method returns.
     *
     * @param j time to sleep, in milliseconds
     */
    private void safeSleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException interrupted) {
            // Restore the flag so code further up the stack can still see the interrupt.
            final Thread self = Thread.currentThread();
            self.interrupt();
        }
    }
}
