/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core.replication;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreFiles;
import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.ReplicatedInteger;
import org.neo4j.causalclustering.core.replication.DistributedOperation;
import org.neo4j.causalclustering.core.replication.Progress;
import org.neo4j.causalclustering.core.replication.ProgressTracker;
import org.neo4j.causalclustering.core.replication.RaftReplicator;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.replication.ReplicationFailureException;
import org.neo4j.causalclustering.core.replication.monitoring.ReplicationMonitor;
import org.neo4j.causalclustering.core.replication.session.GlobalSession;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.state.Result;
import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.io.layout.DatabaseLayout;
import org.neo4j.kernel.availability.AvailabilityGuard;
import org.neo4j.kernel.availability.DatabaseAvailabilityGuard;
import org.neo4j.kernel.availability.UnavailableException;
import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLog;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.assertion.Assert;
import org.neo4j.time.Clocks;

class RaftReplicatorTest {
    private static final int DEFAULT_TIMEOUT_MS = 15000;
    private LeaderLocator leaderLocator = (LeaderLocator)Mockito.mock(LeaderLocator.class);
    private MemberId myself = new MemberId(UUID.randomUUID());
    private LeaderInfo leaderInfo = new LeaderInfo(new MemberId(UUID.randomUUID()), 1L);
    private GlobalSession session = new GlobalSession(UUID.randomUUID(), this.myself);
    private LocalSessionPool sessionPool = new LocalSessionPool(this.session);
    private TimeoutStrategy noWaitTimeoutStrategy = new ConstantTimeTimeoutStrategy(0L, TimeUnit.MILLISECONDS);
    private DatabaseAvailabilityGuard databaseAvailabilityGuard;
    private DatabaseHealth databaseHealth;
    private LocalDatabase localDatabase;

    RaftReplicatorTest() {
    }

    @BeforeEach
    void setUp() throws IOException {
        this.databaseAvailabilityGuard = new DatabaseAvailabilityGuard("graph.db", Clocks.systemClock(), (Log)NullLog.getInstance());
        this.databaseHealth = new DatabaseHealth((DatabasePanicEventGenerator)Mockito.mock(DatabasePanicEventGenerator.class), (Log)NullLog.getInstance());
        this.localDatabase = StubLocalDatabase.create(() -> this.databaseHealth, (AvailabilityGuard)this.databaseAvailabilityGuard);
        this.localDatabase.start();
    }

    @Test
    void shouldSendReplicatedContentToLeader() throws Exception {
        Monitors monitors = new Monitors();
        ReplicationMonitor replicationMonitor = (ReplicationMonitor)Mockito.mock(ReplicationMonitor.class);
        monitors.addMonitorListener((Object)replicationMonitor, new String[0]);
        CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<RaftMessages.RaftMessage>();
        RaftReplicator replicator = this.getReplicator(outbound, capturedProgress, monitors);
        replicator.onLeaderSwitch(this.leaderInfo);
        ReplicatedInteger content = ReplicatedInteger.valueOf(5);
        ReplicatingThread replicatingThread = this.replicatingThread(replicator, content, false);
        replicatingThread.start();
        Assert.assertEventually((String)"making progress", () -> capturedProgress.last, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.equalTo(null)), (long)15000L, (TimeUnit)TimeUnit.MILLISECONDS);
        capturedProgress.last.setReplicated();
        replicatingThread.join(15000L);
        Assertions.assertEquals((Object)this.leaderInfo.memberId(), (Object)((CapturingOutbound)outbound).lastTo);
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.times((int)1))).startReplication();
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.atLeast((int)1))).replicationAttempt();
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.times((int)1))).successfulReplication();
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.never())).failedReplication((Throwable)ArgumentMatchers.any());
    }

    @Test
    void shouldResendAfterTimeout() throws Exception {
        Monitors monitors = new Monitors();
        ReplicationMonitor replicationMonitor = (ReplicationMonitor)Mockito.mock(ReplicationMonitor.class);
        monitors.addMonitorListener((Object)replicationMonitor, new String[0]);
        CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<RaftMessages.RaftMessage>();
        RaftReplicator replicator = this.getReplicator(outbound, capturedProgress, monitors);
        replicator.onLeaderSwitch(this.leaderInfo);
        ReplicatedInteger content = ReplicatedInteger.valueOf(5);
        ReplicatingThread replicatingThread = this.replicatingThread(replicator, content, false);
        replicatingThread.start();
        Assert.assertEventually((String)"send count", () -> ((CapturingOutbound)outbound).count, (Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(2)), (long)15000L, (TimeUnit)TimeUnit.MILLISECONDS);
        capturedProgress.last.setReplicated();
        replicatingThread.join(15000L);
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.times((int)1))).startReplication();
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.atLeast((int)2))).replicationAttempt();
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.times((int)1))).successfulReplication();
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.never())).failedReplication((Throwable)ArgumentMatchers.any());
    }

    @Test
    void shouldReleaseSessionWhenFinished() throws Exception {
        CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<RaftMessages.RaftMessage>();
        RaftReplicator replicator = this.getReplicator(outbound, capturedProgress, new Monitors());
        replicator.onLeaderSwitch(this.leaderInfo);
        ReplicatedInteger content = ReplicatedInteger.valueOf(5);
        ReplicatingThread replicatingThread = this.replicatingThread(replicator, content, true);
        replicatingThread.start();
        Assert.assertEventually((String)"making progress", () -> capturedProgress.last, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.equalTo(null)), (long)15000L, (TimeUnit)TimeUnit.MILLISECONDS);
        Assertions.assertEquals((long)1L, (long)this.sessionPool.openSessionCount());
        capturedProgress.last.setReplicated();
        capturedProgress.last.futureResult().complete(5);
        replicatingThread.join(15000L);
        Assertions.assertEquals((long)0L, (long)this.sessionPool.openSessionCount());
    }

    @Test
    void stopReplicationOnShutdown() throws InterruptedException {
        Monitors monitors = new Monitors();
        ReplicationMonitor replicationMonitor = (ReplicationMonitor)Mockito.mock(ReplicationMonitor.class);
        monitors.addMonitorListener((Object)replicationMonitor, new String[0]);
        CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<RaftMessages.RaftMessage>();
        RaftReplicator replicator = this.getReplicator(outbound, capturedProgress, monitors);
        replicator.onLeaderSwitch(this.leaderInfo);
        ReplicatedInteger content = ReplicatedInteger.valueOf(5);
        ReplicatingThread replicatingThread = this.replicatingThread(replicator, content, true);
        replicatingThread.start();
        this.databaseAvailabilityGuard.shutdown();
        replicatingThread.join();
        MatcherAssert.assertThat((Object)replicatingThread.getReplicationException().getCause(), (Matcher)Matchers.instanceOf(UnavailableException.class));
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.times((int)1))).startReplication();
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.atLeast((int)1))).replicationAttempt();
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.never())).successfulReplication();
        ((ReplicationMonitor)Mockito.verify((Object)replicationMonitor, (VerificationMode)Mockito.times((int)1))).failedReplication((Throwable)ArgumentMatchers.any());
    }

    @Test
    void stopReplicationWhenUnavailable() throws InterruptedException {
        CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<RaftMessages.RaftMessage>();
        RaftReplicator replicator = this.getReplicator(outbound, capturedProgress, new Monitors());
        replicator.onLeaderSwitch(this.leaderInfo);
        ReplicatedInteger content = ReplicatedInteger.valueOf(5);
        ReplicatingThread replicatingThread = this.replicatingThread(replicator, content, true);
        replicatingThread.start();
        this.databaseAvailabilityGuard.require(() -> "Database not unavailable");
        replicatingThread.join();
        MatcherAssert.assertThat((Object)replicatingThread.getReplicationException().getCause(), (Matcher)Matchers.instanceOf(UnavailableException.class));
    }

    @Test
    void stopReplicationWhenUnHealthy() throws InterruptedException {
        CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<RaftMessages.RaftMessage>();
        RaftReplicator replicator = this.getReplicator(outbound, capturedProgress, new Monitors());
        replicator.onLeaderSwitch(this.leaderInfo);
        ReplicatedInteger content = ReplicatedInteger.valueOf(5);
        ReplicatingThread replicatingThread = this.replicatingThread(replicator, content, true);
        replicatingThread.start();
        this.databaseHealth.panic((Throwable)new IllegalStateException("PANIC"));
        replicatingThread.join();
        Assertions.assertNotNull((Object)replicatingThread.getReplicationException());
    }

    @Test
    void shouldFailIfNoLeaderIsAvailable() {
        CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<RaftMessages.RaftMessage>();
        RaftReplicator replicator = this.getReplicator(outbound, capturedProgress, new Monitors());
        ReplicatedInteger content = ReplicatedInteger.valueOf(5);
        Assertions.assertThrows(ReplicationFailureException.class, () -> replicator.replicate((ReplicatedContent)content, true));
    }

    @Test
    void shouldListenToLeaderUpdates() throws ReplicationFailureException {
        OneProgressTracker oneProgressTracker = new OneProgressTracker();
        oneProgressTracker.last.setReplicated();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<RaftMessages.RaftMessage>();
        RaftReplicator replicator = this.getReplicator(outbound, oneProgressTracker, new Monitors());
        ReplicatedInteger content = ReplicatedInteger.valueOf(5);
        LeaderInfo lastLeader = this.leaderInfo;
        replicator.onLeaderSwitch(lastLeader);
        replicator.replicate((ReplicatedContent)content, false);
        Assertions.assertEquals((Object)((CapturingOutbound)outbound).lastTo, (Object)lastLeader.memberId());
        lastLeader = new LeaderInfo(new MemberId(UUID.randomUUID()), 1L);
        replicator.onLeaderSwitch(lastLeader);
        replicator.replicate((ReplicatedContent)content, false);
        Assertions.assertEquals((Object)((CapturingOutbound)outbound).lastTo, (Object)lastLeader.memberId());
    }

    @Test
    void shouldSuccessfullySendIfLeaderIsLostAndFound() throws InterruptedException {
        OneProgressTracker capturedProgress = new OneProgressTracker();
        CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<RaftMessages.RaftMessage>();
        RaftReplicator replicator = this.getReplicator(outbound, capturedProgress, new Monitors());
        replicator.onLeaderSwitch(this.leaderInfo);
        ReplicatedInteger content = ReplicatedInteger.valueOf(5);
        ReplicatingThread replicatingThread = this.replicatingThread(replicator, content, false);
        replicatingThread.start();
        Assert.assertEventually((String)"send count", () -> ((CapturingOutbound)outbound).count, (Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(1)), (long)15000L, (TimeUnit)TimeUnit.MILLISECONDS);
        replicator.onLeaderSwitch(new LeaderInfo(null, 1L));
        capturedProgress.last.setReplicated();
        replicator.onLeaderSwitch(this.leaderInfo);
        replicatingThread.join(15000L);
    }

    private RaftReplicator getReplicator(CapturingOutbound<RaftMessages.RaftMessage> outbound, ProgressTracker progressTracker, Monitors monitors) {
        return new RaftReplicator(this.leaderLocator, this.myself, outbound, this.sessionPool, progressTracker, this.noWaitTimeoutStrategy, 10L, (AvailabilityGuard)this.databaseAvailabilityGuard, (LogProvider)NullLogProvider.getInstance(), this.localDatabase, monitors);
    }

    private ReplicatingThread replicatingThread(RaftReplicator replicator, ReplicatedInteger content, boolean trackResult) {
        return new ReplicatingThread(replicator, content, trackResult);
    }

    private static class StubLocalDatabase
    extends LocalDatabase {
        static LocalDatabase create(Supplier<DatabaseHealth> databaseHealthSupplier, AvailabilityGuard availabilityGuard) throws IOException {
            StoreFiles storeFiles = (StoreFiles)Mockito.mock(StoreFiles.class);
            Mockito.when((Object)storeFiles.readStoreId((DatabaseLayout)ArgumentMatchers.any())).thenReturn((Object)new StoreId(1L, 2L, 3L, 4L));
            DataSourceManager dataSourceManager = (DataSourceManager)Mockito.mock(DataSourceManager.class);
            return new StubLocalDatabase(storeFiles, dataSourceManager, databaseHealthSupplier, availabilityGuard);
        }

        StubLocalDatabase(StoreFiles storeFiles, DataSourceManager dataSourceManager, Supplier<DatabaseHealth> databaseHealthSupplier, AvailabilityGuard availabilityGuard) {
            super(null, storeFiles, null, dataSourceManager, databaseHealthSupplier, availabilityGuard, (LogProvider)NullLogProvider.getInstance());
        }
    }

    private static class CapturingOutbound<MESSAGE extends Message>
    implements Outbound<MemberId, MESSAGE> {
        private MemberId lastTo;
        private int count;

        private CapturingOutbound() {
        }

        public void send(MemberId to, MESSAGE message, boolean block) {
            this.lastTo = to;
            ++this.count;
        }
    }

    private abstract class ProgressTrackerAdaptor
    implements ProgressTracker {
        protected Progress last;

        private ProgressTrackerAdaptor() {
        }

        public void trackReplication(DistributedOperation operation) {
            throw new UnsupportedOperationException();
        }

        public void trackResult(DistributedOperation operation, Result result) {
            throw new UnsupportedOperationException();
        }

        public void abort(DistributedOperation operation) {
            throw new UnsupportedOperationException();
        }

        public void triggerReplicationEvent() {
        }

        public int inProgressCount() {
            throw new UnsupportedOperationException();
        }
    }

    private class CapturingProgressTracker
    extends ProgressTrackerAdaptor {
        private CapturingProgressTracker() {
        }

        public Progress start(DistributedOperation operation) {
            this.last = new Progress();
            return this.last;
        }
    }

    private class OneProgressTracker
    extends ProgressTrackerAdaptor {
        OneProgressTracker() {
            this.last = new Progress();
        }

        public Progress start(DistributedOperation operation) {
            return this.last;
        }
    }

    private class ReplicatingThread
    extends Thread {
        private final RaftReplicator replicator;
        private final ReplicatedInteger content;
        private final boolean trackResult;
        private volatile Exception replicationException;

        ReplicatingThread(RaftReplicator replicator, ReplicatedInteger content, boolean trackResult) {
            this.replicator = replicator;
            this.content = content;
            this.trackResult = trackResult;
        }

        @Override
        public void run() {
            block4: {
                try {
                    Future futureResult = this.replicator.replicate((ReplicatedContent)this.content, this.trackResult);
                    if (!this.trackResult) break block4;
                    try {
                        futureResult.get();
                    }
                    catch (ExecutionException e) {
                        this.replicationException = e;
                        throw new IllegalStateException();
                    }
                }
                catch (Exception e) {
                    this.replicationException = e;
                }
            }
        }

        Exception getReplicationException() {
            return this.replicationException;
        }
    }
}

