package org.neo4j.causalclustering.protocol.handshake;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstallerV1;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstallerV1;
import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolClientInstallerV2;
import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolServerInstallerV2;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.logging.LogProvider;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.stream.Streams;
import org.neo4j.test.assertion.Assert;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/neo4j/causalclustering/protocol/handshake/NettyInstalledProtocolsIT.class */
public class NettyInstalledProtocolsIT {
    private Parameters parameters;
    private static final int TIMEOUT_SECONDS = 10;
    private static final LogProvider logProvider = FormattedLogProvider.toOutputStream(System.out);
    private Server server;
    private Client client;

    /* loaded from: input_file:org/neo4j/causalclustering/protocol/handshake/NettyInstalledProtocolsIT$Client.class */
    static class Client {
        private Bootstrap bootstrap;
        private NioEventLoopGroup eventLoopGroup;
        private Channel channel;
        private HandshakeClientInitializer handshakeClientInitializer;

        Client(ApplicationProtocolRepository applicationProtocolRepository, ModifierProtocolRepository modifierProtocolRepository, NettyPipelineBuilderFactory nettyPipelineBuilderFactory, Config config) {
            ProtocolInstallerRepository protocolInstallerRepository = new ProtocolInstallerRepository(Arrays.asList(new RaftProtocolClientInstallerV1.Factory(nettyPipelineBuilderFactory, NettyInstalledProtocolsIT.logProvider), new RaftProtocolClientInstallerV2.Factory(nettyPipelineBuilderFactory, NettyInstalledProtocolsIT.logProvider)), ModifierProtocolInstaller.allClientInstallers);
            this.eventLoopGroup = new NioEventLoopGroup();
            this.handshakeClientInitializer = new HandshakeClientInitializer(applicationProtocolRepository, modifierProtocolRepository, protocolInstallerRepository, nettyPipelineBuilderFactory, (Duration) config.get(CausalClusteringSettings.handshake_timeout), NettyInstalledProtocolsIT.logProvider, NettyInstalledProtocolsIT.logProvider);
            this.bootstrap = new Bootstrap().group(this.eventLoopGroup).channel(NioSocketChannel.class).handler(this.handshakeClientInitializer);
        }

        void connect(int i) {
            this.channel = this.bootstrap.connect("localhost", i).syncUninterruptibly().channel();
        }

        void disconnect() {
            if (this.channel != null) {
                this.channel.close().syncUninterruptibly();
                this.eventLoopGroup.shutdownGracefully(0L, 10L, TimeUnit.SECONDS).syncUninterruptibly();
            }
        }

        ChannelFuture send(Object obj) {
            return this.channel.writeAndFlush(obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/neo4j/causalclustering/protocol/handshake/NettyInstalledProtocolsIT$MessageMatcher.class */
    public class MessageMatcher extends BaseMatcher<Object> {
        private final RaftMessages.ClusterIdAwareMessage<? extends RaftMessages.RaftMessage> expected;

        MessageMatcher(RaftMessages.ClusterIdAwareMessage<? extends RaftMessages.RaftMessage> clusterIdAwareMessage) {
            this.expected = clusterIdAwareMessage;
        }

        public boolean matches(Object obj) {
            if (!(obj instanceof RaftMessages.ClusterIdAwareMessage)) {
                return false;
            }
            RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage = (RaftMessages.ClusterIdAwareMessage) obj;
            return clusterIdAwareMessage.clusterId().equals(this.expected.clusterId()) && clusterIdAwareMessage.message().equals(this.expected.message());
        }

        public void describeTo(Description description) {
            description.appendText("Cluster ID ").appendValue(this.expected.clusterId()).appendText(" message ").appendValue(this.expected.message());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/causalclustering/protocol/handshake/NettyInstalledProtocolsIT$Parameters.class */
    public static class Parameters {
        final String name;
        final ApplicationSupportedProtocols applicationSupportedProtocol;
        final Collection<ModifierSupportedProtocols> modifierSupportedProtocols;

        Parameters(String str, ApplicationSupportedProtocols applicationSupportedProtocols, Collection<ModifierSupportedProtocols> collection) {
            this.name = str;
            this.applicationSupportedProtocol = applicationSupportedProtocols;
            this.modifierSupportedProtocols = collection;
        }

        public String toString() {
            return this.name;
        }
    }

    /* loaded from: input_file:org/neo4j/causalclustering/protocol/handshake/NettyInstalledProtocolsIT$Server.class */
    static class Server {
        private Channel channel;
        private NioEventLoopGroup eventLoopGroup;
        private NettyPipelineBuilderFactory pipelineBuilderFactory;
        private final List<Object> received = new CopyOnWriteArrayList();
        ChannelInboundHandler nettyHandler = new SimpleChannelInboundHandler<Object>() { // from class: org.neo4j.causalclustering.protocol.handshake.NettyInstalledProtocolsIT.Server.1
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) {
                Server.this.received.add(obj);
            }
        };

        Server(NettyPipelineBuilderFactory nettyPipelineBuilderFactory) {
            this.pipelineBuilderFactory = nettyPipelineBuilderFactory;
        }

        void start(ApplicationProtocolRepository applicationProtocolRepository, ModifierProtocolRepository modifierProtocolRepository) {
            ProtocolInstallerRepository protocolInstallerRepository = new ProtocolInstallerRepository(Arrays.asList(new RaftProtocolServerInstallerV1.Factory(this.nettyHandler, this.pipelineBuilderFactory, NettyInstalledProtocolsIT.logProvider), new RaftProtocolServerInstallerV2.Factory(this.nettyHandler, this.pipelineBuilderFactory, NettyInstalledProtocolsIT.logProvider)), ModifierProtocolInstaller.allServerInstallers);
            this.eventLoopGroup = new NioEventLoopGroup();
            this.channel = new ServerBootstrap().group(this.eventLoopGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_REUSEADDR, true).localAddress(PortAuthority.allocatePort()).childHandler(new HandshakeServerInitializer(applicationProtocolRepository, modifierProtocolRepository, protocolInstallerRepository, this.pipelineBuilderFactory, NettyInstalledProtocolsIT.logProvider).asChannelInitializer()).bind().syncUninterruptibly().channel();
        }

        void stop() {
            this.channel.close().syncUninterruptibly();
            this.eventLoopGroup.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
        }

        int port() {
            return ((InetSocketAddress) this.channel.localAddress()).getPort();
        }

        public Collection<Object> received() {
            return this.received;
        }
    }

    public NettyInstalledProtocolsIT(Parameters parameters) {
        this.parameters = parameters;
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection<Parameters> data() {
        return (Collection) Stream.concat(Stream.of(Optional.empty()), Stream.of((Object[]) Protocol.ModifierProtocols.values()).map((v0) -> {
            return Optional.of(v0);
        })).flatMap(optional -> {
            return Stream.of((Object[]) new Parameters[]{raft1WithCompressionModifier(optional), raft2WithCompressionModifiers(optional)});
        }).collect(Collectors.toList());
    }

    private static Parameters raft1WithCompressionModifier(Optional<Protocol.ModifierProtocol> optional) {
        return new Parameters("Raft 1, modifiers: " + optional, new ApplicationSupportedProtocols(Protocol.ApplicationProtocolCategory.RAFT, Collections.singletonList(Protocol.ApplicationProtocols.RAFT_1.implementation())), Collections.singletonList(new ModifierSupportedProtocols(Protocol.ModifierProtocolCategory.COMPRESSION, (List) Streams.ofOptional(optional).map((v0) -> {
            return v0.implementation();
        }).collect(Collectors.toList()))));
    }

    private static Parameters raft2WithCompressionModifiers(Optional<Protocol.ModifierProtocol> optional) {
        return new Parameters("Raft 2, modifiers: " + optional, new ApplicationSupportedProtocols(Protocol.ApplicationProtocolCategory.RAFT, Collections.singletonList(Protocol.ApplicationProtocols.RAFT_2.implementation())), Collections.singletonList(new ModifierSupportedProtocols(Protocol.ModifierProtocolCategory.COMPRESSION, (List) Streams.ofOptional(optional).map((v0) -> {
            return v0.implementation();
        }).collect(Collectors.toList()))));
    }

    @Test
    public void shouldSuccessfullySendAndReceiveAMessage() throws Throwable {
        RaftMessages.Heartbeat heartbeat = new RaftMessages.Heartbeat(new MemberId(UUID.randomUUID()), 1L, 2L, 3L);
        RaftMessages.ClusterIdAwareMessage<? extends RaftMessages.RaftMessage> of = RaftMessages.ClusterIdAwareMessage.of(new ClusterId(UUID.randomUUID()), heartbeat);
        this.client.send(of).syncUninterruptibly();
        Assert.assertEventually(collection -> {
            return String.format("Received messages %s should contain message decorating %s", collection, heartbeat);
        }, () -> {
            return this.server.received();
        }, Matchers.contains(messageMatches(of)), 10L, TimeUnit.SECONDS);
    }

    @Before
    public void setUp() {
        ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository(Protocol.ApplicationProtocols.values(), this.parameters.applicationSupportedProtocol);
        ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository(Protocol.ModifierProtocols.values(), this.parameters.modifierSupportedProtocols);
        NettyPipelineBuilderFactory nettyPipelineBuilderFactory = new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER);
        NettyPipelineBuilderFactory nettyPipelineBuilderFactory2 = new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER);
        this.server = new Server(nettyPipelineBuilderFactory);
        this.server.start(applicationProtocolRepository, modifierProtocolRepository);
        this.client = new Client(applicationProtocolRepository, modifierProtocolRepository, nettyPipelineBuilderFactory2, Config.builder().withSetting(CausalClusteringSettings.handshake_timeout, "10s").build());
        this.client.connect(this.server.port());
    }

    @After
    public void tearDown() {
        this.client.disconnect();
        this.server.stop();
    }

    private Matcher<Object> messageMatches(RaftMessages.ClusterIdAwareMessage<? extends RaftMessages.RaftMessage> clusterIdAwareMessage) {
        return new MessageMatcher(clusterIdAwareMessage);
    }
}
