/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.messaging;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstallerV1;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstallerV1;
import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolClientInstallerV2;
import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolServerInstallerV2;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.causalclustering.messaging.SenderService;
import org.neo4j.causalclustering.net.ChildInitializer;
import org.neo4j.causalclustering.net.Server;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
import org.neo4j.causalclustering.protocol.handshake.HandshakeServerInitializer;
import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.ports.allocation.PortAuthority;

@RunWith(value=Parameterized.class)
public class SenderServiceIT {
    private final LogProvider logProvider = NullLogProvider.getInstance();
    private final ApplicationSupportedProtocols supportedApplicationProtocol = new ApplicationSupportedProtocols((Protocol.Category)Protocol.ApplicationProtocolCategory.RAFT, Arrays.asList(Protocol.ApplicationProtocols.RAFT_1.implementation(), Protocol.ApplicationProtocols.RAFT_2.implementation()));
    private final Collection<ModifierSupportedProtocols> supportedModifierProtocols = Collections.emptyList();
    private final ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository((Protocol.ApplicationProtocol[])Protocol.ApplicationProtocols.values(), this.supportedApplicationProtocol);
    private final ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository((Protocol.ModifierProtocol[])Protocol.ModifierProtocols.values(), this.supportedModifierProtocols);
    @Parameterized.Parameter
    public boolean blocking;
    @Parameterized.Parameter(value=1)
    public Protocol.ApplicationProtocols clientProtocol;

    @Parameterized.Parameters(name="blocking={0} protocol={1}")
    public static Iterable<Object[]> params() {
        return SenderServiceIT.clientRepositories().stream().flatMap(r -> Stream.of({true, r}, {false, r})).collect(Collectors.toList());
    }

    private static Collection<Protocol.ApplicationProtocols> clientRepositories() {
        return Arrays.asList(Protocol.ApplicationProtocols.RAFT_1, Protocol.ApplicationProtocols.RAFT_2);
    }

    @Test
    public void shouldSendAndReceive() throws Throwable {
        int port = PortAuthority.allocatePort();
        final Semaphore messageReceived = new Semaphore(0);
        ChannelInboundHandlerAdapter nettyHandler = new ChannelInboundHandlerAdapter(){

            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                messageReceived.release();
            }
        };
        Server raftServer = this.raftServer((ChannelInboundHandler)nettyHandler, port);
        raftServer.start();
        SenderService sender = this.raftSender();
        sender.start();
        AdvertisedSocketAddress to = new AdvertisedSocketAddress("localhost", port);
        MemberId memberId = new MemberId(UUID.randomUUID());
        ClusterId clusterId = new ClusterId(UUID.randomUUID());
        RaftMessages.NewEntry.Request newEntryMessage = new RaftMessages.NewEntry.Request(memberId, (ReplicatedContent)new MemberIdSet(Iterators.asSet((Object[])new MemberId[]{memberId})));
        RaftMessages.ClusterIdAwareMessage message = RaftMessages.ClusterIdAwareMessage.of((ClusterId)clusterId, (RaftMessages.RaftMessage)newEntryMessage);
        sender.send(to, (Message)message, this.blocking);
        Assert.assertTrue((boolean)messageReceived.tryAcquire(15L, TimeUnit.SECONDS));
        sender.stop();
        raftServer.stop();
    }

    private Server raftServer(ChannelInboundHandler nettyHandler, int port) {
        NettyPipelineBuilderFactory pipelineFactory = new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER);
        RaftProtocolServerInstallerV1.Factory factoryV1 = new RaftProtocolServerInstallerV1.Factory(nettyHandler, pipelineFactory, this.logProvider);
        RaftProtocolServerInstallerV2.Factory factoryV2 = new RaftProtocolServerInstallerV2.Factory(nettyHandler, pipelineFactory, this.logProvider);
        ProtocolInstallerRepository installer = new ProtocolInstallerRepository(Arrays.asList(factoryV1, factoryV2), (Collection)ModifierProtocolInstaller.allServerInstallers);
        HandshakeServerInitializer channelInitializer = new HandshakeServerInitializer(this.applicationProtocolRepository, this.modifierProtocolRepository, installer, pipelineFactory, this.logProvider);
        ListenSocketAddress listenAddress = new ListenSocketAddress("localhost", port);
        return new Server((ChildInitializer)channelInitializer, null, this.logProvider, this.logProvider, listenAddress, "raft-server");
    }

    private SenderService raftSender() {
        NettyPipelineBuilderFactory pipelineFactory = new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER);
        RaftProtocolClientInstallerV1.Factory factoryV1 = new RaftProtocolClientInstallerV1.Factory(pipelineFactory, this.logProvider);
        RaftProtocolClientInstallerV2.Factory factoryV2 = new RaftProtocolClientInstallerV2.Factory(pipelineFactory, this.logProvider);
        ProtocolInstallerRepository protocolInstaller = new ProtocolInstallerRepository(Arrays.asList(factoryV1, factoryV2), (Collection)ModifierProtocolInstaller.allClientInstallers);
        HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer(this.clientRepository(), this.modifierProtocolRepository, protocolInstaller, pipelineFactory, Duration.ofSeconds(5L), this.logProvider, this.logProvider);
        return new SenderService((ChannelInitializer)channelInitializer, this.logProvider);
    }

    private ApplicationProtocolRepository clientRepository() {
        return new ApplicationProtocolRepository((Protocol.ApplicationProtocol[])new Protocol.ApplicationProtocols[]{Protocol.ApplicationProtocols.RAFT_2}, new ApplicationSupportedProtocols((Protocol.Category)Protocol.ApplicationProtocolCategory.RAFT, Collections.emptyList()));
    }
}

