/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.transport;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.Attribute;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.neo4j.bolt.transport.DefaultThrottleLock;
import org.neo4j.bolt.transport.ThrottleLock;
import org.neo4j.bolt.transport.TransportThrottle;
import org.neo4j.bolt.transport.TransportThrottleException;
import org.neo4j.bolt.transport.TransportWriteThrottle;
import org.neo4j.test.rule.concurrent.OtherThreadRule;
import org.neo4j.time.Clocks;
import org.neo4j.time.FakeClock;

/**
 * Tests for {@link TransportWriteThrottle}, exercised through the {@link TransportThrottle}
 * interface against a mocked Netty channel.
 *
 * <p>Calls to {@code acquire} that may block are run on a second thread provided by
 * {@link OtherThreadRule}, so the test thread can observe whether they complete.
 */
public class TransportWriteThrottleTest {
    /** Low water mark passed to every throttle created by this test. */
    private static final int LOW_WATER_MARK = 64;
    /** High water mark passed to every throttle created by this test. */
    private static final int HIGH_WATER_MARK = 256;

    @Rule
    public OtherThreadRule<Void> otherThread = new OtherThreadRule<>(1L, TimeUnit.MINUTES);
    private ChannelHandlerContext context;
    private Channel channel;
    private SocketChannelConfig config;
    private ThrottleLock lock;
    // Kept raw: the type parameters of the attribute keys are not visible from this file.
    private Attribute lockAttribute;

    /**
     * Builds the mocked channel, its config, its attributes and a handler context that points
     * back at the channel.
     */
    @Before
    public void setup() throws Exception {
        lock = newThrottleLockMock();
        config = Mockito.mock(SocketChannelConfig.class);

        lockAttribute = Mockito.mock(Attribute.class);
        Mockito.when(lockAttribute.get()).thenReturn(lock);
        Attribute durationExceedAttribute = Mockito.mock(Attribute.class);
        Mockito.when(durationExceedAttribute.get()).thenReturn(null);

        channel = Mockito.mock(SocketChannel.class, Answers.RETURNS_MOCKS);
        Mockito.when(channel.config()).thenReturn(config);
        Mockito.when(channel.isOpen()).thenReturn(true);
        Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("localhost", 32000));
        Mockito.when(channel.attr(TransportWriteThrottle.LOCK_KEY)).thenReturn(lockAttribute);
        Mockito.when(channel.attr(TransportWriteThrottle.MAX_DURATION_EXCEEDED_KEY)).thenReturn(durationExceedAttribute);

        // Fetch the pipeline mock once and stub pipeline() to keep returning that same instance.
        ChannelPipeline pipeline = channel.pipeline();
        Mockito.when(channel.pipeline()).thenReturn(pipeline);

        context = Mockito.mock(ChannelHandlerContext.class, Answers.RETURNS_MOCKS);
        Mockito.when(context.channel()).thenReturn(channel);
    }

    /** Installing the throttle must set the configured low/high water marks on the channel config exactly once. */
    @Test
    public void shouldSetWriteBufferWatermarkOnChannelConfigWhenInstalled() {
        TransportThrottle throttle = newThrottle();

        throttle.install(channel);

        ArgumentCaptor<WriteBufferWaterMark> argument = ArgumentCaptor.forClass(WriteBufferWaterMark.class);
        Mockito.verify(config, Mockito.times(1)).setWriteBufferWaterMark(argument.capture());
        Assert.assertEquals(LOW_WATER_MARK, argument.getValue().low());
        Assert.assertEquals(HIGH_WATER_MARK, argument.getValue().high());
    }

    /** With a writable channel, {@code acquire} must return without touching the lock. */
    @Test
    public void shouldNotLockWhenWritable() throws Exception {
        TestThrottleLock lockOverride = new TestThrottleLock();
        TransportThrottle throttle = newThrottleAndInstall(channel, lockOverride);
        Mockito.when(channel.isWritable()).thenReturn(true);

        Future<Void> future = acquireOnOtherThread(throttle);

        try {
            future.get(2000L, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Attach the cause so a failure report shows what actually went wrong.
            throw new AssertionError("should not throw", e);
        }
        Assert.assertTrue(future.isDone());
        Assert.assertThat(lockOverride.lockCallCount(), Matchers.is(0));
        Assert.assertThat(lockOverride.unlockCallCount(), Matchers.is(0));
    }

    /** With a non-writable channel, {@code acquire} must keep locking and not complete. */
    @Test
    public void shouldLockWhenNotWritable() throws Exception {
        TestThrottleLock lockOverride = new TestThrottleLock();
        TransportThrottle throttle = newThrottleAndInstall(channel, lockOverride);
        Mockito.when(channel.isWritable()).thenReturn(false);

        Future<Void> future = acquireOnOtherThread(throttle);

        try {
            future.get(2000L, TimeUnit.MILLISECONDS);
            Assert.fail("should timeout");
        } catch (TimeoutException expected) {
            // Expected: the worker is still inside acquire.
        }
        Assert.assertFalse(future.isDone());
        Assert.assertThat(lockOverride.lockCallCount(), Matchers.greaterThan(0));
        Assert.assertThat(lockOverride.unlockCallCount(), Matchers.is(0));

        // Stop the worker and confirm that the cancellation is observed.
        future.cancel(true);
        try {
            otherThread.get().awaitFuture(future);
            Assert.fail("Exception expected");
        } catch (CancellationException expected) {
            // Expected: the future was cancelled above.
        }
    }

    /** When the channel reports non-writable and then writable, {@code acquire} must lock and then return. */
    @Test
    public void shouldResumeWhenWritableOnceAgain() throws Exception {
        TransportThrottle throttle = newThrottleAndInstall(channel);
        Mockito.when(channel.isWritable()).thenReturn(false).thenReturn(true);

        throttle.acquire(channel);

        Mockito.verify(lock, Mockito.atLeast(1)).lock((Channel) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        Mockito.verify(lock, Mockito.never()).unlock((Channel) ArgumentMatchers.any());
    }

    /**
     * A blocked {@code acquire} must complete once the installed inbound handler is told that
     * channel writability changed, and the lock must be unlocked exactly once.
     */
    @Test
    public void shouldResumeWhenWritabilityChanged() throws Exception {
        TestThrottleLock lockOverride = new TestThrottleLock();
        TransportThrottle throttle = newThrottleAndInstall(channel, lockOverride);
        Mockito.when(channel.isWritable()).thenReturn(false);

        Future<Void> completionFuture = acquireOnOtherThread(throttle);
        otherThread.get().waitUntilWaiting();

        // Flip writability, then fire the event on the handler that install() added to the pipeline.
        Mockito.when(channel.isWritable()).thenReturn(true);
        ArgumentCaptor<ChannelInboundHandler> captor = ArgumentCaptor.forClass(ChannelInboundHandler.class);
        Mockito.verify(channel.pipeline()).addLast(captor.capture());
        captor.getValue().channelWritabilityChanged(context);

        otherThread.get().awaitFuture(completionFuture);
        Assert.assertThat(lockOverride.lockCallCount(), Matchers.greaterThan(0));
        Assert.assertThat(lockOverride.unlockCallCount(), Matchers.is(1));
    }

    /**
     * With a 5 second maximum lock duration, advancing the fake clock by 6 seconds while the
     * channel stays non-writable must make {@code acquire} fail with a
     * {@link TransportThrottleException}.
     */
    @Test
    public void shouldThrowThrottleExceptionWhenMaxDurationIsReached() throws Exception {
        TestThrottleLock lockOverride = new TestThrottleLock();
        FakeClock clock = Clocks.fakeClock(1L, TimeUnit.SECONDS);
        TransportThrottle throttle = newThrottleAndInstall(channel, lockOverride, clock, Duration.ofSeconds(5L));
        Mockito.when(channel.isWritable()).thenReturn(false);

        Future<Void> future = acquireOnOtherThread(throttle);
        otherThread.get().waitUntilWaiting();
        clock.forward(6L, TimeUnit.SECONDS);

        try {
            future.get(1L, TimeUnit.MINUTES);
            Assert.fail("expecting ExecutionException");
        } catch (ExecutionException ex) {
            Assert.assertThat(ex.getCause(), Matchers.instanceOf(TransportThrottleException.class));
            Assert.assertThat(ex.getMessage(), Matchers.containsString("will be closed because the client did not consume outgoing buffers for"));
        }
    }

    /**
     * Runs {@code throttle.acquire(channel)} on the rule's worker thread.
     *
     * @param throttle the throttle to acquire on
     * @return a future that completes when {@code acquire} returns or throws
     */
    private Future<Void> acquireOnOtherThread(TransportThrottle throttle) throws Exception {
        return otherThread.execute(state -> {
            throttle.acquire(this.channel);
            return null;
        });
    }

    /** Creates a throttle with the default mock lock, the system clock and a zero max lock duration. */
    private TransportThrottle newThrottle() {
        return newThrottle(null, Clocks.systemClock(), Duration.ZERO);
    }

    /**
     * Creates a throttle using the test water marks.
     *
     * @param lockOverride lock to use instead of the default mock; {@code null} keeps the mock.
     *                     When given, it also replaces the value returned by the lock attribute.
     * @param clock clock handed to the throttle
     * @param maxLockDuration maximum lock duration handed to the throttle
     * @return the new, not yet installed throttle
     */
    private TransportThrottle newThrottle(ThrottleLock lockOverride, Clock clock, Duration maxLockDuration) {
        if (lockOverride != null) {
            lock = lockOverride;
            Mockito.when(lockAttribute.get()).thenReturn(lockOverride);
        }
        return new TransportWriteThrottle(LOW_WATER_MARK, HIGH_WATER_MARK, clock, maxLockDuration, () -> this.lock);
    }

    /** Creates a throttle with the default mock lock and installs it on {@code channel}. */
    private TransportThrottle newThrottleAndInstall(Channel channel) {
        return newThrottleAndInstall(channel, null);
    }

    /** Creates a throttle with the given lock, the system clock and a zero max lock duration, and installs it. */
    private TransportThrottle newThrottleAndInstall(Channel channel, ThrottleLock lockOverride) {
        return newThrottleAndInstall(channel, lockOverride, Clocks.systemClock(), Duration.ZERO);
    }

    /** Creates a throttle with the given lock, clock and max lock duration, and installs it on {@code channel}. */
    private TransportThrottle newThrottleAndInstall(Channel channel, ThrottleLock lockOverride, Clock clock, Duration maxLockDuration) {
        TransportThrottle throttle = newThrottle(lockOverride, clock, maxLockDuration);
        throttle.install(channel);
        return throttle;
    }

    /** Creates a {@link ThrottleLock} mock whose {@code lock} call sleeps for 500 ms and then returns. */
    private static ThrottleLock newThrottleLockMock() throws InterruptedException {
        ThrottleLock lock = Mockito.mock(ThrottleLock.class);
        Mockito.doAnswer(invocation -> {
            Thread.sleep(500L);
            return null;
        }).when(lock).lock((Channel) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        return lock;
    }

    /**
     * {@link ThrottleLock} that delegates to a {@link DefaultThrottleLock} and counts the calls.
     * A call is counted only after the delegate has returned normally.
     */
    private static class TestThrottleLock implements ThrottleLock {
        private final AtomicInteger lockCount = new AtomicInteger(0);
        private final AtomicInteger unlockCount = new AtomicInteger(0);
        private final ThrottleLock actualLock = new DefaultThrottleLock();

        @Override
        public void lock(Channel channel, long timeout) throws InterruptedException {
            actualLock.lock(channel, timeout);
            lockCount.incrementAndGet();
        }

        @Override
        public void unlock(Channel channel) {
            actualLock.unlock(channel);
            unlockCount.incrementAndGet();
        }

        /** @return number of {@code lock} calls that have returned so far */
        public int lockCallCount() {
            return lockCount.get();
        }

        /** @return number of {@code unlock} calls that have returned so far */
        public int unlockCallCount() {
            return unlockCount.get();
        }
    }
}

