/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.scenarios;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Level;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.roles.Role;
import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.ReadReplica;
import org.neo4j.driver.internal.logging.JULogging;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.AuthToken;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.Values;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
import org.neo4j.driver.v1.summary.ServerInfo;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.io.fs.FileUtils;
import org.neo4j.test.causalclustering.ClusterRule;
import org.neo4j.test.rule.SuppressOutput;

public class BoltCausalClusteringIT {
    private static final long DEFAULT_TIMEOUT_MS = 15000L;
    @Rule
    public final ClusterRule clusterRule = new ClusterRule(this.getClass()).withNumberOfCoreMembers(3);
    @Rule
    public final SuppressOutput suppressOutput = SuppressOutput.suppressAll();
    private Cluster cluster;

    @Before
    public void setup() throws Exception {
        File knownHosts = new File(System.getProperty("user.home") + "/.neo4j/known_hosts");
        FileUtils.deleteFile((File)knownHosts);
    }

    @Test
    public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader() throws Exception {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(0).startCluster();
        this.cluster.coreTx((db, tx) -> {
            Iterators.count((Iterator)db.execute("CREATE CONSTRAINT ON (p:Person) ASSERT p.name is UNIQUE"));
            tx.success();
        });
        int count = this.executeWriteAndReadThroughBolt(this.cluster.awaitLeader());
        Assert.assertEquals((long)1L, (long)count);
    }

    @Test
    public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfFollower() throws Exception {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(0).startCluster();
        this.cluster.coreTx((db, tx) -> {
            Iterators.count((Iterator)db.execute("CREATE CONSTRAINT ON (p:Person) ASSERT p.name is UNIQUE"));
            tx.success();
        });
        int count = this.executeWriteAndReadThroughBolt(this.cluster.getDbWithRole(Role.FOLLOWER));
        Assert.assertEquals((long)1L, (long)count);
    }

    private int executeWriteAndReadThroughBolt(CoreClusterMember core) throws TimeoutException, InterruptedException {
        try (Driver driver = GraphDatabase.driver((String)core.routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));){
            int n = this.inExpirableSession(driver, d -> d.session(AccessMode.WRITE), session -> {
                session.run("MERGE (n:Person {name: 'Jim'})").consume();
                Record record = session.run("MATCH (n:Person) RETURN COUNT(*) AS count").next();
                return record.get("count").asInt();
            });
            return n;
        }
    }

    @Test
    public void shouldNotBeAbleToWriteOnAReadSession() throws Exception {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(0).startCluster();
        org.neo4j.test.assertion.Assert.assertEventually((String)"Failed to execute write query on read server", () -> {
            this.switchLeader(this.cluster.awaitLeader());
            CoreClusterMember leader = this.cluster.awaitLeader();
            try (Driver driver = GraphDatabase.driver((String)leader.routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));){
                Boolean bl;
                Serializable serializable;
                Session session;
                block17: {
                    block18: {
                        session = driver.session(AccessMode.READ);
                        serializable = null;
                        session.run("CREATE (n:Person {name: 'Jim'})").consume();
                        bl = false;
                        if (session == null) break block17;
                        if (serializable == null) break block18;
                        try {
                            session.close();
                        }
                        catch (Throwable throwable) {
                            ((Throwable)serializable).addSuppressed(throwable);
                        }
                        break block17;
                    }
                    session.close();
                }
                return bl;
                catch (Throwable throwable) {
                    try {
                        try {
                            serializable = throwable;
                            throw throwable;
                        }
                        catch (Throwable throwable2) {
                            if (session != null) {
                                if (serializable != null) {
                                    try {
                                        session.close();
                                    }
                                    catch (Throwable throwable3) {
                                        ((Throwable)serializable).addSuppressed(throwable3);
                                    }
                                } else {
                                    session.close();
                                }
                            }
                            throw throwable2;
                        }
                    }
                    catch (ClientException ex) {
                        Assert.assertEquals((Object)"Write queries cannot be performed in READ access mode.", (Object)ex.getMessage());
                        serializable = Boolean.valueOf(true);
                        return serializable;
                    }
                }
            }
        }, (Matcher)Is.is((Object)true), (long)30L, (TimeUnit)TimeUnit.SECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void sessionShouldExpireOnLeaderSwitch() throws Exception {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(0).startCluster();
        CoreClusterMember leader = this.cluster.awaitLeader();
        try (Driver driver = GraphDatabase.driver((String)leader.routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));
             Session session = driver.session();){
            session.run("CREATE (n:Person {name: 'Jim'})").consume();
            this.switchLeader(leader);
            session.run("CREATE (n:Person {name: 'Mark'})").consume();
            Assert.fail((String)"Should have thrown exception");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldBeAbleToGetClusterOverview() throws Exception {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(0).startCluster();
        CoreClusterMember leader = this.cluster.awaitLeader();
        try (Driver driver = GraphDatabase.driver((String)leader.routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));
             Session session = driver.session();){
            StatementResult overview = session.run("CALL dbms.cluster.overview");
            Assert.assertThat((Object)overview.list(), (Matcher)Matchers.hasSize((int)3));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldPickANewServerToWriteToOnLeaderSwitch() throws Throwable {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(0).startCluster();
        CoreClusterMember leader = this.cluster.awaitLeader();
        CountDownLatch leaderSwitchLatch = new CountDownLatch(1);
        LeaderSwitcher leaderSwitcher = new LeaderSwitcher(this.cluster, leaderSwitchLatch);
        Config config = Config.build().withLogging((Logging)new JULogging(Level.OFF)).toConfig();
        HashSet<String> seenAddresses = new HashSet<String>();
        try (Driver driver = GraphDatabase.driver((String)leader.routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"), (Config)config);){
            boolean success = false;
            long deadline = System.currentTimeMillis() + 30000L;
            while (!success) {
                if (System.currentTimeMillis() > deadline) {
                    Assert.fail((String)("Failed to write to the new leader in time. Addresses seen: " + seenAddresses));
                }
                try (Session session = driver.session(AccessMode.WRITE);){
                    StatementResult result = session.run("CREATE (p:Person)");
                    ServerInfo server = result.summary().server();
                    seenAddresses.add(server.address());
                    success = seenAddresses.size() >= 2;
                }
                catch (Exception e) {
                    Thread.sleep(100L);
                }
                if (seenAddresses.isEmpty() || success) continue;
                leaderSwitcher.start();
                leaderSwitchLatch.await();
            }
        }
        finally {
            leaderSwitcher.stop();
            Assert.assertTrue((boolean)leaderSwitcher.hadLeaderSwitch());
            Assert.assertThat((Object)seenAddresses.size(), (Matcher)Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(2)));
        }
    }

    @Test
    public void sessionCreationShouldFailIfCallingDiscoveryProcedureOnEdgeServer() throws Exception {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(1).startCluster();
        ReadReplica readReplica = this.cluster.getReadReplicaById(0);
        try {
            GraphDatabase.driver((String)readReplica.routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));
            Assert.fail((String)"Should have thrown an exception using a read replica address for routing");
        }
        catch (ServiceUnavailableException ex) {
            Assert.assertThat((Object)ex.getMessage(), (Matcher)Matchers.startsWith((String)"Failed to run"));
        }
    }

    @Test
    public void shouldReadAndWriteToANewSessionCreatedAfterALeaderSwitch() throws Exception {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(1).startCluster();
        CoreClusterMember leader = this.cluster.awaitLeader();
        try (Driver driver = GraphDatabase.driver((String)leader.routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));){
            this.inExpirableSession(driver, Driver::session, session -> {
                session.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Jim"}));
                Record record = session.run("MATCH (n:Person) RETURN COUNT(*) AS count").next();
                Assert.assertEquals((long)1L, (long)record.get("count").asInt());
                try {
                    this.switchLeader(leader);
                    session.run("CREATE (p:Person {name: {name} })").consume();
                    Assert.fail((String)"Should have thrown an exception as the leader went away mid session");
                }
                catch (SessionExpiredException sep) {
                    Assert.assertEquals((Object)String.format("Server at %s no longer accepts writes", leader.boltAdvertisedAddress()), (Object)sep.getMessage());
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return null;
            });
            this.inExpirableSession(driver, Driver::session, session -> {
                session.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Jim"}));
                Record record = session.run("MATCH (n:Person) RETURN COUNT(*) AS count").next();
                Assert.assertEquals((long)2L, (long)record.get("count").asInt());
                return null;
            });
        }
    }

    @Test
    public void bookmarksShouldWorkWithDriverPinnedToSingleServer() throws Exception {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(1).startCluster();
        CoreClusterMember leader = this.cluster.awaitLeader();
        try (Driver driver = GraphDatabase.driver((String)leader.directURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));){
            String bookmark = this.inExpirableSession(driver, Driver::session, session -> {
                try (Transaction tx = session.beginTransaction();){
                    tx.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Alistair"}));
                    tx.success();
                }
                return session.lastBookmark();
            });
            Assert.assertNotNull((Object)bookmark);
            try (Session session2 = driver.session();
                 Transaction tx = session2.beginTransaction(bookmark);){
                Record record = tx.run("MATCH (n:Person) RETURN COUNT(*) AS count").next();
                Assert.assertEquals((long)1L, (long)record.get("count").asInt());
                tx.success();
            }
        }
    }

    @Test
    public void shouldUseBookmarkFromAReadSessionInAWriteSession() throws Exception {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(1).startCluster();
        CoreClusterMember leader = this.cluster.awaitLeader();
        try (Driver driver = GraphDatabase.driver((String)leader.directURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));){
            String bookmark;
            this.inExpirableSession(driver, d -> d.session(AccessMode.WRITE), session -> {
                session.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Jim"}));
                return null;
            });
            try (Session session2 = driver.session(AccessMode.READ);){
                try (Transaction tx = session2.beginTransaction();){
                    tx.run("MATCH (n:Person) RETURN COUNT(*) AS count").next();
                    tx.success();
                }
                bookmark = session2.lastBookmark();
            }
            Assert.assertNotNull((Object)bookmark);
            this.inExpirableSession(driver, d -> d.session(AccessMode.WRITE), session -> {
                try (Transaction tx = session.beginTransaction(bookmark);){
                    tx.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Alistair"}));
                    tx.success();
                }
                return null;
            });
            session2 = driver.session();
            var6_5 = null;
            try {
                Record record = session2.run("MATCH (n:Person) RETURN COUNT(*) AS count").next();
                Assert.assertEquals((long)2L, (long)record.get("count").asInt());
            }
            catch (Throwable throwable) {
                var6_5 = throwable;
                throw throwable;
            }
            finally {
                if (session2 != null) {
                    if (var6_5 != null) {
                        try {
                            session2.close();
                        }
                        catch (Throwable throwable) {
                            var6_5.addSuppressed(throwable);
                        }
                    } else {
                        session2.close();
                    }
                }
            }
        }
    }

    @Test
    public void shouldUseBookmarkFromAWriteSessionInAReadSession() throws Throwable {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(1).startCluster();
        CoreClusterMember leader = this.cluster.awaitLeader();
        ReadReplica readReplica = this.cluster.getReadReplicaById(0);
        readReplica.txPollingClient().stop();
        Driver driver = GraphDatabase.driver((String)leader.directURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));
        String bookmark = this.inExpirableSession(driver, d -> d.session(AccessMode.WRITE), session -> {
            try (Transaction tx = session.beginTransaction();){
                tx.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Jim"}));
                tx.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Alistair"}));
                tx.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Mark"}));
                tx.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Chris"}));
                tx.success();
            }
            return session.lastBookmark();
        });
        Assert.assertNotNull((Object)bookmark);
        readReplica.txPollingClient().start();
        driver = GraphDatabase.driver((String)readReplica.directURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));
        try (Session session2 = driver.session(AccessMode.READ);
             Transaction tx = session2.beginTransaction(bookmark);){
            Record record = tx.run("MATCH (n:Person) RETURN COUNT(*) AS count").next();
            tx.success();
            Assert.assertEquals((long)4L, (long)record.get("count").asInt());
        }
    }

    @Test
    public void shouldSendRequestsToNewlyAddedReadReplicas() throws Throwable {
        this.cluster = this.clusterRule.withNumberOfReadReplicas(1).withSharedCoreParams(MapUtil.stringMap((String[])new String[]{CausalClusteringSettings.cluster_routing_ttl.name(), "1s"})).startCluster();
        CoreClusterMember leader = this.cluster.awaitLeader();
        Driver driver = GraphDatabase.driver((String)leader.routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));
        String bookmark = this.inExpirableSession(driver, d -> d.session(AccessMode.WRITE), session -> {
            try (Transaction tx = session.beginTransaction();){
                tx.run("CREATE (p:Person {name: {name} })", Values.parameters((Object[])new Object[]{"name", "Jim"}));
                tx.success();
            }
            return session.lastBookmark();
        });
        HashSet<String> readReplicas = new HashSet<String>();
        for (ReadReplica readReplica : this.cluster.readReplicas()) {
            readReplicas.add(readReplica.boltAdvertisedAddress());
        }
        for (int i = 10; i <= 13; ++i) {
            ReadReplica newReadReplica = this.cluster.addReadReplicaWithId(i);
            readReplicas.add(newReadReplica.boltAdvertisedAddress());
            newReadReplica.start();
        }
        org.neo4j.test.assertion.Assert.assertEventually((String)"Failed to send requests to all servers", () -> {
            for (int i = 0; i < this.cluster.readReplicas().size(); ++i) {
                try (Session session = driver.session(AccessMode.READ, bookmark);){
                    this.executeReadQuery(bookmark, session);
                    session.readTransaction(tx -> {
                        StatementResult result = tx.run("MATCH (n:Person) RETURN COUNT(*) AS count");
                        Assert.assertEquals((long)1L, (long)result.next().get("count").asInt());
                        readReplicas.remove(result.summary().server().address());
                        return null;
                    });
                    continue;
                }
                catch (Throwable throwable) {
                    return false;
                }
            }
            return readReplicas.size() == 0;
        }, (Matcher)Is.is((Object)true), (long)30L, (TimeUnit)TimeUnit.SECONDS);
    }

    @Test
    public void shouldHandleLeaderSwitch() throws Exception {
        this.cluster = this.clusterRule.startCluster();
        CoreClusterMember leader = this.cluster.awaitLeader();
        try (Driver driver = GraphDatabase.driver((String)leader.routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));){
            try (Session session = driver.session();){
                try (Transaction tx = session.beginTransaction();){
                    this.switchLeader(leader);
                    tx.run("CREATE (person:Person {name: {name}, title: {title}})", Values.parameters((Object[])new Object[]{"name", "Webber", "title", "Mr"}));
                    tx.success();
                }
                catch (SessionExpiredException sessionExpiredException) {
                    // empty catch block
                }
            }
            String bookmark = this.inExpirableSession(driver, Driver::session, s -> {
                try (Transaction tx = s.beginTransaction();){
                    tx.run("CREATE (person:Person {name: {name}, title: {title}})", Values.parameters((Object[])new Object[]{"name", "Webber", "title", "Mr"}));
                    tx.success();
                }
                catch (SessionExpiredException sessionExpiredException) {
                    // empty catch block
                }
                return s.lastBookmark();
            });
            try (Session session = driver.session(AccessMode.READ, bookmark);
                 Transaction tx = session.beginTransaction();){
                Record record = tx.run("MATCH (n:Person) RETURN COUNT(*) AS count").next();
                tx.success();
                Assert.assertEquals((long)1L, (long)record.get("count").asInt());
            }
        }
    }

    @Test
    public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() throws Throwable {
        Map params = MapUtil.stringMap((String[])new String[]{GraphDatabaseSettings.keep_logical_logs.name(), "keep_none", GraphDatabaseSettings.logical_log_rotation_threshold.name(), "1M", GraphDatabaseSettings.check_point_interval_time.name(), "100ms", CausalClusteringSettings.cluster_allow_reads_on_followers.name(), "false"});
        Cluster cluster = this.clusterRule.withSharedCoreParams(params).withNumberOfReadReplicas(1).startCluster();
        Driver driver = GraphDatabase.driver((String)cluster.awaitLeader().routingURI(), (AuthToken)AuthTokens.basic((String)"neo4j", (String)"neo4j"));
        try (Session session = driver.session();){
            session.writeTransaction(tx -> {
                tx.run("MERGE (n:Person {name: 'Jim'})");
                return null;
            });
        }
        ReadReplica replica = cluster.findAnyReadReplica();
        CatchupPollingProcess pollingClient = (CatchupPollingProcess)replica.database().getDependencyResolver().resolveDependency(CatchupPollingProcess.class);
        pollingClient.stop();
        String lastBookmark = null;
        int iterations = 5;
        int nodesToCreate = 20000;
        for (int i = 0; i < iterations; ++i) {
            try (Session writeSession = driver.session();){
                writeSession.writeTransaction(tx -> {
                    tx.run("UNWIND range(1, {nodesToCreate}) AS i CREATE (n:Person {name: 'Jim'})", Values.parameters((Object[])new Object[]{"nodesToCreate", 20000}));
                    return null;
                });
                lastBookmark = writeSession.lastBookmark();
                continue;
            }
        }
        pollingClient.start();
        pollingClient.upToDateFuture().get();
        int happyCount = 0;
        int numberOfRequests = 1000;
        for (int i = 0; i < numberOfRequests; ++i) {
            try (Session session = driver.session(lastBookmark);){
                happyCount += ((Integer)session.readTransaction(tx -> {
                    tx.run("MATCH (n:Person) RETURN COUNT(*) AS count");
                    return 1;
                })).intValue();
                continue;
            }
        }
        Assert.assertEquals((long)numberOfRequests, (long)happyCount);
    }

    private void executeReadQuery(String bookmark, Session session) {
        try (Transaction tx = session.beginTransaction(bookmark);){
            Record record = tx.run("MATCH (n:Person) RETURN COUNT(*) AS count").next();
            Assert.assertEquals((long)1L, (long)record.get("count").asInt());
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private <T> T inExpirableSession(Driver driver, Function<Driver, Session> acquirer, Function<Session, T> op) throws TimeoutException, InterruptedException {
        long endTime = System.currentTimeMillis() + 15000L;
        while (true) {
            try (Session session = acquirer.apply(driver);){
                Session t = op.apply(session);
                return (T)t;
            }
            catch (SessionExpiredException sessionExpiredException) {
                if (System.currentTimeMillis() < endTime) continue;
                throw new TimeoutException("Transaction did not succeed in time");
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void switchLeader(CoreClusterMember initialLeader) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 30000L;
        Role role = initialLeader.database().getRole();
        while (role != Role.FOLLOWER) {
            if (System.currentTimeMillis() > deadline) {
                throw new RuntimeException("Failed to switch leader in time");
            }
            try {
                this.triggerElection(initialLeader);
            }
            catch (IOException | TimeoutException exception) {}
            continue;
            finally {
                role = initialLeader.database().getRole();
                Thread.sleep(100L);
            }
        }
    }

    private CoreClusterMember triggerElection(CoreClusterMember initialLeader) throws IOException, TimeoutException {
        for (CoreClusterMember coreClusterMember : this.cluster.coreMembers()) {
            if (coreClusterMember.equals(initialLeader)) continue;
            coreClusterMember.raft().triggerElection();
            return this.cluster.awaitLeader();
        }
        return initialLeader;
    }

    private class LeaderSwitcher
    implements Runnable {
        private final Cluster cluster;
        private final CountDownLatch switchCompleteLatch;
        private CoreClusterMember initialLeader;
        private CoreClusterMember currentLeader;
        private Thread thread;
        private boolean stopped;
        private Throwable throwable;

        LeaderSwitcher(Cluster cluster, CountDownLatch switchCompleteLatch) {
            this.cluster = cluster;
            this.switchCompleteLatch = switchCompleteLatch;
        }

        @Override
        public void run() {
            try {
                this.initialLeader = this.cluster.awaitLeader();
                while (!this.stopped) {
                    this.currentLeader = this.cluster.awaitLeader();
                    if (this.currentLeader == this.initialLeader) {
                        BoltCausalClusteringIT.this.switchLeader(this.initialLeader);
                        this.currentLeader = this.cluster.awaitLeader();
                    } else {
                        this.switchCompleteLatch.countDown();
                    }
                    Thread.sleep(100L);
                }
            }
            catch (Throwable e) {
                this.throwable = e;
            }
        }

        void start() {
            if (this.thread == null) {
                this.thread = new Thread(this);
                this.thread.start();
            }
        }

        void stop() throws Throwable {
            if (this.thread != null) {
                this.stopped = true;
                this.thread.join();
            }
            this.assertNoException();
        }

        boolean hadLeaderSwitch() {
            return this.currentLeader != this.initialLeader;
        }

        void assertNoException() throws Throwable {
            if (this.throwable != null) {
                throw this.throwable;
            }
        }
    }
}

