package org.neo4j.causalclustering.catchup.tx;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpResponseCallback;
import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
import org.neo4j.causalclustering.core.consensus.schedule.CountingTimerService;
import org.neo4j.causalclustering.core.consensus.schedule.Timer;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelector;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.FakeClockJobScheduler;

/* loaded from: input_file:org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.class */
/**
 * Unit tests for {@link CatchupPollingProcess}.
 *
 * <p>The process under test is built from Mockito mocks and a {@link CountingTimerService} backed by a
 * {@link FakeClockJobScheduler}; each test fires the TX_PULLER_TIMER explicitly through {@link #tick()}
 * and then checks the resulting interactions and state.
 */
public class CatchupPollingProcessTest {
    private final CatchUpClient catchUpClient = Mockito.mock(CatchUpClient.class);
    private final UpstreamDatabaseStrategySelector strategyPipeline = Mockito.mock(UpstreamDatabaseStrategySelector.class);
    private final MemberId coreMemberId = Mockito.mock(MemberId.class);
    // NOTE(review): idStore is stubbed in before() but is not passed to the process built below — confirm it is still needed.
    private final TransactionIdStore idStore = Mockito.mock(TransactionIdStore.class);
    private final BatchingTxApplier txApplier = Mockito.mock(BatchingTxApplier.class);
    private final FakeClockJobScheduler scheduler = new FakeClockJobScheduler();
    private final CountingTimerService timerService = new CountingTimerService(scheduler, NullLogProvider.getInstance());
    private final long txPullIntervalMillis = 100;
    private final StoreCopyProcess storeCopyProcess = Mockito.mock(StoreCopyProcess.class);
    private final StoreId storeId = new StoreId(1, 2, 3, 4);
    private final LocalDatabase localDatabase = Mockito.mock(LocalDatabase.class);
    private final TopologyService topologyService = Mockito.mock(TopologyService.class);
    private final AdvertisedSocketAddress coreMemberAddress = new AdvertisedSocketAddress("hostname", 1234);
    private final Lifecycle startStopOnStoreCopy = Mockito.mock(Lifecycle.class);
    private final CatchupPollingProcess txPuller;

    /**
     * Stubs the store id and catchup address lookups, then builds the process under test from the
     * fixtures declared above. The stubs are installed before the process is constructed.
     */
    public CatchupPollingProcessTest() {
        Mockito.when(localDatabase.storeId()).thenReturn(storeId);
        Mockito.when(topologyService.findCatchupAddress(coreMemberId)).thenReturn(Optional.of(coreMemberAddress));
        txPuller = new CatchupPollingProcess(
                NullLogProvider.getInstance(),
                localDatabase,
                startStopOnStoreCopy,
                catchUpClient,
                strategyPipeline,
                timerService,
                txPullIntervalMillis,
                txApplier,
                new Monitors(),
                storeCopyProcess,
                () -> Mockito.mock(DatabaseHealth.class),
                topologyService);
    }

    /** Installs the stubs shared by every test: last committed tx id 1 and the mocked upstream member. */
    @Before
    public void before() throws Throwable {
        Mockito.when(idStore.getLastCommittedTransactionId()).thenReturn(1L);
        Mockito.when(strategyPipeline.bestUpstreamDatabase()).thenReturn(coreMemberId);
    }

    /** A single timer tick results in exactly one blocking pull request on the catchup client. */
    @Test
    public void shouldSendPullRequestOnTick() throws Throwable {
        txPuller.start();
        Mockito.when(txApplier.lastQueuedTxId()).thenReturn(99L);

        tick();

        verifyPullRequests(1);
    }

    /** An END_OF_BATCH response followed by END_OF_STREAM yields two pull requests within one tick. */
    @Test
    public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable {
        txPuller.start();
        Mockito.when(txApplier.lastQueuedTxId()).thenReturn(99L);
        stubPullResponses(
                new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_BATCH, 10L),
                new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_STREAM, 10L));

        tick();

        verifyPullRequests(2);
    }

    /** After one tick answered with END_OF_STREAM, the timer service reports one invocation of the pull timer. */
    @Test
    public void shouldRenewTxPullTimeoutOnSuccessfulTxPulling() throws Throwable {
        txPuller.start();
        stubPullResponses(new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_STREAM, 0L));

        tick();

        Assert.assertEquals(1L, timerService.invocationCount(CatchupPollingProcess.Timers.TX_PULLER_TIMER));
    }

    /** An E_TRANSACTION_PRUNED response moves the process into the STORE_COPYING state. */
    @Test
    public void nextStateShouldBeStoreCopyingIfRequestedTransactionHasBeenPrunedAway() throws Throwable {
        txPuller.start();
        stubPullResponses(new TxStreamFinishedResponse(CatchupResult.E_TRANSACTION_PRUNED, 0L));

        tick();

        Assert.assertEquals(CatchupPollingProcess.State.STORE_COPYING, txPuller.state());
    }

    /**
     * Two ticks after an E_TRANSACTION_PRUNED response: the store copy collaborators are each invoked
     * once and the process ends up back in TX_PULLING.
     */
    @Test
    public void nextStateShouldBeTxPullingAfterASuccessfulStoreCopy() throws Throwable {
        txPuller.start();
        stubPullResponses(new TxStreamFinishedResponse(CatchupResult.E_TRANSACTION_PRUNED, 0L));

        tick();
        tick();

        Mockito.verify(localDatabase).stopForStoreCopy();
        Mockito.verify(startStopOnStoreCopy).stop();
        Mockito.verify(storeCopyProcess).replaceWithStoreFrom(
                ArgumentMatchers.any(AdvertisedSocketAddress.class), ArgumentMatchers.eq(storeId));
        Mockito.verify(localDatabase).start();
        Mockito.verify(startStopOnStoreCopy).start();
        Mockito.verify(txApplier).refreshFromNewStore();
        Assert.assertEquals(CatchupPollingProcess.State.TX_PULLING, txPuller.state());
    }

    /** Once the process is in PANIC after a tick, the spied pull timer must not have been reset. */
    @Test
    public void shouldNotRenewTheTimeoutIfInPanicState() throws Throwable {
        txPuller.start();
        // NOTE(review): this callback is stubbed but never handed to txPuller or catchUpClient in this
        // test; the PANIC state presumably comes from the unstubbed catchUpClient instead — confirm.
        CatchUpResponseCallback callback = Mockito.mock(CatchUpResponseCallback.class);
        Mockito.doThrow(new RuntimeException("Panic all the things"))
                .when(callback)
                .onTxPullResponse(ArgumentMatchers.any(CompletableFuture.class), ArgumentMatchers.any(TxPullResponse.class));
        Timer timer = (Timer) Mockito.spy(Iterables.single(timerService.getTimers(CatchupPollingProcess.Timers.TX_PULLER_TIMER)));

        tick();

        Assert.assertEquals(CatchupPollingProcess.State.PANIC, txPuller.state());
        Mockito.verify(timer, Mockito.times(0)).reset();
    }

    /**
     * The up-to-date future stays incomplete through the first two ticks and completes with
     * {@code true} on the third, with the process in TX_PULLING. The client is stubbed to answer
     * E_TRANSACTION_PRUNED, then END_OF_BATCH, then END_OF_STREAM.
     */
    @Test
    public void shouldNotSignalOperationalUntilPulling() throws Throwable {
        stubPullResponses(
                new TxStreamFinishedResponse(CatchupResult.E_TRANSACTION_PRUNED, 0L),
                new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_BATCH, 10L),
                new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_STREAM, 15L));
        txPuller.start();
        Future<?> upToDateFuture = txPuller.upToDateFuture();
        Assert.assertFalse(upToDateFuture.isDone());

        tick();
        Assert.assertFalse(upToDateFuture.isDone());

        tick();
        Assert.assertFalse(upToDateFuture.isDone());

        tick();
        Assert.assertTrue(upToDateFuture.isDone());
        Assert.assertTrue((Boolean) upToDateFuture.get());
        Assert.assertEquals(CatchupPollingProcess.State.TX_PULLING, txPuller.state());
    }

    /** Fires the TX_PULLER_TIMER once through the counting timer service. */
    private void tick() throws Throwable {
        timerService.invoke(CatchupPollingProcess.Timers.TX_PULLER_TIMER);
    }

    /**
     * Stubs the catchup client so that consecutive tx pull requests, to any address and with any
     * callback, return the given responses in order.
     *
     * @param first response to the first pull request
     * @param rest responses to the following pull requests, in order
     */
    private void stubPullResponses(TxStreamFinishedResponse first, TxStreamFinishedResponse... rest) throws Throwable {
        Mockito.when(catchUpClient.makeBlockingRequest(
                        ArgumentMatchers.any(AdvertisedSocketAddress.class),
                        ArgumentMatchers.any(TxPullRequest.class),
                        ArgumentMatchers.any(CatchUpResponseCallback.class)))
                .thenReturn(first, rest);
    }

    /**
     * Verifies how many tx pull requests were made on the catchup client, regardless of address and callback.
     *
     * @param expectedCount exact number of pull requests expected
     */
    private void verifyPullRequests(int expectedCount) throws Throwable {
        Mockito.verify(catchUpClient, Mockito.times(expectedCount)).makeBlockingRequest(
                ArgumentMatchers.any(AdvertisedSocketAddress.class),
                ArgumentMatchers.any(TxPullRequest.class),
                ArgumentMatchers.any(CatchUpResponseCallback.class));
    }
}
