/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core.server;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.ReplicatedString;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.server.BatchingMessageHandler;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.ClusterIdentity;
import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;

public class BatchingMessageHandlerTest {
    private static final int MAX_BATCH = 16;
    private static final int QUEUE_SIZE = 64;
    private ClusterIdentity clusterIdentity = (ClusterIdentity)Mockito.mock(ClusterIdentity.class);
    private Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> raftStateMachine = (Inbound.MessageHandler)Mockito.mock(Inbound.MessageHandler.class);
    private ClusterId localClusterId = new ClusterId(UUID.randomUUID());

    @Before
    public void setup() {
        Mockito.when((Object)this.clusterIdentity.clusterId()).thenReturn((Object)this.localClusterId);
    }

    @Test
    public void shouldInvokeInnerHandlerWhenRun() throws Exception {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.raftStateMachine, 64, 16, (LogProvider)NullLogProvider.getInstance());
        RaftMessages.ClusterIdAwareMessage message = new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, null));
        batchHandler.handle(message);
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.raftStateMachine});
        batchHandler.run();
        ((Inbound.MessageHandler)Mockito.verify(this.raftStateMachine)).handle((Message)message);
    }

    @Test
    public void shouldInvokeHandlerOnQueuedMessage() throws Exception {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.raftStateMachine, 64, 16, (LogProvider)NullLogProvider.getInstance());
        RaftMessages.ClusterIdAwareMessage message = new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, null));
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<?> future = executor.submit((Runnable)batchHandler);
        Thread.sleep(50L);
        batchHandler.handle(message);
        future.get();
        ((Inbound.MessageHandler)Mockito.verify(this.raftStateMachine)).handle((Message)message);
    }

    @Test
    public void shouldBatchRequests() throws Exception {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.raftStateMachine, 64, 16, (LogProvider)NullLogProvider.getInstance());
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentB = new ReplicatedString("B");
        RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentA);
        RaftMessages.NewEntry.Request messageB = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentB);
        batchHandler.handle(new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)messageA));
        batchHandler.handle(new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)messageB));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.raftStateMachine});
        batchHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(2);
        batchRequest.add((ReplicatedContent)contentA);
        batchRequest.add((ReplicatedContent)contentB);
        ((Inbound.MessageHandler)Mockito.verify(this.raftStateMachine)).handle((Message)new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)batchRequest));
    }

    @Test
    public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Exception {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.raftStateMachine, 64, 16, (LogProvider)NullLogProvider.getInstance());
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentC = new ReplicatedString("C");
        RaftMessages.ClusterIdAwareMessage messageA = new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentA));
        RaftMessages.ClusterIdAwareMessage messageB = new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.Heartbeat(null, 0L, 0L, 0L));
        RaftMessages.ClusterIdAwareMessage messageC = new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentC));
        RaftMessages.ClusterIdAwareMessage messageD = new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.Heartbeat(null, 1L, 1L, 1L));
        batchHandler.handle(messageA);
        batchHandler.handle(messageB);
        batchHandler.handle(messageC);
        batchHandler.handle(messageD);
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.raftStateMachine});
        batchHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(2);
        batchRequest.add((ReplicatedContent)contentA);
        batchRequest.add((ReplicatedContent)contentC);
        ((Inbound.MessageHandler)Mockito.verify(this.raftStateMachine)).handle((Message)new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)batchRequest));
        ((Inbound.MessageHandler)Mockito.verify(this.raftStateMachine)).handle((Message)messageB);
        ((Inbound.MessageHandler)Mockito.verify(this.raftStateMachine)).handle((Message)messageD);
    }

    @Test
    public void shouldDropMessagesAfterBeingStopped() throws Exception {
        AssertableLogProvider logProvider = new AssertableLogProvider();
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.raftStateMachine, 64, 16, (LogProvider)logProvider);
        RaftMessages.ClusterIdAwareMessage message = new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, null));
        batchHandler.stop();
        batchHandler.handle(message);
        batchHandler.run();
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.raftStateMachine});
        logProvider.assertAtLeastOnce(new AssertableLogProvider.LogMatcher[]{AssertableLogProvider.inLog(BatchingMessageHandler.class).debug("This handler has been stopped, dropping the message: %s", new Object[]{message})});
    }

    @Test(timeout=5000L)
    public void shouldGiveUpAddingMessagesInTheQueueIfTheHandlerHasBeenStopped() throws Exception {
        int queueSize = 1;
        final BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.raftStateMachine, queueSize, 16, (LogProvider)NullLogProvider.getInstance());
        final RaftMessages.ClusterIdAwareMessage message = new RaftMessages.ClusterIdAwareMessage(this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, null));
        batchHandler.handle(message);
        final CountDownLatch latch = new CountDownLatch(1);
        Thread thread = new Thread(){

            @Override
            public void run() {
                latch.countDown();
                batchHandler.handle(message);
            }
        };
        thread.start();
        latch.await();
        batchHandler.stop();
        thread.join();
    }
}

