/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.scenarios;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.ReadReplica;
import org.neo4j.causalclustering.helpers.DataCreator;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase;
import org.neo4j.causalclustering.scenarios.EnterpriseDiscoveryServiceType;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy;
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.assertion.Assert;
import org.neo4j.test.causalclustering.ClusterRule;

public class ReadReplicaToReadReplicaCatchupIT {
    @Rule
    public final ClusterRule clusterRule = new ClusterRule().withNumberOfCoreMembers(3).withNumberOfReadReplicas(0).withSharedCoreParam(CausalClusteringSettings.cluster_topology_refresh, "5s").withSharedCoreParam(CausalClusteringSettings.multi_dc_license, "true").withSharedReadReplicaParam(CausalClusteringSettings.multi_dc_license, "true").withDiscoveryServiceType(EnterpriseDiscoveryServiceType.HAZELCAST);

    @Test
    public void shouldEventuallyPullTransactionAcrossReadReplicas() throws Throwable {
        Cluster<?> cluster = this.clusterRule.startCluster();
        int numberOfNodesToCreate = 100;
        cluster.coreTx((db, tx) -> {
            db.schema().constraintFor(Label.label((String)"Foo")).assertPropertyIsUnique("foobar").create();
            tx.success();
        });
        DataCreator.createLabelledNodesWithProperty(cluster, numberOfNodesToCreate, Label.label((String)"Foo"), () -> Pair.of((Object)"foobar", (Object)String.format("baz_bat%s", UUID.randomUUID())));
        ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors(101, new Monitors());
        firstReadReplica.start();
        ReadReplicaToReadReplicaCatchupIT.checkDataHasReplicatedToReadReplicas(cluster, numberOfNodesToCreate);
        for (CoreClusterMember coreClusterMember : cluster.coreMembers()) {
            coreClusterMember.disableCatchupServer();
        }
        SpecificReplicaStrategy.upstreamFactory.setCurrent(firstReadReplica);
        ReadReplica secondReadReplica = cluster.addReadReplicaWithId(202);
        secondReadReplica.setUpstreamDatabaseSelectionStrategy("specific");
        secondReadReplica.start();
        ReadReplicaToReadReplicaCatchupIT.checkDataHasReplicatedToReadReplicas(cluster, numberOfNodesToCreate);
    }

    @Test
    public void shouldCatchUpFromCoresWhenPreferredReadReplicasAreUnavailable() throws Throwable {
        Cluster<?> cluster = this.clusterRule.startCluster();
        int numberOfNodes = 1;
        int firstReadReplicaLocalMemberId = 101;
        cluster.coreTx((db, tx) -> {
            db.schema().constraintFor(Label.label((String)"Foo")).assertPropertyIsUnique("foobar").create();
            tx.success();
        });
        DataCreator.createLabelledNodesWithProperty(cluster, numberOfNodes, Label.label((String)"Foo"), () -> Pair.of((Object)"foobar", (Object)String.format("baz_bat%s", UUID.randomUUID())));
        ReadReplica firstReadReplica = cluster.addReadReplicaWithIdAndMonitors(firstReadReplicaLocalMemberId, new Monitors());
        firstReadReplica.start();
        ReadReplicaToReadReplicaCatchupIT.checkDataHasReplicatedToReadReplicas(cluster, numberOfNodes);
        SpecificReplicaStrategy.upstreamFactory.setCurrent(firstReadReplica);
        ReadReplica secondReadReplica = cluster.addReadReplicaWithId(202);
        secondReadReplica.setUpstreamDatabaseSelectionStrategy("specific");
        secondReadReplica.start();
        ReadReplicaToReadReplicaCatchupIT.checkDataHasReplicatedToReadReplicas(cluster, numberOfNodes);
        firstReadReplica.shutdown();
        SpecificReplicaStrategy.upstreamFactory.reset();
        cluster.removeReadReplicaWithMemberId(firstReadReplicaLocalMemberId);
        DataCreator.createLabelledNodesWithProperty(cluster, numberOfNodes, Label.label((String)"Foo"), () -> Pair.of((Object)"foobar", (Object)String.format("baz_bat%s", UUID.randomUUID())));
        ReadReplicaToReadReplicaCatchupIT.checkDataHasReplicatedToReadReplicas(cluster, numberOfNodes * 2);
    }

    static void checkDataHasReplicatedToReadReplicas(Cluster<?> cluster, long numberOfNodes) throws Exception {
        for (ReadReplica server : cluster.readReplicas()) {
            ReadReplicaGraphDatabase readReplica = server.database();
            Transaction tx = readReplica.beginTx();
            Throwable throwable = null;
            try {
                ThrowingSupplier nodeCount = () -> ReadReplicaToReadReplicaCatchupIT.lambda$checkDataHasReplicatedToReadReplicas$5((GraphDatabaseService)readReplica);
                Assert.assertEventually((String)"node to appear on read replica", (ThrowingSupplier)nodeCount, (Matcher)Is.is((Object)numberOfNodes), (long)1L, (TimeUnit)TimeUnit.MINUTES);
                for (Node node : readReplica.getAllNodes()) {
                    MatcherAssert.assertThat((Object)node.getProperty("foobar").toString(), (Matcher)CoreMatchers.startsWith((String)"baz_bat"));
                }
                tx.success();
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (tx == null) continue;
                if (throwable != null) {
                    try {
                        tx.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                tx.close();
            }
        }
    }

    private static /* synthetic */ Long lambda$checkDataHasReplicatedToReadReplicas$5(GraphDatabaseService readReplica) throws Exception {
        return Iterables.count((Iterable)readReplica.getAllNodes());
    }

    private static class UpstreamFactory {
        private ReadReplica current;

        private UpstreamFactory() {
        }

        public void setCurrent(ReadReplica readReplica) {
            this.current = readReplica;
        }

        public ReadReplica current() {
            return this.current;
        }

        void reset() {
            this.current = null;
        }
    }

    public static class SpecificReplicaStrategy
    extends UpstreamDatabaseSelectionStrategy {
        static final UpstreamFactory upstreamFactory = new UpstreamFactory();

        public SpecificReplicaStrategy() {
            super("specific", new String[0]);
        }

        public Optional<MemberId> upstreamDatabase() {
            ReadReplica current = upstreamFactory.current();
            if (current == null) {
                return Optional.empty();
            }
            return Optional.of(current.memberId());
        }
    }
}

