/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core.replication;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.ReplicatedInteger;
import org.neo4j.causalclustering.core.replication.DistributedOperation;
import org.neo4j.causalclustering.core.replication.Progress;
import org.neo4j.causalclustering.core.replication.ProgressTracker;
import org.neo4j.causalclustering.core.replication.RaftReplicator;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.replication.ReplicationFailureException;
import org.neo4j.causalclustering.core.replication.monitoring.ReplicationMonitor;
import org.neo4j.causalclustering.core.replication.session.GlobalSession;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.state.Result;
import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.kernel.availability.AvailabilityGuard;
import org.neo4j.kernel.availability.DatabaseAvailabilityGuard;
import org.neo4j.kernel.availability.UnavailableException;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLog;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.time.Clocks;

public class RaftReplicatorTest {
    /** Upper bound, in milliseconds, for the polling waits and bounded thread joins in this class. */
    private static final int DEFAULT_TIMEOUT_MS = 15_000;

    // JUnit rule in its default "expect nothing" state; no test visible in this class configures it.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    // Collaborators handed to every RaftReplicator built by getReplicator(...).
    // Declaration order matters: session needs myself, and sessionPool needs session.
    private final LeaderLocator leaderLocator = Mockito.mock(LeaderLocator.class);
    private final MemberId myself = new MemberId(UUID.randomUUID());
    // A leader whose member id is different from "myself".
    private final LeaderInfo leaderInfo = new LeaderInfo(new MemberId(UUID.randomUUID()), 1L);
    private final GlobalSession session = new GlobalSession(UUID.randomUUID(), myself);
    private final LocalSessionPool sessionPool = new LocalSessionPool(session);
    // Constant zero-millisecond timeout strategy passed to the replicator.
    private final TimeoutStrategy noWaitTimeoutStrategy = new ConstantTimeTimeoutStrategy(0L, TimeUnit.MILLISECONDS);
    private final DatabaseAvailabilityGuard databaseAvailabilityGuard =
            new DatabaseAvailabilityGuard("graph.db", Clocks.systemClock(), NullLog.getInstance());

    /**
     * Replicates one value on a background thread, marks the captured progress as replicated,
     * and then checks that the message was addressed to the current leader and that the
     * replication monitor saw exactly one start, at least one attempt, one success and no failure.
     */
    @Test
    public void shouldSendReplicatedContentToLeader() throws Exception {
        // given: a replicator wired to a mocked monitor and capturing test doubles
        ReplicationMonitor monitor = Mockito.mock(ReplicationMonitor.class);
        Monitors monitors = new Monitors();
        monitors.addMonitorListener(monitor);
        CapturingProgressTracker tracker = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> sink = new CapturingOutbound<>();
        RaftReplicator replicator = getReplicator(sink, tracker, monitors);
        replicator.onLeaderSwitch(leaderInfo);

        // when: replication runs in the background without result tracking
        ReplicatingThread worker = replicatingThread(replicator, ReplicatedInteger.valueOf(5), false);
        worker.start();

        // Wait for the tracker to be handed a Progress before touching it.
        org.neo4j.test.assertion.Assert.assertEventually(
                "making progress",
                () -> tracker.last,
                CoreMatchers.not(CoreMatchers.equalTo(null)),
                DEFAULT_TIMEOUT_MS,
                TimeUnit.MILLISECONDS);
        tracker.last.setReplicated();
        worker.join(DEFAULT_TIMEOUT_MS);

        // then
        Assert.assertEquals(leaderInfo.memberId(), sink.lastTo);
        Mockito.verify(monitor, Mockito.times(1)).startReplication();
        Mockito.verify(monitor, Mockito.atLeast(1)).replicationAttempt();
        Mockito.verify(monitor, Mockito.times(1)).successfulReplication();
        Mockito.verify(monitor, Mockito.never()).failedReplication(ArgumentMatchers.any());
    }

    /**
     * Leaves the progress un-replicated until the outbound has recorded more than two sends,
     * then marks it replicated and checks that the monitor saw one start, at least two
     * attempts, one success and no failure.
     */
    @Test
    public void shouldResendAfterTimeout() throws Exception {
        // given
        ReplicationMonitor monitor = Mockito.mock(ReplicationMonitor.class);
        Monitors monitors = new Monitors();
        monitors.addMonitorListener(monitor);
        CapturingProgressTracker tracker = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> sink = new CapturingOutbound<>();
        RaftReplicator replicator = getReplicator(sink, tracker, monitors);
        replicator.onLeaderSwitch(leaderInfo);

        // when
        ReplicatingThread worker = replicatingThread(replicator, ReplicatedInteger.valueOf(5), false);
        worker.start();

        // Hold back the "replicated" signal until the message has been sent repeatedly.
        org.neo4j.test.assertion.Assert.assertEventually(
                "send count",
                () -> sink.count,
                Matchers.greaterThan(2),
                DEFAULT_TIMEOUT_MS,
                TimeUnit.MILLISECONDS);
        tracker.last.setReplicated();
        worker.join(DEFAULT_TIMEOUT_MS);

        // then
        Mockito.verify(monitor, Mockito.times(1)).startReplication();
        Mockito.verify(monitor, Mockito.atLeast(2)).replicationAttempt();
        Mockito.verify(monitor, Mockito.times(1)).successfulReplication();
        Mockito.verify(monitor, Mockito.never()).failedReplication(ArgumentMatchers.any());
    }

    /**
     * Checks the session pool's open-session count: one while a result-tracking replication
     * is in flight, and zero after the progress is marked replicated and its future is completed.
     */
    @Test
    public void shouldReleaseSessionWhenFinished() throws Exception {
        // given
        CapturingProgressTracker tracker = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> sink = new CapturingOutbound<>();
        RaftReplicator replicator = getReplicator(sink, tracker, new Monitors());
        replicator.onLeaderSwitch(leaderInfo);

        // when: replication is started with result tracking enabled
        ReplicatingThread worker = replicatingThread(replicator, ReplicatedInteger.valueOf(5), true);
        worker.start();
        org.neo4j.test.assertion.Assert.assertEventually(
                "making progress",
                () -> tracker.last,
                CoreMatchers.not(CoreMatchers.equalTo(null)),
                DEFAULT_TIMEOUT_MS,
                TimeUnit.MILLISECONDS);

        // then: the operation holds a session while in flight
        Assert.assertEquals(1L, sessionPool.openSessionCount());

        // when: the operation is replicated and its result delivered
        tracker.last.setReplicated();
        tracker.last.futureResult().complete(5);
        worker.join(DEFAULT_TIMEOUT_MS);

        // then: the session is back in the pool
        Assert.assertEquals(0L, sessionPool.openSessionCount());
    }

    /**
     * Shuts the availability guard down while a replication is running and expects the
     * replicating thread to end with an exception caused by {@link UnavailableException}.
     * The monitor must have seen one start, at least one attempt, no success and one failure.
     */
    @Test
    public void stopReplicationOnShutdown() throws InterruptedException {
        // given
        ReplicationMonitor monitor = Mockito.mock(ReplicationMonitor.class);
        Monitors monitors = new Monitors();
        monitors.addMonitorListener(monitor);
        CapturingProgressTracker tracker = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> sink = new CapturingOutbound<>();
        RaftReplicator replicator = getReplicator(sink, tracker, monitors);
        replicator.onLeaderSwitch(leaderInfo);

        // when: the guard is shut down while the replication is in progress
        ReplicatingThread worker = replicatingThread(replicator, ReplicatedInteger.valueOf(5), true);
        worker.start();
        databaseAvailabilityGuard.shutdown();
        worker.join();

        // then
        Throwable rootCause = worker.getReplicationException().getCause();
        Assert.assertThat(rootCause, Matchers.instanceOf(UnavailableException.class));
        Mockito.verify(monitor, Mockito.times(1)).startReplication();
        Mockito.verify(monitor, Mockito.atLeast(1)).replicationAttempt();
        Mockito.verify(monitor, Mockito.never()).successfulReplication();
        Mockito.verify(monitor, Mockito.times(1)).failedReplication(ArgumentMatchers.any());
    }

    /**
     * Registers a requirement on the availability guard while a replication is running and
     * expects the replicating thread to end with an exception caused by
     * {@link UnavailableException}.
     */
    @Test
    public void stopReplicationWhenUnavailable() throws InterruptedException {
        // given
        CapturingProgressTracker tracker = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> sink = new CapturingOutbound<>();
        RaftReplicator replicator = getReplicator(sink, tracker, new Monitors());
        replicator.onLeaderSwitch(leaderInfo);

        // when
        ReplicatingThread worker = replicatingThread(replicator, ReplicatedInteger.valueOf(5), true);
        worker.start();
        // NOTE(review): presumably an outstanding requirement makes the guard report
        // "unavailable"; confirm against DatabaseAvailabilityGuard.
        databaseAvailabilityGuard.require(() -> "Database not unavailable");
        worker.join();

        // then
        Throwable rootCause = worker.getReplicationException().getCause();
        Assert.assertThat(rootCause, Matchers.instanceOf(UnavailableException.class));
    }

    /**
     * Calls {@code replicate} on a replicator that has never been told about a leader and
     * expects a {@link ReplicationFailureException}.
     */
    @Test
    public void shouldFailIfNoLeaderIsAvailable() {
        // given: note that onLeaderSwitch(...) is deliberately never called
        CapturingProgressTracker tracker = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> sink = new CapturingOutbound<>();
        RaftReplicator replicator = getReplicator(sink, tracker, new Monitors());

        try {
            // when
            replicator.replicate(ReplicatedInteger.valueOf(5), true);
            Assert.fail("should have thrown");
        }
        catch (ReplicationFailureException expected) {
            // then: this exception is the outcome under test, so there is nothing to handle
        }
    }

    /**
     * Replicates twice with a leader switch in between and checks that each message was
     * addressed to whichever leader was announced last.
     */
    @Test
    public void shouldListenToLeaderUpdates() throws ReplicationFailureException {
        // given: a tracker whose single Progress is already marked replicated,
        // so replicate(...) can be called directly on the test thread
        OneProgressTracker tracker = new OneProgressTracker();
        tracker.last.setReplicated();
        CapturingOutbound<RaftMessages.RaftMessage> sink = new CapturingOutbound<>();
        RaftReplicator replicator = getReplicator(sink, tracker, new Monitors());
        ReplicatedInteger payload = ReplicatedInteger.valueOf(5);

        // when / then: first leader
        LeaderInfo firstLeader = leaderInfo;
        replicator.onLeaderSwitch(firstLeader);
        replicator.replicate(payload, false);
        Assert.assertEquals(sink.lastTo, firstLeader.memberId());

        // when / then: a different leader takes over
        LeaderInfo secondLeader = new LeaderInfo(new MemberId(UUID.randomUUID()), 1L);
        replicator.onLeaderSwitch(secondLeader);
        replicator.replicate(payload, false);
        Assert.assertEquals(sink.lastTo, secondLeader.memberId());
    }

    /**
     * Starts a replication, announces that the leader is gone (a {@link LeaderInfo} with a
     * {@code null} member id), marks the progress replicated, announces the leader again,
     * and expects the replicating thread to finish without recording an exception.
     *
     * <p>The bounded {@code join} returns whether or not the thread has ended, so the outcome
     * is asserted explicitly afterwards. Without those checks a hung or failed replication
     * would pass unnoticed.
     */
    @Test
    public void shouldSuccefulltSendIfLeaderIsLostAndFound() throws InterruptedException {
        // given
        OneProgressTracker capturedProgress = new OneProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<>();
        RaftReplicator replicator = getReplicator(outbound, capturedProgress, new Monitors());
        replicator.onLeaderSwitch(leaderInfo);

        // when: replication is running and has already sent more than once
        ReplicatingThread replicatingThread = replicatingThread(replicator, ReplicatedInteger.valueOf(5), false);
        replicatingThread.start();
        org.neo4j.test.assertion.Assert.assertEventually(
                "send count",
                () -> outbound.count,
                Matchers.greaterThan(1),
                DEFAULT_TIMEOUT_MS,
                TimeUnit.MILLISECONDS);

        // the leader disappears, the operation gets replicated, and the leader comes back
        replicator.onLeaderSwitch(new LeaderInfo(null, 1L));
        capturedProgress.last.setReplicated();
        replicator.onLeaderSwitch(leaderInfo);
        replicatingThread.join(DEFAULT_TIMEOUT_MS);

        // then
        Assert.assertFalse("replicating thread should have finished", replicatingThread.isAlive());
        Assert.assertNull("replication should not have failed", replicatingThread.getReplicationException());
    }

    /**
     * Builds the {@link RaftReplicator} under test from this class's shared fixture fields
     * plus the per-test outbound, progress tracker and monitors.
     *
     * @param outbound capturing outbound that receives the replicator's messages
     * @param progressTracker tracker the replicator is constructed with
     * @param monitors monitors instance the replicator is constructed with
     * @return a new replicator that uses a {@link NullLogProvider}
     */
    private RaftReplicator getReplicator(CapturingOutbound<RaftMessages.RaftMessage> outbound, ProgressTracker progressTracker, Monitors monitors) {
        AvailabilityGuard guard = databaseAvailabilityGuard;
        LogProvider silentLogs = NullLogProvider.getInstance();
        return new RaftReplicator(
                leaderLocator,
                myself,
                outbound,
                sessionPool,
                progressTracker,
                noWaitTimeoutStrategy,
                10L,
                guard,
                silentLogs,
                monitors);
    }

    /**
     * Creates, but does not start, a thread that will replicate {@code content} through
     * {@code replicator}.
     *
     * @param replicator replicator the thread will call
     * @param content value to replicate
     * @param trackResult forwarded to the replicator; the thread waits on the returned future only when true
     * @return the unstarted thread
     */
    private ReplicatingThread replicatingThread(RaftReplicator replicator, ReplicatedInteger content, boolean trackResult) {
        ReplicatingThread unstarted = new ReplicatingThread(replicator, content, trackResult);
        return unstarted;
    }

    private static class CapturingOutbound<MESSAGE extends Message>
    implements Outbound<MemberId, MESSAGE> {
        private MemberId lastTo;
        private int count;

        private CapturingOutbound() {
        }

        public void send(MemberId to, MESSAGE message, boolean block) {
            this.lastTo = to;
            ++this.count;
        }
    }

    private abstract class ProgressTrackerAdaptor
    implements ProgressTracker {
        protected Progress last;

        private ProgressTrackerAdaptor() {
        }

        public void trackReplication(DistributedOperation operation) {
            throw new UnsupportedOperationException();
        }

        public void trackResult(DistributedOperation operation, Result result) {
            throw new UnsupportedOperationException();
        }

        public void abort(DistributedOperation operation) {
            throw new UnsupportedOperationException();
        }

        public void triggerReplicationEvent() {
        }

        public int inProgressCount() {
            throw new UnsupportedOperationException();
        }
    }

    private class CapturingProgressTracker
    extends ProgressTrackerAdaptor {
        private CapturingProgressTracker() {
        }

        public Progress start(DistributedOperation operation) {
            this.last = new Progress();
            return this.last;
        }
    }

    private class OneProgressTracker
    extends ProgressTrackerAdaptor {
        OneProgressTracker() {
            this.last = new Progress();
        }

        public Progress start(DistributedOperation operation) {
            return this.last;
        }
    }

    /**
     * Thread that performs one {@code replicate} call and, when result tracking is on, waits
     * for the returned future. Any exception raised along the way is stored rather than
     * propagated, so a test can inspect it through {@link #getReplicationException()} after
     * joining the thread.
     */
    private class ReplicatingThread
    extends Thread {
        private final RaftReplicator replicator;
        private final ReplicatedInteger content;
        private final boolean trackResult;
        /** Written by this thread, read by the test thread; hence volatile. */
        private volatile Exception replicationException;

        /**
         * @param replicator replicator to call from {@link #run()}
         * @param content value to replicate
         * @param trackResult passed to {@code replicate}; when true, {@link #run()} also waits on the returned future
         */
        ReplicatingThread(RaftReplicator replicator, ReplicatedInteger content, boolean trackResult) {
            this.replicator = replicator;
            this.content = content;
            this.trackResult = trackResult;
        }

        /**
         * Replicates the content and optionally waits for the result.
         *
         * <p>A failure delivered through the future is recorded as an
         * {@link IllegalStateException} whose cause is the {@link ExecutionException}.
         * Earlier the recorded exception had no cause at all, because the stored
         * {@code ExecutionException} was immediately overwritten by a cause-less
         * {@code IllegalStateException}. Every other exception is recorded unchanged.
         */
        @Override
        public void run() {
            try {
                Future<?> futureResult = this.replicator.replicate(this.content, this.trackResult);
                if (this.trackResult) {
                    futureResult.get();
                }
            }
            catch (ExecutionException e) {
                // Same recorded type as before, but the underlying failure is now reachable via getCause().
                this.replicationException = new IllegalStateException(e);
            }
            catch (Exception e) {
                this.replicationException = e;
            }
        }

        /**
         * @return the exception recorded by {@link #run()}, or {@code null} if none was recorded
         */
        Exception getReplicationException() {
            return this.replicationException;
        }
    }
}

