/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.causalclustering.core.BatchingMessageHandler;
import org.neo4j.causalclustering.core.BoundedPriorityQueue;
import org.neo4j.causalclustering.core.consensus.ContinuousJob;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.ReplicatedString;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.helpers.ArrayUtil;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;

public class BatchingMessageHandlerTest {
    private static final BoundedPriorityQueue.Config IN_QUEUE_CONFIG = new BoundedPriorityQueue.Config(64, 1024L);
    private static final BatchingMessageHandler.Config BATCH_CONFIG = new BatchingMessageHandler.Config(16, 256L);
    private final Instant now = Instant.now();
    private LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> downstreamHandler = (LifecycleMessageHandler)Mockito.mock(LifecycleMessageHandler.class);
    private ClusterId localClusterId = new ClusterId(UUID.randomUUID());
    private ContinuousJob mockJob = (ContinuousJob)Mockito.mock(ContinuousJob.class);
    private Function<Runnable, ContinuousJob> jobSchedulerFactory = ignored -> this.mockJob;
    private ExecutorService executor;
    private MemberId leader = new MemberId(UUID.randomUUID());

    @Before
    public void before() {
        this.executor = Executors.newCachedThreadPool();
    }

    @After
    public void after() throws InterruptedException {
        this.executor.shutdown();
        this.executor.awaitTermination(60L, TimeUnit.SECONDS);
    }

    @Test
    public void shouldInvokeInnerHandlerWhenRun() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request(null, this.content("dummy"));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)message));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        RaftMessages.NewEntry.BatchRequest expected = new RaftMessages.NewEntry.BatchRequest(Collections.singletonList(new ReplicatedString("dummy")));
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)expected));
    }

    @Test
    public void shouldInvokeHandlerOnQueuedMessage() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ReplicatedString content = new ReplicatedString("dummy");
        RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)content);
        Future<?> future = this.executor.submit((Runnable)batchHandler);
        Thread.sleep(50L);
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)message));
        future.get();
        RaftMessages.NewEntry.BatchRequest expected = new RaftMessages.NewEntry.BatchRequest(Collections.singletonList(content));
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)expected));
    }

    @Test
    public void shouldBatchRequests() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentB = new ReplicatedString("B");
        RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentA);
        RaftMessages.NewEntry.Request messageB = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentB);
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)messageA));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)messageB));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        RaftMessages.NewEntry.BatchRequest expected = new RaftMessages.NewEntry.BatchRequest(Arrays.asList(contentA, contentB));
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)expected));
    }

    @Test
    public void shouldBatchUsingReceivedInstantOfFirstReceivedMessage() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ReplicatedString content = new ReplicatedString("A");
        RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)content);
        Instant firstReceived = Instant.ofEpochMilli(1L);
        Instant secondReceived = firstReceived.plusMillis(1L);
        batchHandler.handle(this.wrap(firstReceived, (RaftMessages.RaftMessage)messageA));
        batchHandler.handle(this.wrap(secondReceived, (RaftMessages.RaftMessage)messageA));
        batchHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(Arrays.asList(content, content));
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle(this.wrap(firstReceived, (RaftMessages.RaftMessage)batchRequest));
    }

    @Test
    public void shouldBatchNewEntriesAndHandleOtherMessagesFirst() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentC = new ReplicatedString("C");
        RaftMessages.NewEntry.Request newEntryA = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentA);
        RaftMessages.Heartbeat heartbeatA = new RaftMessages.Heartbeat(null, 0L, 0L, 0L);
        RaftMessages.NewEntry.Request newEntryB = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentC);
        RaftMessages.Heartbeat heartbeatB = new RaftMessages.Heartbeat(null, 1L, 1L, 1L);
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)newEntryA));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)heartbeatA));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)newEntryB));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)heartbeatB));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        batchHandler.run();
        batchHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(Arrays.asList(contentA, contentC));
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)heartbeatA));
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)heartbeatB));
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)batchRequest));
    }

    @Test
    public void shouldBatchSingleEntryAppendEntries() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        long leaderTerm = 1L;
        long prevLogIndex = -1L;
        long prevLogTerm = -1L;
        long leaderCommit = 0L;
        RaftLogEntry entryA = new RaftLogEntry(0L, this.content("A"));
        RaftLogEntry entryB = new RaftLogEntry(0L, this.content("B"));
        RaftMessages.AppendEntries.Request appendA = new RaftMessages.AppendEntries.Request(this.leader, leaderTerm, prevLogIndex, prevLogTerm, new RaftLogEntry[]{entryA}, leaderCommit);
        RaftMessages.AppendEntries.Request appendB = new RaftMessages.AppendEntries.Request(this.leader, leaderTerm, prevLogIndex + 1L, 0L, new RaftLogEntry[]{entryB}, leaderCommit);
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)appendA));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)appendB));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        RaftMessages.AppendEntries.Request expected = new RaftMessages.AppendEntries.Request(this.leader, leaderTerm, prevLogIndex, prevLogTerm, new RaftLogEntry[]{entryA, entryB}, leaderCommit);
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)expected));
    }

    @Test
    public void shouldBatchMultipleEntryAppendEntries() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        long leaderTerm = 1L;
        long prevLogIndex = -1L;
        long prevLogTerm = -1L;
        long leaderCommit = 0L;
        Object[] entriesA = this.entries(0L, 0, 2);
        RaftLogEntry[] entriesB = this.entries(1L, 3, 3);
        Object[] entriesC = this.entries(2L, 4, 8);
        Object[] entriesD = this.entries(3L, 9, 15);
        RaftMessages.AppendEntries.Request appendA = new RaftMessages.AppendEntries.Request(this.leader, leaderTerm, prevLogIndex, prevLogTerm, (RaftLogEntry[])entriesA, leaderCommit);
        prevLogTerm = ((RaftLogEntry)ArrayUtil.lastOf((Object[])appendA.entries())).term();
        RaftMessages.AppendEntries.Request appendB = new RaftMessages.AppendEntries.Request(this.leader, leaderTerm, prevLogIndex += (long)appendA.entries().length, prevLogTerm, entriesB, leaderCommit += 2L);
        prevLogTerm = ((RaftLogEntry)ArrayUtil.lastOf((Object[])appendB.entries())).term();
        RaftMessages.AppendEntries.Request appendC = new RaftMessages.AppendEntries.Request(this.leader, leaderTerm, prevLogIndex += (long)appendB.entries().length, prevLogTerm, (RaftLogEntry[])ArrayUtil.concat((Object[])entriesC, (Object[])entriesD), leaderCommit += 5L);
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)appendA));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)appendB));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)appendC));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        RaftMessages.AppendEntries.Request expected = new RaftMessages.AppendEntries.Request(this.leader, leaderTerm, -1L, -1L, (RaftLogEntry[])ArrayUtil.concatArrays((Object[])entriesA, (Object[][])new RaftLogEntry[][]{entriesB, entriesC, entriesD}), leaderCommit);
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)expected));
    }

    @Test
    public void shouldNotBatchAppendEntriesDifferentLeaderTerms() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        long leaderTerm = 1L;
        long prevLogIndex = -1L;
        long prevLogTerm = -1L;
        long leaderCommit = 0L;
        RaftLogEntry[] entriesA = this.entries(0L, 0, 2);
        RaftLogEntry[] entriesB = this.entries(1L, 3, 3);
        RaftMessages.AppendEntries.Request appendA = new RaftMessages.AppendEntries.Request(this.leader, leaderTerm, prevLogIndex, prevLogTerm, entriesA, leaderCommit);
        prevLogTerm = ((RaftLogEntry)ArrayUtil.lastOf((Object[])appendA.entries())).term();
        RaftMessages.AppendEntries.Request appendB = new RaftMessages.AppendEntries.Request(this.leader, leaderTerm + 1L, prevLogIndex += (long)appendA.entries().length, prevLogTerm, entriesB, leaderCommit);
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)appendA));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)appendB));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        batchHandler.run();
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)appendA));
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)appendB));
    }

    @Test
    public void shouldPrioritiseCorrectly() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        RaftMessages.NewEntry.Request newEntry = new RaftMessages.NewEntry.Request(null, this.content(""));
        RaftMessages.AppendEntries.Request append = new RaftMessages.AppendEntries.Request(this.leader, 1L, -1L, -1L, this.entries(0L, 0, 0), 0L);
        RaftMessages.AppendEntries.Request emptyAppend = new RaftMessages.AppendEntries.Request(this.leader, 1L, -1L, -1L, RaftLogEntry.empty, 0L);
        RaftMessages.Heartbeat heartbeat = new RaftMessages.Heartbeat(null, 0L, 0L, 0L);
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)newEntry));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)append));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)heartbeat));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)emptyAppend));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        batchHandler.run();
        batchHandler.run();
        batchHandler.run();
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{this.downstreamHandler});
        ((LifecycleMessageHandler)inOrder.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)heartbeat));
        ((LifecycleMessageHandler)inOrder.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)emptyAppend));
        ((LifecycleMessageHandler)inOrder.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)append));
        ((LifecycleMessageHandler)inOrder.verify(this.downstreamHandler)).handle((Message)this.wrap((RaftMessages.RaftMessage)new RaftMessages.NewEntry.BatchRequest(Collections.singletonList(this.content("")))));
    }

    @Test
    public void shouldDropMessagesAfterBeingStopped() throws Throwable {
        AssertableLogProvider logProvider = new AssertableLogProvider();
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)logProvider);
        RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request(null, null);
        batchHandler.stop();
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)message));
        batchHandler.run();
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler, (VerificationMode)Mockito.never())).handle((Message)ArgumentMatchers.any(RaftMessages.ReceivedInstantClusterIdAwareMessage.class));
        logProvider.assertAtLeastOnce(new AssertableLogProvider.LogMatcher[]{AssertableLogProvider.inLog(BatchingMessageHandler.class).debug("This handler has been stopped, dropping the message: %s", new Object[]{this.wrap((RaftMessages.RaftMessage)message)})});
    }

    @Test(timeout=5000L)
    public void shouldGiveUpAddingMessagesInTheQueueIfTheHandlerHasBeenStopped() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, new BoundedPriorityQueue.Config(1, 1, 1024L), BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)new ReplicatedString("dummy"));
        batchHandler.handle(this.wrap((RaftMessages.RaftMessage)message));
        CountDownLatch latch = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            latch.countDown();
            batchHandler.handle(this.wrap((RaftMessages.RaftMessage)message));
        });
        thread.start();
        latch.await();
        batchHandler.stop();
        thread.join();
    }

    @Test
    public void shouldDelegateStart() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ClusterId clusterId = new ClusterId(UUID.randomUUID());
        batchHandler.start(clusterId);
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).start(clusterId);
    }

    @Test
    public void shouldDelegateStop() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        batchHandler.stop();
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).stop();
    }

    @Test
    public void shouldStartJob() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ClusterId clusterId = new ClusterId(UUID.randomUUID());
        batchHandler.start(clusterId);
        ((ContinuousJob)Mockito.verify((Object)this.mockJob)).start();
    }

    @Test
    public void shouldStopJob() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        batchHandler.stop();
        ((ContinuousJob)Mockito.verify((Object)this.mockJob)).stop();
    }

    private RaftMessages.ReceivedInstantClusterIdAwareMessage wrap(RaftMessages.RaftMessage message) {
        return this.wrap(this.now, message);
    }

    private RaftMessages.ReceivedInstantClusterIdAwareMessage<?> wrap(Instant instant, RaftMessages.RaftMessage message) {
        return RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)instant, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)message);
    }

    private ReplicatedContent content(String content) {
        return new ReplicatedString(content);
    }

    private RaftLogEntry[] entries(long term, int min, int max) {
        RaftLogEntry[] entries = new RaftLogEntry[max - min + 1];
        for (int i = min; i <= max; ++i) {
            entries[i - min] = new RaftLogEntry(term, (ReplicatedContent)new ReplicatedString(String.valueOf(i)));
        }
        return entries;
    }
}

