/*
 * Decompiled with CFR 0.152.
 */
package io.airlift.concurrent;

import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.AsyncSemaphore;
import io.airlift.concurrent.Threads;
import io.airlift.testing.Assertions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestAsyncSemaphore {
    private final ListeningExecutorService executor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"async-semaphore-%s")));

    @Test
    public void testInlineExecution() throws Exception {
        AsyncSemaphore asyncSemaphore = new AsyncSemaphore(1, (Executor)this.executor, new Function<Runnable, ListenableFuture<?>>(){

            public ListenableFuture<?> apply(Runnable task) {
                return MoreExecutors.sameThreadExecutor().submit(task);
            }
        });
        final AtomicInteger count = new AtomicInteger();
        ArrayList<ListenableFuture> futures = new ArrayList<ListenableFuture>();
        for (int i = 0; i < 1000; ++i) {
            futures.add(asyncSemaphore.submit((Object)new Runnable(){

                @Override
                public void run() {
                    count.incrementAndGet();
                }
            }));
        }
        Futures.allAsList(futures).get(1L, TimeUnit.MINUTES);
        Assert.assertEquals((int)count.get(), (int)1000);
    }

    @Test
    public void testSingleThreadBoundedConcurrency() throws Exception {
        AsyncSemaphore asyncSemaphore = new AsyncSemaphore(1, (Executor)this.executor, new Function<Runnable, ListenableFuture<?>>(){

            public ListenableFuture<?> apply(Runnable task) {
                return TestAsyncSemaphore.this.executor.submit(task);
            }
        });
        final AtomicInteger count = new AtomicInteger();
        final AtomicInteger concurrency = new AtomicInteger();
        ArrayList<ListenableFuture> futures = new ArrayList<ListenableFuture>();
        for (int i = 0; i < 1000; ++i) {
            futures.add(asyncSemaphore.submit((Object)new Runnable(){

                @Override
                public void run() {
                    count.incrementAndGet();
                    int currentConcurrency = concurrency.incrementAndGet();
                    Assertions.assertLessThanOrEqual((Comparable)Integer.valueOf(currentConcurrency), (Comparable)Integer.valueOf(1));
                    Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.MILLISECONDS);
                    concurrency.decrementAndGet();
                }
            }));
        }
        Futures.allAsList(futures).get(1L, TimeUnit.MINUTES);
        Assert.assertEquals((int)count.get(), (int)1000);
    }

    @Test
    public void testMultiThreadBoundedConcurrency() throws Exception {
        AsyncSemaphore asyncSemaphore = new AsyncSemaphore(2, (Executor)this.executor, new Function<Runnable, ListenableFuture<?>>(){

            public ListenableFuture<?> apply(Runnable task) {
                return TestAsyncSemaphore.this.executor.submit(task);
            }
        });
        final AtomicInteger count = new AtomicInteger();
        final AtomicInteger concurrency = new AtomicInteger();
        ArrayList<ListenableFuture> futures = new ArrayList<ListenableFuture>();
        for (int i = 0; i < 1000; ++i) {
            futures.add(asyncSemaphore.submit((Object)new Runnable(){

                @Override
                public void run() {
                    count.incrementAndGet();
                    int currentConcurrency = concurrency.incrementAndGet();
                    Assertions.assertLessThanOrEqual((Comparable)Integer.valueOf(currentConcurrency), (Comparable)Integer.valueOf(2));
                    Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.MILLISECONDS);
                    concurrency.decrementAndGet();
                }
            }));
        }
        Futures.allAsList(futures).get(1L, TimeUnit.MINUTES);
        Assert.assertEquals((int)count.get(), (int)1000);
    }

    @Test
    public void testMultiSubmitters() throws Exception {
        final AsyncSemaphore asyncSemaphore = new AsyncSemaphore(2, (Executor)this.executor, new Function<Runnable, ListenableFuture<?>>(){

            public ListenableFuture<?> apply(Runnable task) {
                return TestAsyncSemaphore.this.executor.submit(task);
            }
        });
        final AtomicInteger count = new AtomicInteger();
        final AtomicInteger concurrency = new AtomicInteger();
        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch completionLatch = new CountDownLatch(100);
        for (int i = 0; i < 100; ++i) {
            this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    Uninterruptibles.awaitUninterruptibly((CountDownLatch)startLatch, (long)1L, (TimeUnit)TimeUnit.MINUTES);
                    asyncSemaphore.submit((Object)new Runnable(){

                        @Override
                        public void run() {
                            count.incrementAndGet();
                            int currentConcurrency = concurrency.incrementAndGet();
                            Assertions.assertLessThanOrEqual((Comparable)Integer.valueOf(currentConcurrency), (Comparable)Integer.valueOf(2));
                            Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.MILLISECONDS);
                            concurrency.decrementAndGet();
                            completionLatch.countDown();
                        }
                    });
                }
            });
        }
        startLatch.countDown();
        Uninterruptibles.awaitUninterruptibly((CountDownLatch)completionLatch, (long)1L, (TimeUnit)TimeUnit.MINUTES);
        Assert.assertEquals((int)count.get(), (int)100);
    }

    @Test
    public void testFailedTasks() throws Exception {
        AsyncSemaphore asyncSemaphore = new AsyncSemaphore(2, (Executor)this.executor, new Function<Runnable, ListenableFuture<?>>(){

            public ListenableFuture<?> apply(Runnable task) {
                return TestAsyncSemaphore.this.executor.submit(task);
            }
        });
        final AtomicInteger successCount = new AtomicInteger();
        final AtomicInteger failureCount = new AtomicInteger();
        final AtomicInteger concurrency = new AtomicInteger();
        final CountDownLatch completionLatch = new CountDownLatch(1000);
        ArrayList<ListenableFuture> futures = new ArrayList<ListenableFuture>();
        for (int i = 0; i < 1000; ++i) {
            ListenableFuture future = asyncSemaphore.submit((Object)new Runnable(){

                @Override
                public void run() {
                    int currentConcurrency = concurrency.incrementAndGet();
                    Assertions.assertLessThanOrEqual((Comparable)Integer.valueOf(currentConcurrency), (Comparable)Integer.valueOf(2));
                    Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.MILLISECONDS);
                    concurrency.decrementAndGet();
                    throw new IllegalStateException();
                }
            });
            Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<Object>(){

                public void onSuccess(@Nullable Object result) {
                    successCount.incrementAndGet();
                    completionLatch.countDown();
                }

                public void onFailure(Throwable t) {
                    failureCount.incrementAndGet();
                    completionLatch.countDown();
                }
            });
            futures.add(future);
        }
        completionLatch.await(1L, TimeUnit.MINUTES);
        for (ListenableFuture future : futures) {
            try {
                future.get();
                Assert.fail();
            }
            catch (Exception exception) {}
        }
        Assert.assertEquals((int)successCount.get(), (int)0);
        Assert.assertEquals((int)failureCount.get(), (int)1000);
    }

    @Test
    public void testFailedTaskSubmission() throws Exception {
        final AtomicInteger successCount = new AtomicInteger();
        final AtomicInteger failureCount = new AtomicInteger();
        final AtomicInteger concurrency = new AtomicInteger();
        final CountDownLatch completionLatch = new CountDownLatch(1000);
        AsyncSemaphore asyncSemaphore = new AsyncSemaphore(2, (Executor)this.executor, new Function<Runnable, ListenableFuture<?>>(){

            public ListenableFuture<?> apply(Runnable task) {
                int currentConcurrency = concurrency.incrementAndGet();
                Assertions.assertLessThanOrEqual((Comparable)Integer.valueOf(currentConcurrency), (Comparable)Integer.valueOf(2));
                Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.MILLISECONDS);
                concurrency.decrementAndGet();
                throw new IllegalStateException();
            }
        });
        ArrayList<ListenableFuture> futures = new ArrayList<ListenableFuture>();
        for (int i = 0; i < 1000; ++i) {
            ListenableFuture future = asyncSemaphore.submit((Object)new Runnable(){

                @Override
                public void run() {
                    Assert.fail();
                }
            });
            Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<Object>(){

                public void onSuccess(@Nullable Object result) {
                    successCount.incrementAndGet();
                    completionLatch.countDown();
                }

                public void onFailure(Throwable t) {
                    failureCount.incrementAndGet();
                    completionLatch.countDown();
                }
            });
            futures.add(future);
        }
        completionLatch.await(1L, TimeUnit.MINUTES);
        for (ListenableFuture future : futures) {
            try {
                future.get();
                Assert.fail();
            }
            catch (Exception exception) {}
        }
        Assert.assertEquals((int)successCount.get(), (int)0);
        Assert.assertEquals((int)failureCount.get(), (int)1000);
    }

    @Test
    public void testFailedTaskWithMultipleSubmitters() throws Exception {
        final AtomicInteger successCount = new AtomicInteger();
        final AtomicInteger failureCount = new AtomicInteger();
        final AtomicInteger concurrency = new AtomicInteger();
        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch completionLatch = new CountDownLatch(100);
        final AsyncSemaphore asyncSemaphore = new AsyncSemaphore(2, (Executor)this.executor, new Function<Runnable, ListenableFuture<?>>(){

            public ListenableFuture<?> apply(Runnable task) {
                int currentConcurrency = concurrency.incrementAndGet();
                Assertions.assertLessThanOrEqual((Comparable)Integer.valueOf(currentConcurrency), (Comparable)Integer.valueOf(2));
                Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.MILLISECONDS);
                concurrency.decrementAndGet();
                throw new IllegalStateException();
            }
        });
        final ConcurrentLinkedQueue futures = new ConcurrentLinkedQueue();
        for (int i = 0; i < 100; ++i) {
            this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    Uninterruptibles.awaitUninterruptibly((CountDownLatch)startLatch, (long)1L, (TimeUnit)TimeUnit.MINUTES);
                    ListenableFuture future = asyncSemaphore.submit((Object)new Runnable(){

                        @Override
                        public void run() {
                            Assert.fail();
                        }
                    });
                    futures.add(future);
                    Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<Object>(){

                        public void onSuccess(@Nullable Object result) {
                            successCount.incrementAndGet();
                            completionLatch.countDown();
                        }

                        public void onFailure(Throwable t) {
                            failureCount.incrementAndGet();
                            completionLatch.countDown();
                        }
                    });
                }
            });
        }
        startLatch.countDown();
        Uninterruptibles.awaitUninterruptibly((CountDownLatch)completionLatch, (long)1L, (TimeUnit)TimeUnit.MINUTES);
        for (ListenableFuture future : futures) {
            try {
                future.get();
                Assert.fail();
            }
            catch (Exception exception) {}
        }
        Assert.assertEquals((int)successCount.get(), (int)0);
        Assert.assertEquals((int)failureCount.get(), (int)100);
    }

    @Test
    public void testNoStackOverflow() throws Exception {
        AsyncSemaphore asyncSemaphore = new AsyncSemaphore(1, (Executor)this.executor, new Function<Object, ListenableFuture<?>>(){

            public ListenableFuture<?> apply(Object object) {
                return Futures.immediateFuture(null);
            }
        });
        ArrayList<ListenableFuture> futures = new ArrayList<ListenableFuture>();
        for (int i = 0; i < 1000; ++i) {
            futures.add(asyncSemaphore.submit(new Object()));
        }
        Futures.allAsList(futures).get(1L, TimeUnit.MINUTES);
    }
}

