/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.runtime;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.bolt.BoltServer;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.CachedThreadPoolExecutorFactory;
import org.neo4j.bolt.runtime.ExecutorBoltScheduler;
import org.neo4j.bolt.runtime.ExecutorFactory;
import org.neo4j.bolt.testing.Jobs;
import org.neo4j.function.Predicates;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLog;
import org.neo4j.logging.internal.LogService;
import org.neo4j.logging.internal.SimpleLogService;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.test.matchers.CommonMatchers;

public class ExecutorBoltSchedulerTest {
    private static final String CONNECTOR_KEY = "connector-id";
    private final AssertableLogProvider logProvider = new AssertableLogProvider();
    private final LogService logService = new SimpleLogService((LogProvider)this.logProvider);
    private final Config config = Config.defaults();
    private final ExecutorFactory executorFactory = new CachedThreadPoolExecutorFactory((Log)NullLog.getInstance());
    private final JobScheduler jobScheduler = (JobScheduler)Mockito.mock(JobScheduler.class);
    private final ExecutorBoltScheduler boltScheduler = new ExecutorBoltScheduler("connector-id", this.executorFactory, this.jobScheduler, this.logService, 0, 10, Duration.ofMinutes(1L), 0, (ExecutorService)ForkJoinPool.commonPool());

    @Before
    public void setup() {
        Mockito.when((Object)this.jobScheduler.threadFactory((Group)ArgumentMatchers.any())).thenReturn((Object)Executors.defaultThreadFactory());
    }

    @After
    public void cleanup() throws Throwable {
        this.boltScheduler.stop();
    }

    @Test
    public void initShouldCreateThreadPool() throws Throwable {
        ExecutorFactory mockExecutorFactory = (ExecutorFactory)Mockito.mock(ExecutorFactory.class);
        Mockito.when((Object)mockExecutorFactory.create(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (Duration)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean(), (ThreadFactory)ArgumentMatchers.any())).thenReturn((Object)Executors.newCachedThreadPool());
        ExecutorBoltScheduler scheduler = new ExecutorBoltScheduler(CONNECTOR_KEY, mockExecutorFactory, this.jobScheduler, this.logService, 0, 10, Duration.ofMinutes(1L), 0, (ExecutorService)ForkJoinPool.commonPool());
        scheduler.start();
        ((JobScheduler)Mockito.verify((Object)this.jobScheduler)).threadFactory(Group.BOLT_WORKER);
        ((ExecutorFactory)Mockito.verify((Object)mockExecutorFactory, (VerificationMode)Mockito.times((int)1))).create(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (Duration)ArgumentMatchers.any(Duration.class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean(), (ThreadFactory)ArgumentMatchers.any(ThreadFactory.class));
    }

    @Test
    public void shutdownShouldTerminateThreadPool() throws Throwable {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        ExecutorFactory mockExecutorFactory = (ExecutorFactory)Mockito.mock(ExecutorFactory.class);
        Mockito.when((Object)mockExecutorFactory.create(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (Duration)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean(), (ThreadFactory)ArgumentMatchers.any())).thenReturn((Object)cachedThreadPool);
        ExecutorBoltScheduler scheduler = new ExecutorBoltScheduler(CONNECTOR_KEY, mockExecutorFactory, this.jobScheduler, this.logService, 0, 10, Duration.ofMinutes(1L), 0, (ExecutorService)ForkJoinPool.commonPool());
        scheduler.start();
        scheduler.stop();
        Assert.assertTrue((boolean)cachedThreadPool.isShutdown());
    }

    @Test
    public void createdShouldAddConnectionToActiveConnections() throws Throwable {
        String id = UUID.randomUUID().toString();
        BoltConnection connection = this.newConnection(id);
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        ((BoltConnection)Mockito.verify((Object)connection)).id();
        Assert.assertTrue((boolean)this.boltScheduler.isRegistered(connection));
    }

    @Test
    public void destroyedShouldRemoveConnectionFromActiveConnections() throws Throwable {
        String id = UUID.randomUUID().toString();
        BoltConnection connection = this.newConnection(id);
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.closed(connection);
        Assert.assertFalse((boolean)this.boltScheduler.isRegistered(connection));
    }

    @Test
    public void enqueuedShouldScheduleJob() throws Throwable {
        String id = UUID.randomUUID().toString();
        AtomicBoolean exitCondition = new AtomicBoolean();
        BoltConnection connection = this.newConnection(id);
        Mockito.when((Object)connection.processNextBatch()).thenAnswer(inv -> ExecutorBoltSchedulerTest.awaitExit(exitCondition));
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.enqueued(connection, Jobs.noop());
        Predicates.await(() -> this.boltScheduler.isActive(connection), (long)1L, (TimeUnit)TimeUnit.MINUTES);
        exitCondition.set(true);
        Predicates.await(() -> !this.boltScheduler.isActive(connection), (long)1L, (TimeUnit)TimeUnit.MINUTES);
        ((BoltConnection)Mockito.verify((Object)connection)).processNextBatch();
    }

    @Test
    public void enqueuedShouldNotScheduleJobWhenActiveWorkItemExists() throws Throwable {
        String id = UUID.randomUUID().toString();
        BoltConnection connection = this.newConnection(id);
        AtomicBoolean exitCondition = new AtomicBoolean();
        Mockito.when((Object)connection.processNextBatch()).thenAnswer(inv -> ExecutorBoltSchedulerTest.awaitExit(exitCondition));
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.enqueued(connection, Jobs.noop());
        Predicates.await(() -> this.boltScheduler.isActive(connection), (long)1L, (TimeUnit)TimeUnit.MINUTES);
        this.boltScheduler.enqueued(connection, Jobs.noop());
        exitCondition.set(true);
        Predicates.await(() -> !this.boltScheduler.isActive(connection), (long)1L, (TimeUnit)TimeUnit.MINUTES);
        ((BoltConnection)Mockito.verify((Object)connection)).processNextBatch();
    }

    @Test
    public void failingJobShouldLogAndStopConnection() throws Throwable {
        AtomicBoolean stopped = new AtomicBoolean();
        String id = UUID.randomUUID().toString();
        BoltConnection connection = this.newConnection(id);
        ((BoltConnection)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("some unexpected error")}).when((Object)connection)).processNextBatch();
        ((BoltConnection)Mockito.doAnswer(inv -> stopped.getAndSet(true)).when((Object)connection)).stop();
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.enqueued(connection, Jobs.noop());
        Predicates.await(() -> stopped.get(), (long)1L, (TimeUnit)TimeUnit.MINUTES);
        Assert.assertFalse((boolean)this.boltScheduler.isActive(connection));
        ((BoltConnection)Mockito.verify((Object)connection)).processNextBatch();
        ((BoltConnection)Mockito.verify((Object)connection)).stop();
        this.logProvider.assertExactly(new AssertableLogProvider.LogMatcher[]{AssertableLogProvider.inLog((Matcher)Matchers.containsString((String)BoltServer.class.getPackage().getName())).error(Matchers.containsString((String)"Unexpected error during job scheduling for session"), CommonMatchers.matchesExceptionMessage((Matcher)Matchers.containsString((String)"some unexpected error")))});
    }

    @Test
    public void successfulJobsShouldTriggerSchedulingOfPendingJobs() throws Throwable {
        AtomicInteger counter = new AtomicInteger();
        String id = UUID.randomUUID().toString();
        BoltConnection connection = this.newConnection(id);
        Mockito.when((Object)connection.processNextBatch()).thenAnswer(inv -> counter.incrementAndGet() > 0);
        Mockito.when((Object)connection.hasPendingJobs()).thenReturn((Object)true).thenReturn((Object)false);
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.enqueued(connection, Jobs.noop());
        Predicates.await(() -> counter.get() > 1, (long)1L, (TimeUnit)TimeUnit.MINUTES);
        ((BoltConnection)Mockito.verify((Object)connection, (VerificationMode)Mockito.times((int)2))).processNextBatch();
    }

    @Test
    public void destroyedShouldCancelActiveWorkItem() throws Throwable {
        AtomicInteger processNextBatchCount = new AtomicInteger();
        String id = UUID.randomUUID().toString();
        BoltConnection connection = this.newConnection(id);
        AtomicBoolean exitCondition = new AtomicBoolean();
        Mockito.when((Object)connection.processNextBatch()).thenAnswer(inv -> {
            processNextBatchCount.incrementAndGet();
            return ExecutorBoltSchedulerTest.awaitExit(exitCondition);
        });
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.enqueued(connection, Jobs.noop());
        Predicates.await(() -> processNextBatchCount.get() > 0, (long)1L, (TimeUnit)TimeUnit.MINUTES);
        this.boltScheduler.closed(connection);
        Predicates.await(() -> !this.boltScheduler.isActive(connection), (long)1L, (TimeUnit)TimeUnit.MINUTES);
        Assert.assertFalse((boolean)this.boltScheduler.isActive(connection));
        Assert.assertEquals((long)1L, (long)processNextBatchCount.get());
        exitCondition.set(true);
    }

    @Test
    public void createdWorkerThreadsShouldContainConnectorName() throws Exception {
        AtomicInteger executeBatchCompletionCount = new AtomicInteger();
        AtomicReference poolThread = new AtomicReference();
        AtomicReference poolThreadName = new AtomicReference();
        String id = UUID.randomUUID().toString();
        BoltConnection connection = this.newConnection(id);
        Mockito.when((Object)connection.hasPendingJobs()).thenAnswer(inv -> {
            executeBatchCompletionCount.incrementAndGet();
            return false;
        });
        Mockito.when((Object)connection.processNextBatch()).thenAnswer(inv -> {
            poolThread.set(Thread.currentThread());
            poolThreadName.set(Thread.currentThread().getName());
            return true;
        });
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.enqueued(connection, Jobs.noop());
        Predicates.await(() -> executeBatchCompletionCount.get() > 0, (long)1L, (TimeUnit)TimeUnit.MINUTES);
        Assert.assertThat((Object)((Thread)poolThread.get()).getName(), (Matcher)Matchers.not((Matcher)Matchers.equalTo(poolThreadName.get())));
        Assert.assertThat((Object)((Thread)poolThread.get()).getName(), (Matcher)Matchers.containsString((String)String.format("[%s]", CONNECTOR_KEY)));
        Assert.assertThat((Object)((Thread)poolThread.get()).getName(), (Matcher)Matchers.not((Matcher)Matchers.containsString((String)String.format("[%s]", connection.remoteAddress()))));
    }

    @Test
    public void createdWorkerThreadsShouldContainConnectorNameAndRemoteAddressInTheirNamesWhenActive() throws Exception {
        AtomicReference capturedThreadName = new AtomicReference();
        AtomicInteger processNextBatchCount = new AtomicInteger();
        String id = UUID.randomUUID().toString();
        BoltConnection connection = this.newConnection(id);
        AtomicBoolean exitCondition = new AtomicBoolean();
        Mockito.when((Object)connection.processNextBatch()).thenAnswer(inv -> {
            capturedThreadName.set(Thread.currentThread().getName());
            processNextBatchCount.incrementAndGet();
            return ExecutorBoltSchedulerTest.awaitExit(exitCondition);
        });
        this.boltScheduler.start();
        this.boltScheduler.created(connection);
        this.boltScheduler.enqueued(connection, Jobs.noop());
        Predicates.await(() -> processNextBatchCount.get() > 0, (long)1L, (TimeUnit)TimeUnit.MINUTES);
        Assert.assertThat(capturedThreadName.get(), (Matcher)Matchers.containsString((String)String.format("[%s]", CONNECTOR_KEY)));
        Assert.assertThat(capturedThreadName.get(), (Matcher)Matchers.containsString((String)String.format("[%s]", connection.remoteAddress())));
        exitCondition.set(true);
    }

    private BoltConnection newConnection(String id) {
        BoltConnection result = (BoltConnection)Mockito.mock(BoltConnection.class);
        Mockito.when((Object)result.id()).thenReturn((Object)id);
        Mockito.when((Object)result.remoteAddress()).thenReturn((Object)new InetSocketAddress("localhost", 32000));
        return result;
    }

    private static boolean awaitExit(AtomicBoolean exitCondition) {
        Predicates.awaitForever(() -> Thread.currentThread().isInterrupted() || exitCondition.get(), (long)500L, (TimeUnit)TimeUnit.MILLISECONDS);
        return true;
    }
}

