package org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Random;
import org.apache.pulsar.functions.runtime.shaded.org.apache.jute.InputArchive;
import org.apache.pulsar.functions.runtime.shaded.org.apache.jute.OutputArchive;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.MockPacket;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZKParameterized;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.proto.ConnectRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.proto.ReplyHeader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.proto.RequestHeader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.proto.SetWatches;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.MockNIOServerCnxn;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.MockSelectorThread;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.ZKDatabase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Before;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.apache.pulsar.functions.runtime.shaded.org.junit.runner.RunWith;
import org.apache.pulsar.functions.runtime.shaded.org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameterized.UseParametersRunnerFactory(ZKParameterized.RunnerFactory.class)
@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/WatchLeakTest.class */
public class WatchLeakTest {
    protected static final Logger LOG = LoggerFactory.getLogger((Class<?>) WatchLeakTest.class);
    final long SESSION_ID = 47806;
    private final boolean sessionTimedout;
    private static final long superSecret = 3007405056L;

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/WatchLeakTest$FakeSK.class */
    private static class FakeSK extends SelectionKey {
        private int ops;

        private FakeSK() {
            this.ops = 5;
        }

        @Override // java.nio.channels.SelectionKey
        public SelectableChannel channel() {
            return null;
        }

        @Override // java.nio.channels.SelectionKey
        public Selector selector() {
            return (Selector) Mockito.mock(Selector.class);
        }

        @Override // java.nio.channels.SelectionKey
        public boolean isValid() {
            return true;
        }

        @Override // java.nio.channels.SelectionKey
        public void cancel() {
        }

        @Override // java.nio.channels.SelectionKey
        public int interestOps() {
            return this.ops;
        }

        @Override // java.nio.channels.SelectionKey
        public SelectionKey interestOps(int i) {
            this.ops = i;
            return this;
        }

        @Override // java.nio.channels.SelectionKey
        public int readyOps() {
            boolean z = (this.ops & 1) != 0;
            boolean z2 = (this.ops & 4) != 0;
            if (z && z2) {
                WatchLeakTest.LOG.info("Channel is ready for reading and writing");
            } else if (z) {
                WatchLeakTest.LOG.info("Channel is ready for reading only");
            } else if (z2) {
                WatchLeakTest.LOG.info("Channel is ready for writing only");
            }
            return this.ops;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/quorum/WatchLeakTest$MyFollower.class */
    public static class MyFollower extends Follower {
        MyFollower(QuorumPeer quorumPeer, FollowerZooKeeperServer followerZooKeeperServer) {
            super(quorumPeer, followerZooKeeperServer);
            this.leaderOs = (OutputArchive) Mockito.mock(OutputArchive.class);
            this.leaderIs = (InputArchive) Mockito.mock(InputArchive.class);
            this.bufferedOutput = (BufferedOutputStream) Mockito.mock(BufferedOutputStream.class);
        }
    }

    @Before
    public void setUp() {
        System.setProperty("zookeeper.admin.enableServer", "false");
    }

    public WatchLeakTest(boolean z) {
        this.sessionTimedout = z;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> configs() {
        return Arrays.asList(new Object[]{false}, new Object[]{true});
    }

    @Test
    public void testWatchesLeak() throws Exception {
        NIOServerCnxnFactory nIOServerCnxnFactory = (NIOServerCnxnFactory) Mockito.mock(NIOServerCnxnFactory.class);
        SelectionKey fakeSK = new FakeSK();
        MockSelectorThread mockSelectorThread = (MockSelectorThread) Mockito.mock(MockSelectorThread.class);
        Mockito.when(Boolean.valueOf(mockSelectorThread.addInterestOpsUpdateRequest((SelectionKey) ArgumentMatchers.any(SelectionKey.class)))).thenAnswer(new Answer<Boolean>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.WatchLeakTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Boolean m10517answer(InvocationOnMock invocationOnMock) throws Throwable {
                SelectionKey selectionKey = (SelectionKey) invocationOnMock.getArguments()[0];
                selectionKey.interestOps(((NIOServerCnxn) selectionKey.attachment()).getInterestOps());
                return true;
            }
        });
        ZKDatabase zKDatabase = new ZKDatabase(null);
        zKDatabase.setlastProcessedZxid(2L);
        QuorumPeer quorumPeer = (QuorumPeer) Mockito.mock(QuorumPeer.class);
        FileTxnSnapLog fileTxnSnapLog = (FileTxnSnapLog) Mockito.mock(FileTxnSnapLog.class);
        Mockito.when(fileTxnSnapLog.getDataDir()).thenReturn(new File(""));
        Mockito.when(fileTxnSnapLog.getSnapDir()).thenReturn(new File(""));
        ZooKeeperServer zooKeeperServer = null;
        try {
            FollowerZooKeeperServer followerZooKeeperServer = new FollowerZooKeeperServer(fileTxnSnapLog, quorumPeer, zKDatabase);
            followerZooKeeperServer.startup();
            followerZooKeeperServer.setServerCnxnFactory(nIOServerCnxnFactory);
            quorumPeer.follower = new MyFollower(quorumPeer, followerZooKeeperServer);
            LOG.info("Follower created");
            MockNIOServerCnxn mockNIOServerCnxn = new MockNIOServerCnxn(followerZooKeeperServer, createClientSocketChannel(), fakeSK, nIOServerCnxnFactory, mockSelectorThread);
            fakeSK.attach(mockNIOServerCnxn);
            mockNIOServerCnxn.doIO(fakeSK);
            LOG.info("Client connection sent");
            quorumPeer.follower.processPacket(createValidateSessionPacketResponse(!this.sessionTimedout));
            LOG.info("Session validation sent");
            mockNIOServerCnxn.doIO(fakeSK);
            Thread.sleep(1000L);
            LOG.info("Watches processed");
            int watchCount = zKDatabase.getDataTree().getWatchCount();
            if (this.sessionTimedout) {
                LOG.info("session is not valid, watches = {}", Integer.valueOf(watchCount));
                Assert.assertEquals("Session is not valid so there should be no watches", 0L, watchCount);
            } else {
                LOG.info("session is valid, watches = {}", Integer.valueOf(watchCount));
                Assert.assertEquals("Session is valid so the watch should be there", 1L, watchCount);
            }
            if (followerZooKeeperServer != null) {
                followerZooKeeperServer.shutdown();
            }
        } catch (Throwable th) {
            if (0 != 0) {
                zooKeeperServer.shutdown();
            }
            throw th;
        }
    }

    private ByteBuffer createWatchesMessage() {
        ArrayList arrayList = new ArrayList(1);
        arrayList.add("/");
        SetWatches setWatches = new SetWatches(1L, arrayList, Collections.emptyList(), Collections.emptyList());
        RequestHeader requestHeader = new RequestHeader();
        requestHeader.setType(101);
        requestHeader.setXid(-8);
        return new MockPacket(requestHeader, new ReplyHeader(), setWatches, null, null).createAndReturnBB();
    }

    private ByteBuffer createConnRequest() {
        byte[] bArr = new byte[16];
        new Random(3007440574L).nextBytes(bArr);
        return new MockPacket(null, null, new ConnectRequest(0, 1L, 30000, 47806L, bArr), null, null, false).createAndReturnBB();
    }

    private SocketChannel createClientSocketChannel() throws IOException {
        SocketChannel socketChannel = (SocketChannel) Mockito.mock(SocketChannel.class);
        Socket socket = (Socket) Mockito.mock(Socket.class);
        Mockito.when(socket.getRemoteSocketAddress()).thenReturn(new InetSocketAddress(1234));
        Mockito.when(socketChannel.socket()).thenReturn(socket);
        ByteBuffer createConnRequest = createConnRequest();
        ByteBuffer createWatchesMessage = createWatchesMessage();
        final ByteBuffer allocate = ByteBuffer.allocate(createConnRequest.limit() + createWatchesMessage.limit());
        allocate.put(createConnRequest);
        allocate.put(createWatchesMessage);
        Mockito.when(Integer.valueOf(socketChannel.read((ByteBuffer) ArgumentMatchers.any(ByteBuffer.class)))).thenAnswer(new Answer<Integer>() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.WatchLeakTest.2
            int i = 0;

            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Integer m10518answer(InvocationOnMock invocationOnMock) throws Throwable {
                ByteBuffer byteBuffer = (ByteBuffer) invocationOnMock.getArguments()[0];
                for (int i = 0; i < byteBuffer.limit(); i++) {
                    byteBuffer.put(allocate.get(this.i));
                    this.i++;
                }
                return Integer.valueOf(byteBuffer.limit());
            }
        });
        return socketChannel;
    }

    private QuorumPacket createValidateSessionPacketResponse(boolean z) throws Exception {
        QuorumPacket createValidateSessionPacket = createValidateSessionPacket();
        long readLong = new DataInputStream(new ByteArrayInputStream(createValidateSessionPacket.getData())).readLong();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        dataOutputStream.writeLong(readLong);
        dataOutputStream.writeBoolean(z);
        createValidateSessionPacket.setData(byteArrayOutputStream.toByteArray());
        return createValidateSessionPacket;
    }

    private QuorumPacket createValidateSessionPacket() throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        dataOutputStream.writeLong(47806L);
        dataOutputStream.writeInt(3000);
        dataOutputStream.close();
        return new QuorumPacket(6, -1L, byteArrayOutputStream.toByteArray(), null);
    }
}
