package org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import javax.security.sasl.SaslException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.WriterAppender;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.io.FileUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.PortAssignment;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.common.Time;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.common.X509Exception;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.metrics.BaseTestMetricsProvider;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.metrics.impl.NullMetricsProvider;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.Leader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.LearnerSyncThrottler;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.test.ClientBase;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.class */
public class QuorumPeerMainTest extends QuorumPeerTestBase {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/QuorumPeerMainTest$BeginSnapshotListener.class */
    /**
     * Test hook that {@link CustomQuorumPeer} invokes from its leader's snap-sync throttler
     * immediately before the throttler's {@code beginSync} runs.
     */
    @FunctionalInterface
    public interface BeginSnapshotListener {
        /** Called once per {@code beginSync} invocation, before the real sync bookkeeping starts. */
        void start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/QuorumPeerMainTest$Context.class */
    /**
     * Mutable switches shared between a test and the {@link CustomQuorumPeer} it drives.
     * All fields start at their Java defaults ({@code false} / {@code null}).
     */
    public static class Context {
        /** When true, the follower's next {@code followLeader()} returns at once and resets this flag. */
        boolean quitFollowing;
        /** Gates {@link #newLeaderAckCallback}: the callback only fires while this is true. */
        boolean exitWhenAckNewLeader;
        /** Callback run before the follower writes a type-3 packet; {@code null} means no callback. */
        NewLeaderAckCallback newLeaderAckCallback;

        Context() {
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/QuorumPeerMainTest$CustomQuorumPeer.class */
    static class CustomQuorumPeer extends QuorumPeer {
        private Context context;
        private LearnerSyncThrottler throttler = null;
        private StartForwardingListener startForwardingListener;
        private BeginSnapshotListener beginSnapshotListener;

        public CustomQuorumPeer(Context context) throws SaslException {
            this.context = context;
        }

        public void setStartForwardingListener(StartForwardingListener startForwardingListener) {
            this.startForwardingListener = startForwardingListener;
        }

        public void setBeginSnapshotListener(BeginSnapshotListener beginSnapshotListener) {
            this.beginSnapshotListener = beginSnapshotListener;
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeer
        protected Follower makeFollower(FileTxnSnapLog fileTxnSnapLog) throws IOException {
            return new Follower(this, new FollowerZooKeeperServer(fileTxnSnapLog, this, getZkDb())) { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.CustomQuorumPeer.1
                /* JADX INFO: Access modifiers changed from: package-private */
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.Follower
                public void followLeader() throws InterruptedException {
                    if (!CustomQuorumPeer.this.context.quitFollowing) {
                        super.followLeader();
                    } else {
                        CustomQuorumPeer.this.context.quitFollowing = false;
                        LOG.info("Quit following");
                    }
                }

                /* JADX INFO: Access modifiers changed from: package-private */
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.Learner
                public void writePacket(QuorumPacket quorumPacket, boolean z) throws IOException {
                    if (quorumPacket != null && quorumPacket.getType() == 3 && CustomQuorumPeer.this.context.exitWhenAckNewLeader && CustomQuorumPeer.this.context.newLeaderAckCallback != null) {
                        CustomQuorumPeer.this.context.newLeaderAckCallback.start();
                    }
                    super.writePacket(quorumPacket, z);
                }
            };
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeer
        protected Leader makeLeader(FileTxnSnapLog fileTxnSnapLog) throws IOException, X509Exception {
            return new Leader(this, new LeaderZooKeeperServer(fileTxnSnapLog, this, getZkDb())) { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.CustomQuorumPeer.2
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.Leader, org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.LearnerMaster
                public long startForwarding(LearnerHandler learnerHandler, long j) {
                    if (CustomQuorumPeer.this.startForwardingListener != null) {
                        CustomQuorumPeer.this.startForwardingListener.start();
                    }
                    return super.startForwarding(learnerHandler, j);
                }

                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.LearnerMaster
                public LearnerSyncThrottler getLearnerSnapSyncThrottler() {
                    if (CustomQuorumPeer.this.throttler == null) {
                        CustomQuorumPeer.this.throttler = new LearnerSyncThrottler(getMaxConcurrentSnapSyncs(), LearnerSyncThrottler.SyncType.SNAP) { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.CustomQuorumPeer.2.1
                            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.LearnerSyncThrottler
                            public void beginSync(boolean z) throws SyncThrottleException, InterruptedException {
                                if (CustomQuorumPeer.this.beginSnapshotListener != null) {
                                    CustomQuorumPeer.this.beginSnapshotListener.start();
                                }
                                super.beginSync(z);
                            }
                        };
                    }
                    return CustomQuorumPeer.this.throttler;
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/QuorumPeerMainTest$CustomizedQPMain.class */
    /**
     * {@link QuorumPeerTestBase.TestQPMain} that supplies a {@link CustomQuorumPeer} bound to a
     * caller-provided {@link Context}.
     */
    static class CustomizedQPMain extends QuorumPeerTestBase.TestQPMain {
        private final Context context;

        /** @param context handed to every peer this main creates */
        public CustomizedQPMain(Context context) {
            this.context = context;
        }

        /** Creates a new instrumented peer each time it is called. */
        @Override
        protected QuorumPeer getQuorumPeer() throws SaslException {
            return new CustomQuorumPeer(context);
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/QuorumPeerMainTest$InjectableQuorumPeerMain.class */
    /**
     * {@link QuorumPeerMain} that hands back a pre-built {@link QuorumPeer} instead of
     * constructing its own.
     */
    private class InjectableQuorumPeerMain extends QuorumPeerMain {
        // The injected peer; returned as-is by getQuorumPeer().
        QuorumPeer qp;

        /** @param quorumPeer the instance that {@link #getQuorumPeer()} will return */
        InjectableQuorumPeerMain(QuorumPeer quorumPeer) {
            qp = quorumPeer;
        }

        @Override
        protected QuorumPeer getQuorumPeer() {
            return qp;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/QuorumPeerMainTest$NewLeaderAckCallback.class */
    /**
     * Test hook stored in {@link Context#newLeaderAckCallback}. The follower built by
     * {@link CustomQuorumPeer} runs it right before writing a type-3 packet, provided
     * {@link Context#exitWhenAckNewLeader} is set.
     */
    @FunctionalInterface
    interface NewLeaderAckCallback {
        /** Called before the packet is handed to the real {@code writePacket}. */
        void start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/QuorumPeerMainTest$StartForwardingListener.class */
    /**
     * Test hook that the leader built by {@link CustomQuorumPeer} invokes immediately before
     * delegating to the real {@code startForwarding}.
     */
    @FunctionalInterface
    public interface StartForwardingListener {
        /** Called once per {@code startForwarding} invocation. */
        void start();
    }

    /**
     * Starts a two-server quorum bound to the given host, checks the default session-timeout
     * bounds derived from the tick time, then creates and reads back one node through each
     * server before shutting both down.
     *
     * @param str host part used in the server entries and in the client connect strings
     *            (the visible callers pass {@code "127.0.0.1"} and {@code "[::1]"})
     * @throws Exception if a server or client operation fails
     */
    public void testQuorumInternal(String str) throws Exception {
        ClientBase.setupTestEnv();
        int clientPort1 = PortAssignment.unique();
        int clientPort2 = PortAssignment.unique();
        String quorumCfgSection = String.format("server.1=%1$s:%2$s:%3$s;%4$s", str, PortAssignment.unique(), PortAssignment.unique(), clientPort1)
                + "\n"
                + String.format("server.2=%1$s:%2$s:%3$s;%4$s", str, PortAssignment.unique(), PortAssignment.unique(), clientPort2);
        QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPort1, quorumCfgSection);
        QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfgSection);
        q1.start();
        q2.start();
        Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp(str + ":" + clientPort1, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp(str + ":" + clientPort2, ClientBase.CONNECTION_TIMEOUT));

        // The decompiled source referenced an undeclared variable here; the peer whose
        // tick time is read is the one whose timeouts must be checked.
        QuorumPeer quorumPeer = q1.main.quorumPeer;
        int tickTime = quorumPeer.getTickTime();
        Assert.assertEquals("Default value of minimumSessionTimeOut is not considered", tickTime * 2, quorumPeer.getMinSessionTimeout());
        Assert.assertEquals("Default value of maximumSessionTimeOut is not considered", tickTime * 20, quorumPeer.getMaxSessionTimeout());

        ZooKeeper zk1 = new ZooKeeper(str + ":" + clientPort1, ClientBase.CONNECTION_TIMEOUT, this);
        waitForOne(zk1, ZooKeeper.States.CONNECTED);
        zk1.create("/foo_q1", "foobar1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // JUnit order is (expected, actual).
        Assert.assertEquals("foobar1", new String(zk1.getData("/foo_q1", (Watcher) null, (Stat) null)));
        zk1.close();

        ZooKeeper zk2 = new ZooKeeper(str + ":" + clientPort2, ClientBase.CONNECTION_TIMEOUT, this);
        waitForOne(zk2, ZooKeeper.States.CONNECTED);
        zk2.create("/foo_q2", "foobar2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Assert.assertEquals("foobar2", new String(zk2.getData("/foo_q2", (Watcher) null, (Stat) null)));
        zk2.close();

        q1.shutdown();
        q2.shutdown();
        Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(str + ":" + clientPort1, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown(str + ":" + clientPort2, ClientBase.CONNECTION_TIMEOUT));
    }

    /** Runs the two-server quorum scenario over the IPv4 loopback address. */
    @Test
    public void testQuorum() throws Exception {
        final String ipv4Loopback = "127.0.0.1";
        testQuorumInternal(ipv4Loopback);
    }

    /** Runs the two-server quorum scenario over the bracketed IPv6 loopback address. */
    @Test
    public void testQuorumV6() throws Exception {
        final String ipv6Loopback = "[::1]";
        testQuorumInternal(ipv6Loopback);
    }

    /**
     * Three-server scenario: after a full restart, every non-leader is shut down and a create
     * is issued against the lone leader (expected to fail, leaving exactly one outstanding
     * proposal). The leader is then stopped, the other servers come back and each create
     * their own node, and finally the old leader rejoins. The test asserts that the node
     * attempted on the isolated leader exists nowhere, while the other nodes exist everywhere.
     *
     * @throws Exception if a server or client operation fails unexpectedly
     */
    @Test
    public void testEarlyLeaderAbandonment() throws Exception {
        ClientBase.setupTestEnv();
        final int serverCount = 3;
        int[] clientPorts = new int[serverCount];
        StringBuilder cfg = new StringBuilder();
        for (int i = 0; i < serverCount; i++) {
            clientPorts[i] = PortAssignment.unique();
            cfg.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPorts[i] + "\n");
        }
        String quorumCfgSection = cfg.toString();

        QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[serverCount];
        ZooKeeper[] zk = new ZooKeeper[serverCount];
        for (int i = 0; i < serverCount; i++) {
            mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts[i], quorumCfgSection);
            mt[i].start();
            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
        }
        waitForAll(zk, ZooKeeper.States.CONNECTED);

        // Full restart of the ensemble with fresh client handles.
        for (int i = 0; i < serverCount; i++) {
            mt[i].shutdown();
        }
        waitForAll(zk, ZooKeeper.States.CONNECTING);
        for (int i = 0; i < serverCount; i++) {
            mt[i].start();
            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
        }
        waitForAll(zk, ZooKeeper.States.CONNECTED);

        // Stop every server that is not leading; remember the leader and its proposal map.
        int leader = -1;
        ConcurrentMap<Long, Leader.Proposal> outstanding = null;
        for (int i = 0; i < serverCount; i++) {
            if (mt[i].main.quorumPeer.leader == null) {
                mt[i].shutdown();
            } else {
                leader = i;
                outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
            }
        }
        // Fail with a readable message instead of indexing with -1 / dereferencing null below.
        Assert.assertTrue("There should be a leader", leader >= 0);
        Assert.assertNotNull("Leader should expose its outstanding proposals", outstanding);

        try {
            zk[leader].create("/zk" + leader, ServiceURI.SERVICE_ZK.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assert.fail("create /zk" + leader + " should have failed");
        } catch (KeeperException ignored) {
            // Expected: the test requires this create to fail (presumably because the
            // leader has no followers left to acknowledge the proposal).
        }
        Assert.assertEquals(1, outstanding.size());
        // NOTE(review): request type 1 is presumably ZooDefs.OpCode.create - confirm.
        Assert.assertEquals(1, outstanding.values().iterator().next().request.getHdr().getType());

        Thread.sleep(1000L);
        mt[leader].shutdown();
        waitForAll(zk, ZooKeeper.States.CONNECTING);

        // Bring back everyone except the old leader and write one node through each.
        for (int i = 0; i < serverCount; i++) {
            if (i != leader) {
                mt[i].start();
            }
        }
        for (int i = 0; i < serverCount; i++) {
            if (i != leader) {
                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
                waitForOne(zk[i], ZooKeeper.States.CONNECTED);
                zk[i].create("/zk" + i, ServiceURI.SERVICE_ZK.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }

        mt[leader].start();
        waitForAll(zk, ZooKeeper.States.CONNECTED);

        // node = index of the /zkN path, server = index of the client used to look it up.
        for (int node = 0; node < serverCount; node++) {
            for (int server = 0; server < serverCount; server++) {
                // NOTE(review): DEFAULT_INDEX_END is presumably ")" substituted by the decompiler - confirm.
                String who = server == leader ? "Leader (" + leader + DefaultExpressionEngine.DEFAULT_INDEX_END : "Follower " + server;
                Stat stat = zk[server].exists("/zk" + node, false);
                if (node == leader) {
                    Assert.assertNull(who + " should not have /zk" + node, stat);
                } else {
                    Assert.assertNotNull(who + " does not have /zk" + node, stat);
                }
            }
        }

        for (int i = 0; i < serverCount; i++) {
            zk[i].close();
        }
        for (int i = 0; i < serverCount; i++) {
            mt[i].shutdown();
        }
    }

    /**
     * Three-server scenario in which the leader, after all other servers are stopped,
     * issues an asynchronous {@code setData} and is then stopped itself. The other servers
     * restart first and must still see the old value; once the former leader rejoins, it must
     * also report the old value for its own node and the new value written in its absence.
     *
     * @throws Exception if a server or client operation fails
     */
    @Test
    public void testHighestZxidJoinLate() throws Exception {
        this.numServers = 3;
        this.servers = LaunchServers(this.numServers);
        int leader = this.servers.findLeader();
        Assert.assertTrue("There should be a leader", leader >= 0);
        int nonleader = (leader + 1) % this.numServers;

        // Both nodes start with the single byte 1.
        byte[] input = {1};
        this.servers.zk[leader].create("/hzxidtest" + leader, input, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        this.servers.zk[leader].create("/hzxidtest" + nonleader, input, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        this.servers.zk[leader].getData("/hzxidtest" + nonleader, false, (Stat) null);

        for (int i = 0; i < this.numServers; i++) {
            if (i != leader) {
                this.servers.mt[i].shutdown();
            }
        }

        // Asynchronous write of 2 to the leader's node while no other server is running.
        input[0] = 2;
        this.servers.zk[leader].setData("/hzxidtest" + leader, input, -1, null, null);
        Thread.sleep(500L);
        this.servers.mt[leader].shutdown();
        System.gc();
        waitForAll(this.servers.zk, ZooKeeper.States.CONNECTING);

        for (int i = 0; i < this.numServers; i++) {
            if (i != leader) {
                this.servers.mt[i].start();
            }
        }
        waitForOne(this.servers.zk[nonleader], ZooKeeper.States.CONNECTED);

        // JUnit order is (message, expected, actual).
        Assert.assertEquals("Expecting old value 1 since 2 isn't committed yet", 1L, this.servers.zk[nonleader].getData("/hzxidtest" + leader, false, (Stat) null)[0]);
        this.servers.zk[nonleader].setData("/hzxidtest" + nonleader, input, -1);

        this.servers.mt[leader].start();
        waitForOne(this.servers.zk[leader], ZooKeeper.States.CONNECTED);
        Assert.assertEquals("Validating that the deposed leader has rolled back that change it had written", 1L, this.servers.zk[leader].getData("/hzxidtest" + leader, false, (Stat) null)[0]);
        Assert.assertEquals("Validating that the deposed leader caught up on changes it missed", 2L, this.servers.zk[leader].getData("/hzxidtest" + nonleader, false, (Stat) null)[0]);
    }

    /**
     * Forces one non-leader server into the LEADING state by hand and then, by scanning the
     * captured {@link QuorumPeer} log, checks that it logs LEADING, then LOOKING, then
     * FOLLOWING (in that order) for its own {@code myid}.
     *
     * @throws IOException if reading the captured log or closing the follower socket fails
     * @throws InterruptedException if a wait is interrupted
     */
    @Test
    public void testElectionFraud() throws IOException, InterruptedException {
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.INFO);
        Logger peerLogger = Logger.getLogger(QuorumPeer.class);
        peerLogger.addAppender(appender);
        this.numServers = 3;
        try {
            this.servers = LaunchServers(this.numServers, 500);
            int trueLeader = this.servers.findLeader();
            Assert.assertTrue("There should be a leader", trueLeader >= 0);
            int falseLeader = (trueLeader + 1) % this.numServers;
            Assert.assertTrue("All servers should join the quorum", this.servers.mt[falseLeader].main.quorumPeer.follower != null);

            // Cut the chosen server off from election traffic and from its leader.
            this.servers.mt[falseLeader].main.quorumPeer.electionAlg.shutdown();
            this.servers.mt[falseLeader].main.quorumPeer.follower.getSocket().close();
            waitForOne(this.servers.zk[falseLeader], ZooKeeper.States.CONNECTING);

            // Declare it leader by hand, wait two init periods, then restart its election.
            this.servers.mt[falseLeader].main.quorumPeer.setPeerState(QuorumPeer.ServerState.LEADING);
            Thread.sleep(2 * this.servers.mt[falseLeader].main.quorumPeer.initLimit * this.servers.mt[falseLeader].main.quorumPeer.tickTime);
            this.servers.mt[falseLeader].main.quorumPeer.startLeaderElection();

            this.servers.zk[falseLeader] = new ZooKeeper("127.0.0.1:" + this.servers.mt[falseLeader].getClientPort(), ClientBase.CONNECTION_TIMEOUT, this);
            waitForOne(this.servers.zk[falseLeader], ZooKeeper.States.CONNECTED);
            Assert.assertTrue(this.servers.mt[trueLeader].main.quorumPeer.leader != null);

            LineNumberReader logReader = new LineNumberReader(new StringReader(capturedLog.toString()));
            Pattern leadingLine = Pattern.compile(".*myid=" + falseLeader + ".*LEADING.*");
            Pattern lookingLine = Pattern.compile(".*myid=" + falseLeader + ".*LOOKING.*");
            Pattern followingLine = Pattern.compile(".*myid=" + falseLeader + ".*FOLLOWING.*");

            // The three states must appear in sequence; each stage only starts matching on
            // lines after the one that satisfied the previous stage.
            boolean sawLeading = false;
            boolean sawLooking = false;
            boolean sawFollowing = false;
            String line;
            while ((line = logReader.readLine()) != null) {
                if (!sawLeading) {
                    sawLeading = leadingLine.matcher(line).matches();
                } else if (!sawLooking) {
                    sawLooking = lookingLine.matcher(line).matches();
                } else if (followingLine.matcher(line).matches()) {
                    sawFollowing = true;
                    break;
                }
            }
            Assert.assertTrue("falseLeader never attempts to become leader", sawLeading);
            Assert.assertTrue("falseLeader never gives up on leadership", sawLooking);
            Assert.assertTrue("falseLeader never rejoins the quorum", sawFollowing);
        } finally {
            peerLogger.removeAppender(appender);
        }
    }

    /**
     * Starts a single server whose peer list contains an unresolvable host
     * ({@code fee.fii.foo.fum}), expects it not to come up within 30 seconds, and then checks
     * that the quorum package logged a "Cannot open channel to ... at election address"
     * message at WARN level or above.
     *
     * @throws Exception if a server operation fails
     */
    @Test
    public void testBadPeerAddressInQuorum() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.WARN);
        Logger quorumLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            int clientPort = PortAssignment.unique();
            String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort
                    + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + PortAssignment.unique();
            QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPort, quorumCfgSection);
            q1.start();
            Assert.assertFalse("Server never came up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort, 30000L));
            q1.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPort, ClientBase.CONNECTION_TIMEOUT));
        } finally {
            // Detach the appender on every path, success or failure.
            quorumLogger.removeAppender(appender);
        }

        LineNumberReader logReader = new LineNumberReader(new StringReader(capturedLog.toString()));
        Pattern complaint = Pattern.compile(".*Cannot open channel to .* at election address .*");
        boolean found = false;
        String line;
        while (!found && (line = logReader.readLine()) != null) {
            found = complaint.matcher(line).matches();
        }
        Assert.assertTrue("complains about host", found);
    }

    /**
     * Starts three servers from a server list in which server 3 is marked {@code :observer},
     * waits for all of them to come up and go down again, and then checks that the quorum
     * package logged both a "Peer type from servers list ... doesn't match peerType" line and
     * a line containing {@code OBSERVING}.
     *
     * @throws Exception if a server operation fails
     */
    @Test
    public void testInconsistentPeerType() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.INFO);
        Logger quorumLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            int clientPort1 = PortAssignment.unique();
            int clientPort2 = PortAssignment.unique();
            int clientPort3 = PortAssignment.unique();
            String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort1
                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort2
                    + "\nserver.3=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":observer;" + clientPort3;
            QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPort1, quorumCfgSection);
            QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfgSection);
            QuorumPeerTestBase.MainThread q3 = new QuorumPeerTestBase.MainThread(3, clientPort3, quorumCfgSection);
            q1.start();
            q2.start();
            q3.start();
            Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort2, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort3, ClientBase.CONNECTION_TIMEOUT));
            q1.shutdown();
            q2.shutdown();
            q3.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPort2, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 3 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPort3, ClientBase.CONNECTION_TIMEOUT));
        } finally {
            // Detach the appender on every path, success or failure.
            quorumLogger.removeAppender(appender);
        }

        LineNumberReader logReader = new LineNumberReader(new StringReader(capturedLog.toString()));
        Pattern mismatchWarning = Pattern.compile(".*Peer type from servers list.* doesn't match peerType.*");
        Pattern observingLine = Pattern.compile(".*OBSERVING.*");
        boolean warningPresent = false;
        boolean defaultedToObserver = false;
        String line;
        // The two lines may appear in either order; stop as soon as both have been seen.
        while (!(warningPresent && defaultedToObserver) && (line = logReader.readLine()) != null) {
            if (mismatchWarning.matcher(line).matches()) {
                warningPresent = true;
            }
            if (observingLine.matcher(line).matches()) {
                defaultedToObserver = true;
            }
        }
        Assert.assertTrue("Should warn about inconsistent peer type", warningPresent && defaultedToObserver);
    }

    /**
     * Starts a two-server quorum, writes a bare 4-byte integer (0x40000000) to the second
     * port of each server entry over a raw socket, and then verifies that a normal client can
     * still create and read a node.
     *
     * @throws Exception if a server, socket or client operation fails
     */
    @Test
    public void testBadPackets() throws Exception {
        ClientBase.setupTestEnv();
        int clientPort1 = PortAssignment.unique();
        int clientPort2 = PortAssignment.unique();
        // Second port of each server entry (presumably the election port - confirm).
        int peerPort1 = PortAssignment.unique();
        int peerPort2 = PortAssignment.unique();
        String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + peerPort1 + ";" + clientPort1
                + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + peerPort2 + ";" + clientPort2;
        QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPort1, quorumCfgSection);
        QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfgSection);
        q1.start();
        q2.start();
        Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort1, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort2, ClientBase.CONNECTION_TIMEOUT));

        ByteBuffer badPacket = ByteBuffer.wrap(new byte[4]);
        badPacket.putInt(1073741824);
        badPacket.position(0);
        // try-with-resources so the channel is closed even if the write throws.
        try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", peerPort1))) {
            channel.write(badPacket);
        }
        // Rewind so the same four bytes are sent to the second server.
        badPacket.position(0);
        try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", peerPort2))) {
            channel.write(badPacket);
        }

        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPort1, ClientBase.CONNECTION_TIMEOUT, this);
        waitForOne(zk, ZooKeeper.States.CONNECTED);
        zk.create("/foo_q1", "foobar1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // JUnit order is (expected, actual).
        Assert.assertEquals("foobar1", new String(zk.getData("/foo_q1", (Watcher) null, (Stat) null)));
        zk.close();
        q1.shutdown();
        q2.shutdown();
    }

    /**
     * Starts and stops a two-server quorum with no extra configuration and checks that the
     * captured ZooKeeper log contains a line mentioning {@code FastLeaderElection}.
     *
     * @throws Exception if a server operation fails
     */
    @Test
    public void testQuorumDefaults() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.INFO);
        appender.setImmediateFlush(true);
        Logger zkLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper");
        zkLogger.addAppender(appender);
        try {
            int clientPort1 = PortAssignment.unique();
            int clientPort2 = PortAssignment.unique();
            String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort1
                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort2;
            QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPort1, quorumCfgSection);
            QuorumPeerTestBase.MainThread q2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfgSection);
            q1.start();
            q2.start();
            Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort2, ClientBase.CONNECTION_TIMEOUT));
            q1.shutdown();
            q2.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown("127.0.0.1:" + clientPort2, ClientBase.CONNECTION_TIMEOUT));
        } finally {
            // Detach the appender on every path, success or failure.
            zkLogger.removeAppender(appender);
        }

        capturedLog.close();
        LineNumberReader logReader = new LineNumberReader(new StringReader(capturedLog.toString()));
        Pattern electionLine = Pattern.compile(".*FastLeaderElection.*");
        boolean found = false;
        String line;
        while (!found && (line = logReader.readLine()) != null) {
            found = electionLine.matcher(line).matches();
        }
        Assert.assertTrue("fastleaderelection used", found);
    }

    /**
     * Starts server 1 of a two-server configuration without ever starting server 2, lets it
     * run for 30 seconds, and fails if the subsequent {@code shutdown()} call takes longer
     * than 3000 units of {@link Time#currentElapsedTime()}.
     *
     * @throws Exception if starting or stopping the server fails
     */
    @Test
    public void testQuorumPeerExitTime() throws Exception {
        int clientPort = PortAssignment.unique();
        String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort
                + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + PortAssignment.unique();
        QuorumPeerTestBase.MainThread q1 = new QuorumPeerTestBase.MainThread(1, clientPort, quorumCfgSection);
        q1.start();
        Thread.sleep(30000L);

        long shutdownStarted = Time.currentElapsedTime();
        q1.shutdown();
        long shutdownTook = Time.currentElapsedTime() - shutdownStarted;
        if (shutdownTook <= 3000) {
            return;
        }
        Assert.fail("QuorumPeer took " + shutdownTook + " to shutdown, expected 3000");
    }

    /**
     * Verifies that explicit {@code minSessionTimeout} / {@code maxSessionTimeout}
     * settings in the server configuration are reflected by the running
     * {@link QuorumPeer}.
     *
     * <p>Two peers are started with {@code minSessionTimeout=10000} and
     * {@code maxSessionTimeout=15000}; once both accept client connections, the
     * values reported by peer 1 are checked. Both peers are shut down afterwards,
     * whether or not the assertions pass, so they do not keep running.
     */
    @Test
    public void testMinMaxSessionTimeOut() throws Exception {
        ClientBase.setupTestEnv();
        int clientPort1 = PortAssignment.unique();
        int clientPort2 = PortAssignment.unique();
        String quorumCfg = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique();
        String timeoutCfg = "maxSessionTimeout=15000\nminSessionTimeout=10000\n";
        QuorumPeerTestBase.MainThread peer1 = new QuorumPeerTestBase.MainThread(1, clientPort1, quorumCfg, timeoutCfg);
        QuorumPeerTestBase.MainThread peer2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfg, timeoutCfg);
        peer1.start();
        peer2.start();
        try {
            Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort2, ClientBase.CONNECTION_TIMEOUT));
            QuorumPeer quorumPeer = peer1.main.quorumPeer;
            Assert.assertEquals("minimumSessionTimeOut is not considered", 10000L, quorumPeer.getMinSessionTimeout());
            Assert.assertEquals("maximumSessionTimeOut is not considered", 15000L, quorumPeer.getMaxSessionTimeout());
        } finally {
            // Always stop both peers; the nested finally ensures peer 2 is stopped
            // even if stopping peer 1 throws.
            try {
                peer1.shutdown();
            } finally {
                peer2.shutdown();
            }
        }
    }

    /**
     * Verifies the session timeouts when only {@code minSessionTimeout} is configured.
     *
     * <p>Two peers are started with {@code minSessionTimeout=15000} and no
     * {@code maxSessionTimeout}. Peer 1 must report the configured minimum, and a
     * maximum equal to {@code tickTime * 20}. Both peers are shut down afterwards,
     * whether or not the assertions pass.
     */
    @Test
    public void testWithOnlyMinSessionTimeout() throws Exception {
        ClientBase.setupTestEnv();
        int clientPort1 = PortAssignment.unique();
        int clientPort2 = PortAssignment.unique();
        String quorumCfg = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique();
        String timeoutCfg = "minSessionTimeout=15000\n";
        QuorumPeerTestBase.MainThread peer1 = new QuorumPeerTestBase.MainThread(1, clientPort1, quorumCfg, timeoutCfg);
        QuorumPeerTestBase.MainThread peer2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfg, timeoutCfg);
        peer1.start();
        peer2.start();
        try {
            Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPort2, ClientBase.CONNECTION_TIMEOUT));
            // The peer under inspection; the decompiled code referenced it through an
            // undeclared name, which did not compile.
            QuorumPeer quorumPeer = peer1.main.quorumPeer;
            int expectedMaxSessionTimeout = quorumPeer.tickTime * 20;
            Assert.assertEquals("minimumSessionTimeOut is not considered", 15000L, quorumPeer.getMinSessionTimeout());
            Assert.assertEquals("maximumSessionTimeOut is wrong", expectedMaxSessionTimeout, quorumPeer.getMaxSessionTimeout());
        } finally {
            // Always stop both peers; the nested finally ensures peer 2 is stopped
            // even if stopping peer 1 throws.
            try {
                peer1.shutdown();
            } finally {
                peer2.shutdown();
            }
        }
    }

    /**
     * Checks that a create issued to a leader that has lost its followers does not
     * survive: after the old leader rejoins the ensemble, no server has the node.
     *
     * <p>Outline: launch three servers, bounce them all, pick the leader, restart the
     * two non-leaders, attempt a create through the old leader's client (expected to
     * fail with a {@link KeeperException}), wait for the old leader to ack its own
     * proposal and then to start following, restart it, and finally assert that
     * {@code /zk<leader>} exists on none of the three servers.
     */
    @Test
    public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
        ClientBase.setupTestEnv();
        this.servers = LaunchServers(3);
        waitForAll(this.servers, ZooKeeper.States.CONNECTED);
        // Bounce every server once before the actual scenario starts.
        this.servers.shutDownAllServers();
        waitForAll(this.servers, ZooKeeper.States.CONNECTING);
        this.servers.restartAllServersAndClients(this);
        waitForAll(this.servers, ZooKeeper.States.CONNECTED);
        int findLeader = this.servers.findLeader();
        // Keep a reference to the leader's outstanding proposals so they can be
        // inspected after the create attempt below.
        ConcurrentMap<Long, Leader.Proposal> concurrentMap = this.servers.mt[findLeader].main.quorumPeer.leader.outstandingProposals;
        // Raise the leader's tickTime to 10000 and sleep for one old tick.
        // NOTE(review): presumably this delays the leader noticing the loss of its
        // followers; confirm against Leader's use of tickTime.
        int i = this.servers.mt[findLeader].main.quorumPeer.tickTime;
        this.servers.mt[findLeader].main.quorumPeer.tickTime = 10000;
        Thread.sleep(i);
        LOG.warn("LEADER {}", Integer.valueOf(findLeader));
        // Stop both non-leader servers ...
        for (int i2 = 0; i2 < 3; i2++) {
            if (i2 != findLeader) {
                this.servers.mt[i2].shutdown();
            }
        }
        // ... start them again ...
        for (int i3 = 0; i3 < 3; i3++) {
            if (i3 != findLeader) {
                this.servers.mt[i3].start();
            }
        }
        // ... and reconnect their clients, waiting for each to be CONNECTED.
        for (int i4 = 0; i4 < 3; i4++) {
            if (i4 != findLeader) {
                this.servers.restartClient(i4, this);
                waitForOne(this.servers.zk[i4], ZooKeeper.States.CONNECTED);
            }
        }
        try {
            this.servers.zk[findLeader].create("/zk" + findLeader, ServiceURI.SERVICE_ZK.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assert.fail("create /zk" + findLeader + " should have failed");
        } catch (KeeperException e) {
            // Expected: the create through the old leader must fail.
        }
        // The failed create should still have left a proposal on the old leader.
        Assert.assertTrue(concurrentMap.size() > 0);
        // Header type 1 is treated as the 'create' operation (see the assertion message below).
        Leader.Proposal findProposalOfType = findProposalOfType(concurrentMap, 1);
        LOG.info("Old leader id: {}. All proposals: {}", Integer.valueOf(findLeader), concurrentMap);
        Assert.assertNotNull("Old leader doesn't have 'create' proposal", findProposalOfType);
        // Poll in 100 ms steps until the old leader's id appears in the proposal's first ack set.
        // NOTE(review): the failure message says "1 second" but the limit checked is 2000.
        int i5 = 0;
        Long valueOf = Long.valueOf(findLeader);
        while (!findProposalOfType.qvAcksetPairs.get(0).getAckset().contains(valueOf)) {
            if (i5 > 2000) {
                Assert.fail("Transaction not synced to disk within 1 second " + findProposalOfType.qvAcksetPairs.get(0).getAckset() + " expected " + findLeader);
            }
            Thread.sleep(100L);
            i5 += 100;
        }
        LOG.info("Waiting for leader {} to timeout followers", Integer.valueOf(findLeader));
        // Poll in 100 ms steps (limit 20000) until the old leader has a running Follower,
        // i.e. it has given up leadership and joined as a follower.
        int i6 = 0;
        Follower follower = this.servers.mt[findLeader].main.quorumPeer.follower;
        while (true) {
            Follower follower2 = follower;
            if (follower2 != null && follower2.isRunning()) {
                break;
            }
            if (i6 > 20000) {
                Assert.fail("Took too long for old leader to time out " + this.servers.mt[findLeader].main.quorumPeer.getPeerState());
            }
            Thread.sleep(100L);
            i6 += 100;
            follower = this.servers.mt[findLeader].main.quorumPeer.follower;
        }
        // A different server must be leader now.
        Assert.assertNotEquals(findLeader, this.servers.findLeader());
        // Restart the old leader and verify the node exists on no server.
        this.servers.mt[findLeader].shutdown();
        this.servers.mt[findLeader].start();
        this.servers.restartClient(findLeader, this);
        waitForAll(this.servers, ZooKeeper.States.CONNECTED);
        for (int i7 = 0; i7 < 3; i7++) {
            Assert.assertNull("server " + i7 + " should not have /zk" + findLeader, this.servers.zk[i7].exists("/zk" + findLeader, false));
        }
    }

    /**
     * Exercises an ensemble in which server 1 is configured with a view that only
     * contains servers 1 and 2, while servers 2 and 3 know about all three servers.
     *
     * <p>Server 1 and server 3 are started first and must both be LOOKING; then
     * server 2 is started. Afterwards server 3 is expected to be LEADING, server 2
     * FOLLOWING and server 1 still LOOKING. The captured DEBUG log of the quorum
     * package is finally scanned to make sure server 1 never logged a LEADING or
     * FOLLOWING state.
     */
    @Test
    public void testLeaderOutOfView() throws Exception {
        ClientBase.setupTestEnv();
        final int serverCount = 3;
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.DEBUG);
        Logger quorumLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            QuorumPeerTestBase.Servers svrs = new QuorumPeerTestBase.Servers();
            svrs.clientPorts = new int[serverCount];
            for (int idx = 0; idx < serverCount; idx++) {
                svrs.clientPorts[idx] = PortAssignment.unique();
            }

            // Server 1 gets the truncated view; every other server gets the full one.
            String truncatedView = getUniquePortCfgForId(1) + "\n" + getUniquePortCfgForId(2);
            String completeView = truncatedView + "\n" + getUniquePortCfgForId(3);
            svrs.mt = new QuorumPeerTestBase.MainThread[serverCount];
            svrs.mt[0] = new QuorumPeerTestBase.MainThread(1, svrs.clientPorts[0], truncatedView);
            for (int idx = 1; idx < serverCount; idx++) {
                svrs.mt[idx] = new QuorumPeerTestBase.MainThread(idx + 1, svrs.clientPorts[idx], completeView);
            }

            // Start the peer with the truncated view, then the last peer; both must be LOOKING.
            svrs.mt[0].start();
            QuorumPeer firstPeer = waitForQuorumPeer(svrs.mt[0], ClientBase.CONNECTION_TIMEOUT);
            Assert.assertTrue(firstPeer.getPeerState() == QuorumPeer.ServerState.LOOKING);
            final int lastIdx = serverCount - 1;
            svrs.mt[lastIdx].start();
            QuorumPeer lastPeer = waitForQuorumPeer(svrs.mt[lastIdx], ClientBase.CONNECTION_TIMEOUT);
            Assert.assertTrue(lastPeer.getPeerState() == QuorumPeer.ServerState.LOOKING);

            // Start the remaining peers in between.
            for (int idx = 1; idx < lastIdx; idx++) {
                svrs.mt[idx].start();
            }
            for (int idx = 1; idx < serverCount; idx++) {
                String address = "127.0.0.1:" + svrs.clientPorts[idx];
                Assert.assertTrue("waiting for server to start", ClientBase.waitForServerUp(address, ClientBase.CONNECTION_TIMEOUT));
            }

            Assert.assertTrue(svrs.mt[0].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.LOOKING);
            Assert.assertTrue(svrs.mt[lastIdx].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.LEADING);
            for (int idx = 1; idx < lastIdx; idx++) {
                Assert.assertTrue(svrs.mt[idx].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.FOLLOWING);
            }

            // Scan the captured log until the end or until either pattern matched.
            LineNumberReader logReader = new LineNumberReader(new StringReader(capturedLog.toString()));
            Pattern leadingPattern = Pattern.compile(".*myid=1.*QuorumPeer.*LEADING.*");
            Pattern followingPattern = Pattern.compile(".*myid=1.*QuorumPeer.*FOLLOWING.*");
            boolean sawLeading = false;
            boolean sawFollowing = false;
            for (String line = logReader.readLine(); line != null && !sawLeading && !sawFollowing; line = logReader.readLine()) {
                sawLeading = leadingPattern.matcher(line).matches();
                sawFollowing = followingPattern.matcher(line).matches();
            }
            Assert.assertFalse("Corrupt peer should never become leader", sawLeading);
            Assert.assertFalse("Corrupt peer should not attempt connection to out of view leader", sawFollowing);
        } finally {
            quorumLogger.removeAppender(appender);
        }
    }

    /**
     * Verifies that {@code runFromConfig} wires the configured data directory and
     * data-log directory into the peer's {@link FileTxnSnapLog}.
     *
     * <p>A mocked {@link QuorumPeerConfig} returns two fresh temporary directories.
     * After {@code runFromConfig}, the transaction factory's data dir must be
     * {@code <dataLogDir>/version-2} and its snap dir {@code <dataDir>/version-2}.
     * Both temporary directories are deleted in a {@code finally} block, so cleanup
     * runs exactly once whether the test passes or fails.
     */
    @Test
    public void testDataDirAndDataLogDir() throws Exception {
        File dataDir = ClientBase.createEmptyTestDir();
        File dataLogDir = ClientBase.createEmptyTestDir();
        try {
            QuorumPeerConfig configMock = Mockito.mock(QuorumPeerConfig.class);
            Mockito.when(configMock.getDataDir()).thenReturn(dataDir);
            Mockito.when(configMock.getDataLogDir()).thenReturn(dataLogDir);
            Mockito.when(configMock.getMetricsProviderClassName()).thenReturn(NullMetricsProvider.class.getName());

            // Only the txn-factory setter/getter run real code on the mocked peer.
            QuorumPeer peerMock = Mockito.mock(QuorumPeer.class);
            Mockito.doCallRealMethod().when(peerMock).setTxnFactory(ArgumentMatchers.any(FileTxnSnapLog.class));
            Mockito.when(peerMock.getTxnFactory()).thenCallRealMethod();

            InjectableQuorumPeerMain peerMain = new InjectableQuorumPeerMain(peerMock);
            peerMain.runFromConfig(configMock);

            FileTxnSnapLog txnFactory = peerMain.getQuorumPeer().getTxnFactory();
            Assert.assertEquals(Paths.get(dataLogDir.getAbsolutePath(), "version-2").toString(), txnFactory.getDataDir().getAbsolutePath());
            Assert.assertEquals(Paths.get(dataDir.getAbsolutePath(), "version-2").toString(), txnFactory.getSnapDir().getAbsolutePath());
        } finally {
            // Delete both directories even if removing the first one fails.
            try {
                FileUtils.deleteDirectory(dataDir);
            } finally {
                FileUtils.deleteDirectory(dataLogDir);
            }
        }
    }

    /**
     * Builds a log4j appender that writes into the given in-memory stream, using the
     * same conversion pattern as the root logger's {@code "CONSOLE"} appender.
     *
     * @param byteArrayOutputStream stream that receives the formatted log output
     * @param level threshold set on the returned appender
     * @return the configured appender; the caller attaches it to a logger
     */
    private WriterAppender getConsoleAppender(ByteArrayOutputStream byteArrayOutputStream, Level level) {
        // getConversionPattern() is declared on PatternLayout, not on the Layout type
        // returned by getLayout(), so the layout must be cast before the call.
        PatternLayout consoleLayout = (PatternLayout) Logger.getRootLogger().getAppender("CONSOLE").getLayout();
        WriterAppender writerAppender = new WriterAppender(new PatternLayout(consoleLayout.getConversionPattern()), byteArrayOutputStream);
        writerAppender.setThreshold(level);
        return writerAppender;
    }

    /**
     * Produces one {@code server.<id>=127.0.0.1:<port>:<port>} configuration line,
     * drawing two fresh ports from {@code PortAssignment.unique()}.
     *
     * @param i server id to put into the line
     * @return the configuration line, without a trailing newline
     */
    private String getUniquePortCfgForId(int i) {
        // Draw the ports in the same order as they appear in the formatted line.
        int firstPort = PortAssignment.unique();
        int secondPort = PortAssignment.unique();
        return String.format("server.%d=127.0.0.1:%d:%d", i, firstPort, secondPort);
    }

    /**
     * Polls every 250 ms until the given main thread is alive and exposes a non-null
     * {@link QuorumPeer}.
     *
     * @param mainThread thread whose quorum peer is awaited
     * @param i maximum wait, in the units of {@code Time.currentElapsedTime()}
     * @return the quorum peer, never {@code null}
     * @throws TimeoutException if no peer became available within the allowed time
     */
    private QuorumPeer waitForQuorumPeer(QuorumPeerTestBase.MainThread mainThread, int i) throws TimeoutException {
        final long deadline = Time.currentElapsedTime() + i;
        for (;;) {
            if (mainThread.isAlive()) {
                QuorumPeer peer = mainThread.getQuorumPeer();
                if (peer != null) {
                    return peer;
                }
            }
            if (Time.currentElapsedTime() > deadline) {
                LOG.error("Timed out while waiting for QuorumPeer");
                throw new TimeoutException();
            }
            try {
                Thread.sleep(250L);
            } catch (InterruptedException ignored) {
                // Interruption is ignored here; the loop simply polls again.
            }
        }
    }

    /**
     * Looks up a proposal whose request header has the given type.
     *
     * @param map proposals to search; only the values are inspected
     * @param i header type to look for
     * @return the first matching proposal in the map's iteration order, or
     *         {@code null} when none matches
     */
    private Leader.Proposal findProposalOfType(Map<Long, Leader.Proposal> map, int i) {
        Leader.Proposal match = null;
        for (Leader.Proposal candidate : map.values()) {
            int headerType = candidate.request.getHdr().getType();
            if (headerType == i) {
                match = candidate;
                break;
            }
        }
        return match;
    }

    /**
     * Checks that a follower that was stopped while acknowledging the new leader ends
     * up with the same data as the leader after it is restarted.
     *
     * <p>Outline: start three servers backed by {@code CustomizedQPMain}, stop one
     * follower, create a node through the leader, install listeners on the leader
     * that modify the node while the follower is syncing, start the follower with a
     * callback that shuts it down when it acks the new leader, then restart it
     * without the callback and compare the node's data on follower and leader.
     */
    @Test
    public void testInconsistentDueToNewLeaderOrder() throws Exception {
        // Build the shared config: ids 0..2, each a participant with its own client port.
        int[] iArr = new int[3];
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 3; i++) {
            iArr[i] = PortAssignment.unique();
            sb.append(("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":participant;127.0.0.1:" + iArr[i]) + "\n");
        }
        String sb2 = sb.toString();
        QuorumPeerTestBase.MainThread[] mainThreadArr = new QuorumPeerTestBase.MainThread[3];
        ZooKeeper[] zooKeeperArr = new ZooKeeper[3];
        Context[] contextArr = new Context[3];
        // Start each server with its own Context and connect one client per server.
        for (int i2 = 0; i2 < 3; i2++) {
            final Context context = new Context();
            contextArr[i2] = context;
            mainThreadArr[i2] = new QuorumPeerTestBase.MainThread(i2, iArr[i2], sb2, false) { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.1
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread
                public QuorumPeerTestBase.TestQPMain getTestQPMain() {
                    return new CustomizedQPMain(context);
                }
            };
            mainThreadArr[i2].start();
            zooKeeperArr[i2] = new ZooKeeper("127.0.0.1:" + iArr[i2], ClientBase.CONNECTION_TIMEOUT, this);
        }
        waitForAll(zooKeeperArr, ZooKeeper.States.CONNECTED);
        LOG.info("all servers started");
        final String str = "/testInconsistentDueToNewLeader";
        // i3: index of the leader (peer with a non-null leader field);
        // i4: index of the first non-leader, used as the follower under test.
        int i3 = -1;
        int i4 = -1;
        for (int i5 = 0; i5 < 3; i5++) {
            if (mainThreadArr[i5].main.quorumPeer.leader != null) {
                i3 = i5;
            } else if (i4 == -1) {
                i4 = i5;
            }
        }
        LOG.info("shutdown follower {}", Integer.valueOf(i4));
        mainThreadArr[i4].shutdown();
        waitForOne(zooKeeperArr[i4], ZooKeeper.States.CONNECTING);
        try {
            LOG.info("force snapshot sync");
            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
            // Create the node with value "1" through the leader's client.
            final ZooKeeper zooKeeper = zooKeeperArr[i3];
            zooKeeper.create("/testInconsistentDueToNewLeader", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            LOG.info("created node {} with value {}", "/testInconsistentDueToNewLeader", "1");
            CustomQuorumPeer customQuorumPeer = (CustomQuorumPeer) mainThreadArr[i3].main.quorumPeer;
            // Listener 1: while FORCE_SNAP_SYNC is "true", fire an asynchronous setData
            // to "2" and then sleep one second.
            // NOTE(review): when CustomQuorumPeer invokes this listener is not visible
            // here; the name suggests "when the leader starts forwarding to a learner".
            customQuorumPeer.setStartForwardingListener(new StartForwardingListener() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.2
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.StartForwardingListener
                public void start() {
                    if (Boolean.getBoolean(LearnerHandler.FORCE_SNAP_SYNC)) {
                        QuorumPeerTestBase.LOG.info("start forwarding, set {} to {}", str, "2");
                        try {
                            zooKeeper.setData(str, "2".getBytes(), -1, new AsyncCallback.StatCallback() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.2.1
                                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback.StatCallback
                                public void processResult(int i6, String str2, Object obj, Stat stat) {
                                    // Result of the asynchronous setData is intentionally not inspected.
                                }
                            }, null);
                            Thread.sleep(1000L);
                        } catch (Exception e) {
                            QuorumPeerTestBase.LOG.error("error when set {} to {}", str, "2", e);
                        }
                    }
                }
            });
            // Listener 2: synchronously set the node to "3"; failures are only logged.
            // NOTE(review): presumably invoked just before the leader sends a snapshot
            // (per the log message); confirm in CustomQuorumPeer.
            customQuorumPeer.setBeginSnapshotListener(new BeginSnapshotListener() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.3
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.BeginSnapshotListener
                public void start() {
                    QuorumPeerTestBase.LOG.info("before sending snapshot, set {} to {}", str, "3");
                    try {
                        zooKeeper.setData(str, "3".getBytes(), -1);
                        QuorumPeerTestBase.LOG.info("successfully set {} to {}", str, "3");
                    } catch (Exception e) {
                        QuorumPeerTestBase.LOG.error("error when set {} to {}, {}", str, "3", e);
                    }
                }
            });
            LOG.info("set exit when ack new leader packet on {}", Integer.valueOf(i4));
            contextArr[i4].exitWhenAckNewLeader = true;
            // The callback signals the latch and shuts the follower down.
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final QuorumPeerTestBase.MainThread mainThread = mainThreadArr[i4];
            contextArr[i4].newLeaderAckCallback = new NewLeaderAckCallback() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.4
                @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.QuorumPeerMainTest.NewLeaderAckCallback
                public void start() {
                    try {
                        countDownLatch.countDown();
                        mainThread.shutdown();
                    } catch (Exception e) {
                        // Shutdown failures inside the callback are ignored.
                    }
                }
            };
            LOG.info("starting follower {}", Integer.valueOf(i4));
            mainThreadArr[i4].start();
            // Wait up to 30 seconds for the callback to fire.
            Assert.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
            LOG.info("disable exit when ack new leader packet on {}", Integer.valueOf(i4));
            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
            // NOTE(review): the log line above says "disable", yet the flag is set to
            // true again; only the callback is cleared. Confirm whether false was intended.
            contextArr[i4].exitWhenAckNewLeader = true;
            contextArr[i4].newLeaderAckCallback = null;
            LOG.info("restarting follower {}", Integer.valueOf(i4));
            mainThreadArr[i4].start();
            // Replace the follower's client with a fresh connection before reading.
            zooKeeperArr[i4].close();
            zooKeeperArr[i4] = new ZooKeeper("127.0.0.1:" + iArr[i4], ClientBase.CONNECTION_TIMEOUT, this);
            waitForOne(zooKeeperArr[i4], ZooKeeper.States.CONNECTED);
            // Follower and leader must return the same data for the node.
            Assert.assertEquals(new String(zooKeeperArr[i4].getData("/testInconsistentDueToNewLeader", (Watcher) null, (Stat) null)), new String(zooKeeperArr[i3].getData("/testInconsistentDueToNewLeader", (Watcher) null, (Stat) null)));
            // Cleanup on success: clear the property, stop all servers, close all clients.
            System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
            for (int i6 = 0; i6 < 3; i6++) {
                mainThreadArr[i6].shutdown();
                zooKeeperArr[i6].close();
            }
        } catch (Throwable th) {
            // Same cleanup on failure, then rethrow the original error.
            System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
            for (int i7 = 0; i7 < 3; i7++) {
                mainThreadArr[i7].shutdown();
                zooKeeperArr[i7].close();
            }
            throw th;
        }
    }

    /**
     * Runs {@code testLeaderElection} with five configured servers of which three
     * are started, an epoch wait of 1000 and a 10000 server-up timeout.
     */
    @Test
    public void testLeaderElectionWithDisloyalVoter() throws IOException {
        final int configuredServers = 5;
        final int startedServers = 3;
        final int maxTimeToWaitForEpoch = 1000;
        final int serverUpTimeout = 10000;
        testLeaderElection(configuredServers, startedServers, maxTimeToWaitForEpoch, serverUpTimeout);
    }

    /**
     * Runs {@code testLeaderElection} with five configured servers, all five of
     * them started, an epoch wait of 3000 and a 20000 server-up timeout.
     */
    @Test
    public void testLeaderElectionWithDisloyalVoter_stillHasMajority() throws IOException {
        final int configuredServers = 5;
        final int startedServers = 5;
        final int maxTimeToWaitForEpoch = 3000;
        final int serverUpTimeout = 20000;
        testLeaderElection(configuredServers, startedServers, maxTimeToWaitForEpoch, serverUpTimeout);
    }

    /**
     * Starts part of an ensemble in which server 0 runs with
     * {@code Context.quitFollowing} set, and asserts that every started server
     * becomes reachable.
     *
     * @param i number of servers listed in the quorum configuration
     * @param i2 number of servers actually started (ids {@code 0 .. i2-1})
     * @param i3 value passed to {@code Leader.setMaxTimeToWaitForEpoch}
     * @param i4 timeout handed to {@code ClientBase.waitForServerUp} for each server
     * @throws IOException declared by the original signature
     */
    void testLeaderElection(int i, int i2, int i3, int i4) throws IOException {
        Leader.setMaxTimeToWaitForEpoch(i3);
        this.servers = new QuorumPeerTestBase.Servers();

        // One config line per configured server; the client port is drawn first.
        int[] clientPorts = new int[i];
        StringBuilder cfgBuilder = new StringBuilder();
        for (int sid = 0; sid < i; sid++) {
            clientPorts[sid] = PortAssignment.unique();
            cfgBuilder.append("server.").append(sid)
                    .append("=127.0.0.1:").append(PortAssignment.unique())
                    .append(":").append(PortAssignment.unique())
                    .append(":participant;127.0.0.1:").append(clientPorts[sid])
                    .append("\n");
        }
        String quorumCfg = cfgBuilder.toString();

        QuorumPeerTestBase.MainThread[] threads = new QuorumPeerTestBase.MainThread[i2];
        Context[] contexts = new Context[i2];
        // Register the threads on the shared fields.
        this.servers.mt = threads;
        this.numServers = i2;

        for (int sid = 0; sid < i2; sid++) {
            final Context ctx = new Context();
            // Only server 0 is the "disloyal" one.
            ctx.quitFollowing = (sid == 0) ? true : ctx.quitFollowing;
            contexts[sid] = ctx;
            threads[sid] = new QuorumPeerTestBase.MainThread(sid, clientPorts[sid], quorumCfg, false) {
                @Override
                public QuorumPeerTestBase.TestQPMain getTestQPMain() {
                    return new CustomizedQPMain(ctx);
                }
            };
            threads[sid].start();
        }

        for (int sid = 0; sid < i2; sid++) {
            boolean up = ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[sid], i4);
            Assert.assertTrue("Server " + sid + " should have joined quorum by now", up);
        }
    }

    /**
     * Starts two peers, the first with
     * {@code MetricsProviderCapturingLifecycle} as metrics provider, stops them
     * again, and asserts that the provider saw configure, start, getRootContext and
     * stop calls.
     */
    @Test
    public void testMetricsProviderLifecycle() throws Exception {
        ClientBase.setupTestEnv();
        BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.reset();
        WriterAppender appender = getConsoleAppender(new ByteArrayOutputStream(), Level.WARN);
        Logger quorumLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            int clientPort1 = PortAssignment.unique();
            int clientPort2 = PortAssignment.unique();
            String quorumCfg = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort1
                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort2 + "\n";
            String providerCfg = quorumCfg + "metricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.class.getName() + "\n";
            String address1 = "127.0.0.1:" + clientPort1;
            String address2 = "127.0.0.1:" + clientPort2;

            QuorumPeerTestBase.MainThread peer1 = new QuorumPeerTestBase.MainThread(1, clientPort1, providerCfg);
            QuorumPeerTestBase.MainThread peer2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfg);
            peer1.start();
            peer2.start();

            // Wait for both servers before asserting on either of them.
            boolean up1 = ClientBase.waitForServerUp(address1, 30000L);
            boolean up2 = ClientBase.waitForServerUp(address2, 30000L);
            Assert.assertTrue("Server 1 never came up", up1);
            Assert.assertTrue("Server 2 never came up", up2);

            peer1.shutdown();
            peer2.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(address1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown(address2, ClientBase.CONNECTION_TIMEOUT));
            quorumLogger.removeAppender(appender);

            Assert.assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.configureCalled.get());
            Assert.assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.startCalled.get());
            Assert.assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.getRootContextCalled.get());
            Assert.assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.stopCalled.get());
        } catch (Throwable failure) {
            // Detach the appender on any failure, then propagate the failure unchanged.
            quorumLogger.removeAppender(appender);
            throw failure;
        }
    }

    /**
     * Starts two peers, the first with {@code MetricsProviderWithConfiguration} and
     * {@code metricsProvider.httpPort=1234}, stops them again, and asserts that the
     * provider's {@code httpPort} holds 1234.
     */
    @Test
    public void testMetricsProviderConfiguration() throws Exception {
        ClientBase.setupTestEnv();
        // Reset the captured value so a stale 1234 cannot satisfy the assertion.
        BaseTestMetricsProvider.MetricsProviderWithConfiguration.httpPort.set(0);
        WriterAppender appender = getConsoleAppender(new ByteArrayOutputStream(), Level.WARN);
        Logger quorumLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            int clientPort1 = PortAssignment.unique();
            int clientPort2 = PortAssignment.unique();
            String quorumCfg = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort1
                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort2 + "\n";
            String providerCfg = quorumCfg + "metricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithConfiguration.class.getName() + "\nmetricsProvider.httpPort=1234";
            String address1 = "127.0.0.1:" + clientPort1;
            String address2 = "127.0.0.1:" + clientPort2;

            QuorumPeerTestBase.MainThread peer1 = new QuorumPeerTestBase.MainThread(1, clientPort1, providerCfg);
            QuorumPeerTestBase.MainThread peer2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfg);
            peer1.start();
            peer2.start();

            // Wait for both servers before asserting on either of them.
            boolean up1 = ClientBase.waitForServerUp(address1, 30000L);
            boolean up2 = ClientBase.waitForServerUp(address2, 30000L);
            Assert.assertTrue("Server 1 never came up", up1);
            Assert.assertTrue("Server 2 never came up", up2);

            peer1.shutdown();
            peer2.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(address1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown(address2, ClientBase.CONNECTION_TIMEOUT));
            quorumLogger.removeAppender(appender);

            Assert.assertEquals(1234L, BaseTestMetricsProvider.MetricsProviderWithConfiguration.httpPort.get());
        } catch (Throwable failure) {
            // Detach the appender on any failure, then propagate the failure unchanged.
            quorumLogger.removeAppender(appender);
            throw failure;
        }
    }

    /**
     * Starts two peers, the first with {@code MetricsProviderWithErrorInStop},
     * stops them again, and asserts that the provider's stop was called and that the
     * captured WARN log contains a line matching
     * {@code .*Error while stopping metrics.*}.
     */
    @Test
    public void testFaultyMetricsProviderOnStop() throws Exception {
        ClientBase.setupTestEnv();
        BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.reset();
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.WARN);
        Logger quorumLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            int clientPort1 = PortAssignment.unique();
            int clientPort2 = PortAssignment.unique();
            String quorumCfg = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort1
                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort2 + "\n";
            String providerCfg = quorumCfg + "metricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithErrorInStop.class.getName() + "\n";
            String address1 = "127.0.0.1:" + clientPort1;
            String address2 = "127.0.0.1:" + clientPort2;

            QuorumPeerTestBase.MainThread peer1 = new QuorumPeerTestBase.MainThread(1, clientPort1, providerCfg);
            QuorumPeerTestBase.MainThread peer2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfg);
            peer1.start();
            peer2.start();

            // Wait for both servers before asserting on either of them.
            boolean up1 = ClientBase.waitForServerUp(address1, 30000L);
            boolean up2 = ClientBase.waitForServerUp(address2, 30000L);
            Assert.assertTrue("Server 1 never came up", up1);
            Assert.assertTrue("Server 2 never came up", up2);

            peer1.shutdown();
            peer2.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(address1, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertTrue("waiting for server 2 down", ClientBase.waitForServerDown(address2, ClientBase.CONNECTION_TIMEOUT));
            quorumLogger.removeAppender(appender);

            Assert.assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderWithErrorInStop.stopCalled.get());

            // Scan the captured log line by line, stopping at the first match.
            LineNumberReader logReader = new LineNumberReader(new StringReader(capturedLog.toString()));
            Pattern complaint = Pattern.compile(".*Error while stopping metrics.*");
            boolean complained = false;
            String line;
            while (!complained && (line = logReader.readLine()) != null) {
                complained = complaint.matcher(line).matches();
            }
            Assert.assertTrue("complains about metrics provider", complained);
        } catch (Throwable failure) {
            // Detach the appender on any failure, then propagate the failure unchanged.
            quorumLogger.removeAppender(appender);
            throw failure;
        }
    }

    /**
     * Starts a single peer configured with the non-existent metrics provider class
     * {@code BadClass}, asserts that it does not come up within 5000 ms, and that
     * the captured WARN log contains a line matching {@code .*BadClass.*}.
     */
    @Test
    public void testInvalidMetricsProvider() throws Exception {
        ClientBase.setupTestEnv();
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.WARN);
        Logger quorumLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            int clientPort = PortAssignment.unique();
            // Both server lines reuse the same client port, as in the original configuration.
            String cfg = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort
                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort
                    + "\nmetricsProvider.className=BadClass\n";
            String address = "127.0.0.1:" + clientPort;

            QuorumPeerTestBase.MainThread peer = new QuorumPeerTestBase.MainThread(1, clientPort, cfg);
            peer.start();
            Assert.assertFalse("Server never came up", ClientBase.waitForServerUp(address, 5000L));
            peer.shutdown();
            Assert.assertTrue("waiting for server 1 down", ClientBase.waitForServerDown(address, ClientBase.CONNECTION_TIMEOUT));
            quorumLogger.removeAppender(appender);

            // Scan the captured log line by line, stopping at the first match.
            LineNumberReader logReader = new LineNumberReader(new StringReader(capturedLog.toString()));
            Pattern complaint = Pattern.compile(".*BadClass.*");
            boolean complained = false;
            String line;
            while (!complained && (line = logReader.readLine()) != null) {
                complained = complaint.matcher(line).matches();
            }
            Assert.assertTrue("complains about metrics provider", complained);
        } catch (Throwable failure) {
            // Detach the appender on any failure, then propagate the failure unchanged.
            quorumLogger.removeAppender(appender);
            throw failure;
        }
    }

    /**
     * Starts a quorum peer configured with
     * {@link BaseTestMetricsProvider.MetricsProviderWithErrorInStart} as its
     * metrics provider, asserts that the server does not answer on its client
     * port within 5 seconds, and then checks that the log output captured from
     * the quorum package contains a line mentioning
     * {@code MetricsProviderLifeCycleException}.
     *
     * @throws Exception if starting or stopping the peer, or reading the
     *                   captured log, fails
     */
    @Test
    public void testFaultyMetricsProviderOnStart() throws Exception {
        ClientBase.setupTestEnv();

        // Capture what the quorum package logs while the peer tries to start.
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.WARN);
        Logger quorumLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            int clientPort = PortAssignment.unique();
            String quorumCfg = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort
                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort
                    + "\nmetricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithErrorInStart.class.getName() + "\n";
            QuorumPeerTestBase.MainThread peer = new QuorumPeerTestBase.MainThread(1, clientPort, quorumCfg);
            peer.start();
            try {
                Assert.assertFalse("Server came up despite metrics provider failing on start",
                        ClientBase.waitForServerUp("127.0.0.1:" + clientPort, 5000L));
            } finally {
                // Always stop the peer, even when the assertion above fails,
                // so a server that unexpectedly started is not left running.
                peer.shutdown();
            }
            Assert.assertTrue("waiting for server 1 down",
                    ClientBase.waitForServerDown("127.0.0.1:" + clientPort, ClientBase.CONNECTION_TIMEOUT));
        } finally {
            // Detach the appender on every path, success or failure.
            quorumLogger.removeAppender(appender);
        }

        // Scan the captured log line by line for the expected exception name.
        LineNumberReader reader = new LineNumberReader(new StringReader(capturedLog.toString()));
        Pattern expected = Pattern.compile(".*MetricsProviderLifeCycleException.*");
        boolean found = false;
        String line;
        while (!found && (line = reader.readLine()) != null) {
            found = expected.matcher(line).matches();
        }
        Assert.assertTrue("complains about metrics provider MetricsProviderLifeCycleException", found);
    }

    /**
     * Starts a quorum peer configured with
     * {@link BaseTestMetricsProvider.MetricsProviderWithErrorInConfigure} as
     * its metrics provider, asserts that the server does not answer on its
     * client port within 5 seconds, and then checks that the log output
     * captured from the quorum package contains a line mentioning
     * {@code MetricsProviderLifeCycleException}.
     *
     * @throws Exception if starting or stopping the peer, or reading the
     *                   captured log, fails
     */
    @Test
    public void testFaultyMetricsProviderOnConfigure() throws Exception {
        ClientBase.setupTestEnv();

        // Capture what the quorum package logs while the peer tries to start.
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = getConsoleAppender(capturedLog, Level.WARN);
        Logger quorumLogger = Logger.getLogger("org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum");
        quorumLogger.addAppender(appender);
        try {
            int clientPort = PortAssignment.unique();
            String quorumCfg = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort
                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPort
                    + "\nmetricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithErrorInConfigure.class.getName() + "\n";
            QuorumPeerTestBase.MainThread peer = new QuorumPeerTestBase.MainThread(1, clientPort, quorumCfg);
            peer.start();
            try {
                Assert.assertFalse("Server came up despite metrics provider failing on configure",
                        ClientBase.waitForServerUp("127.0.0.1:" + clientPort, 5000L));
            } finally {
                // Always stop the peer, even when the assertion above fails,
                // so a server that unexpectedly started is not left running.
                peer.shutdown();
            }
            Assert.assertTrue("waiting for server 1 down",
                    ClientBase.waitForServerDown("127.0.0.1:" + clientPort, ClientBase.CONNECTION_TIMEOUT));
        } finally {
            // Detach the appender on every path, success or failure.
            quorumLogger.removeAppender(appender);
        }

        // Scan the captured log line by line for the expected exception name.
        LineNumberReader reader = new LineNumberReader(new StringReader(capturedLog.toString()));
        Pattern expected = Pattern.compile(".*MetricsProviderLifeCycleException.*");
        boolean found = false;
        String line;
        while (!found && (line = reader.readLine()) != null) {
            found = expected.matcher(line).matches();
        }
        Assert.assertTrue("complains about metrics provider MetricsProviderLifeCycleException", found);
    }
}
