/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.runtime;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.ExecutorBoltScheduler;
import org.neo4j.bolt.runtime.ExecutorFactory;
import org.neo4j.bolt.testing.Jobs;
import org.neo4j.function.Predicates;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.internal.LogService;
import org.neo4j.logging.internal.SimpleLogService;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler;

public class ExecutorBoltSchedulerConcurrencyTest {
    private static final String CONNECTOR_KEY = "connector-id";
    private static final int maxPoolSize = 5;
    private final CountDownLatch beforeExecuteEvent = new CountDownLatch(1);
    private final CountDownLatch beforeExecuteBarrier = new CountDownLatch(5);
    private final CountDownLatch afterExecuteEvent = new CountDownLatch(1);
    private final CountDownLatch afterExecuteBarrier = new CountDownLatch(5);
    private final AssertableLogProvider logProvider = new AssertableLogProvider();
    private final LogService logService = new SimpleLogService((LogProvider)this.logProvider, (LogProvider)this.logProvider);
    private final ExecutorFactory executorFactory = new NotifyingThreadPoolFactory();
    private final JobScheduler jobScheduler = (JobScheduler)Mockito.mock(JobScheduler.class);
    private final ExecutorBoltScheduler boltScheduler = new ExecutorBoltScheduler("connector-id", this.executorFactory, this.jobScheduler, this.logService, 5, 5, Duration.ofMinutes(1L), 0, (ExecutorService)ForkJoinPool.commonPool());

    @Before
    public void setup() throws Throwable {
        Mockito.when((Object)this.jobScheduler.threadFactory((Group)Matchers.any())).thenReturn((Object)Executors.defaultThreadFactory());
        this.boltScheduler.start();
    }

    @After
    public void cleanup() throws Throwable {
        this.boltScheduler.stop();
    }

    @Test
    public void shouldInvokeHandleSchedulingErrorIfNoThreadsAvailable() throws Throwable {
        AtomicInteger handleSchedulingErrorCounter = new AtomicInteger(0);
        BoltConnection newConnection = this.newConnection(UUID.randomUUID().toString());
        ((BoltConnection)Mockito.doAnswer(this.newCountingAnswer(handleSchedulingErrorCounter)).when((Object)newConnection)).handleSchedulingError((Throwable)Matchers.any());
        this.blockAllThreads();
        this.boltScheduler.created(newConnection);
        CompletableFuture.runAsync(() -> this.boltScheduler.enqueued(newConnection, Jobs.noop()));
        Predicates.awaitForever(() -> handleSchedulingErrorCounter.get() > 0, (long)500L, (TimeUnit)TimeUnit.MILLISECONDS);
        Assert.assertEquals((long)1L, (long)handleSchedulingErrorCounter.get());
        this.afterExecuteEvent.countDown();
        this.afterExecuteBarrier.await();
    }

    @Test
    public void shouldNotScheduleNewJobIfHandlingSchedulingError() throws Throwable {
        AtomicInteger handleSchedulingErrorCounter = new AtomicInteger(0);
        AtomicBoolean exitCondition = new AtomicBoolean();
        BoltConnection newConnection = this.newConnection(UUID.randomUUID().toString());
        ((BoltConnection)Mockito.doAnswer(this.newBlockingAnswer(handleSchedulingErrorCounter, exitCondition)).when((Object)newConnection)).handleSchedulingError((Throwable)Matchers.any());
        this.blockAllThreads();
        this.boltScheduler.created(newConnection);
        CompletableFuture.runAsync(() -> this.boltScheduler.enqueued(newConnection, Jobs.noop()));
        Predicates.awaitForever(() -> handleSchedulingErrorCounter.get() > 0, (long)500L, (TimeUnit)TimeUnit.MILLISECONDS);
        this.afterExecuteEvent.countDown();
        this.afterExecuteBarrier.await();
        this.boltScheduler.enqueued(newConnection, Jobs.noop());
        exitCondition.set(true);
        Assert.assertEquals((long)1L, (long)handleSchedulingErrorCounter.get());
        ((BoltConnection)Mockito.verify((Object)newConnection, (VerificationMode)Mockito.never())).processNextBatch();
    }

    private void blockAllThreads() throws InterruptedException {
        for (int i = 0; i < 5; ++i) {
            BoltConnection connection = this.newConnection(UUID.randomUUID().toString());
            this.boltScheduler.created(connection);
            this.boltScheduler.enqueued(connection, Jobs.noop());
        }
        this.beforeExecuteEvent.countDown();
        this.beforeExecuteBarrier.await();
    }

    private <T> Answer<T> newCountingAnswer(AtomicInteger counter) {
        return invocationOnMock -> {
            counter.incrementAndGet();
            return null;
        };
    }

    private <T> Answer<T> newBlockingAnswer(AtomicInteger counter, AtomicBoolean exitCondition) {
        return invocationOnMock -> {
            counter.incrementAndGet();
            Predicates.awaitForever(() -> Thread.currentThread().isInterrupted() || exitCondition.get(), (long)500L, (TimeUnit)TimeUnit.MILLISECONDS);
            return null;
        };
    }

    private BoltConnection newConnection(String id) {
        BoltConnection result = (BoltConnection)Mockito.mock(BoltConnection.class);
        Mockito.when((Object)result.id()).thenReturn((Object)id);
        Mockito.when((Object)result.remoteAddress()).thenReturn((Object)new InetSocketAddress("localhost", 32000));
        return result;
    }

    private class NotifyingThreadPoolExecutor
    extends ThreadPoolExecutor {
        private NotifyingThreadPoolExecutor(int corePoolSize, int maxPoolSize, Duration keepAlive, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectionHandler) {
            super(corePoolSize, maxPoolSize, keepAlive.toMillis(), TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectionHandler);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            try {
                ExecutorBoltSchedulerConcurrencyTest.this.beforeExecuteEvent.await();
                super.beforeExecute(t, r);
                ExecutorBoltSchedulerConcurrencyTest.this.beforeExecuteBarrier.countDown();
            }
            catch (Throwable ex) {
                throw new RuntimeException(ex);
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                ExecutorBoltSchedulerConcurrencyTest.this.afterExecuteEvent.await();
                super.afterExecute(r, t);
                ExecutorBoltSchedulerConcurrencyTest.this.afterExecuteBarrier.countDown();
            }
            catch (Throwable ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    private class NotifyingThreadPoolFactory
    implements ExecutorFactory {
        private NotifyingThreadPoolFactory() {
        }

        public ExecutorService create(int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, boolean startCoreThreads, ThreadFactory threadFactory) {
            return new NotifyingThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, new SynchronousQueue(), threadFactory, new ThreadPoolExecutor.AbortPolicy());
        }
    }
}

