package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.neo4j.causalclustering.net.Server;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.ports.allocation.PortAuthority;

/* loaded from: input_file:org/neo4j/causalclustering/messaging/ReconnectingChannelIT.class */
public class ReconnectingChannelIT {
    private static final int PORT = PortAuthority.allocatePort();
    private static final long DEFAULT_TIMEOUT_MS = 20000;
    private EventLoopGroup elg;
    private ReconnectingChannel channel;
    private final Log log = NullLogProvider.getInstance().getLog(getClass());
    private final ListenSocketAddress listenAddress = new ListenSocketAddress("localhost", PORT);
    private final Server server = new Server(socketChannel -> {
    }, this.listenAddress, "test-server");
    private AtomicInteger childCount = new AtomicInteger();
    private final ChannelHandler childCounter = new ChannelInitializer<SocketChannel>() { // from class: org.neo4j.causalclustering.messaging.ReconnectingChannelIT.1
        /* JADX INFO: Access modifiers changed from: protected */
        public void initChannel(SocketChannel socketChannel) {
            socketChannel.pipeline().addLast(new ChannelHandler[]{new ChannelInboundHandlerAdapter() { // from class: org.neo4j.causalclustering.messaging.ReconnectingChannelIT.1.1
                public void channelActive(ChannelHandlerContext channelHandlerContext) {
                    ReconnectingChannelIT.this.childCount.incrementAndGet();
                }

                public void channelInactive(ChannelHandlerContext channelHandlerContext) {
                    ReconnectingChannelIT.this.childCount.decrementAndGet();
                }
            }});
        }
    };

    @Before
    public void before() {
        this.elg = new NioEventLoopGroup(0);
        this.channel = new ReconnectingChannel(new Bootstrap().channel(NioSocketChannel.class).group(this.elg).handler(this.childCounter), this.elg.next(), this.listenAddress, this.log);
    }

    @After
    public void after() throws Throwable {
        this.elg.shutdownGracefully(0L, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS).awaitUninterruptibly();
        this.server.stop();
    }

    @Test
    public void shouldBeAbleToSendMessage() throws Throwable {
        this.server.start();
        this.channel.start();
        this.channel.writeAndFlush(emptyBuffer()).get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    @Test
    public void shouldAllowDeferredSend() throws Throwable {
        this.channel.start();
        this.server.start();
        this.channel.writeAndFlush(emptyBuffer()).get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    @Test(expected = ExecutionException.class)
    public void shouldFailSendWhenNoServer() throws Exception {
        this.channel.start();
        this.channel.writeAndFlush(emptyBuffer()).get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    @Test
    public void shouldReconnectAfterServerComesBack() throws Throwable {
        this.server.start();
        this.channel.start();
        this.channel.writeAndFlush(emptyBuffer()).get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        this.server.stop();
        try {
            this.channel.writeAndFlush(emptyBuffer()).get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            Assert.fail("Expected failure to send");
        } catch (ExecutionException e) {
        }
        this.server.start();
        this.channel.writeAndFlush(emptyBuffer()).get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    @Test
    public void shouldNotAllowSendingOnDisposedChannel() throws Throwable {
        this.server.start();
        this.channel.start();
        this.channel.writeAndFlush(emptyBuffer()).get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        AtomicInteger atomicInteger = this.childCount;
        atomicInteger.getClass();
        org.neo4j.test.assertion.Assert.assertEventually(atomicInteger::get, CoreMatchers.equalTo(1), DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        this.channel.dispose();
        try {
            this.channel.writeAndFlush(emptyBuffer());
        } catch (IllegalStateException e) {
        }
        AtomicInteger atomicInteger2 = this.childCount;
        atomicInteger2.getClass();
        org.neo4j.test.assertion.Assert.assertEventually(atomicInteger2::get, CoreMatchers.equalTo(0), DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    private ByteBuf emptyBuffer() {
        return ByteBufAllocator.DEFAULT.buffer();
    }
}
