package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.impl.federated;

import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Iterators;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Sets;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestDistributedLogBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.ZooKeeperClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.ZooKeeperClientUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.callback.NamespaceListener;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.LogExistsException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Utils;
import org.apache.pulsar.functions.runtime.shaded.org.junit.After;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Before;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Rule;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.apache.pulsar.functions.runtime.shaded.org.junit.rules.TestName;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.class */
public class TestFederatedZKLogMetadataStore extends TestDistributedLogBase {
    // ZooKeeper session timeout used by this test (presumably milliseconds, going by the name).
    private static final int zkSessionTimeoutMs = 2000;
    // Same value as the federated max-logs-per-subnamespace setting applied to baseConf below.
    private static final int maxLogsPerSubnamespace = 10;

    // JUnit rule exposing the running test method's name; setup() uses it to build a per-test URI.
    @Rule
    public TestName runtime = new TestName();
    // Base configuration; it is added to a fresh DistributedLogConfiguration whenever a store is built.
    protected final DistributedLogConfiguration baseConf = new DistributedLogConfiguration().setFederatedMaxLogsPerSubnamespace(10);
    // ZooKeeper client built in setup() and closed in teardown().
    protected ZooKeeperClient zkc;
    // Store under test; created in setup() against the per-test URI.
    protected FederatedZKLogMetadataStore metadataStore;
    // Scheduler handed to every store; built in setup() and shut down in teardown().
    protected OrderedScheduler scheduler;
    // Per-test namespace URI, derived from the test method name in setup().
    protected URI uri;

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore$TestNamespaceListener.class */
    /**
     * Namespace listener that remembers the iterator from the latest notification and
     * releases {@link #waitForDone()} once a notification with at least one stream arrives.
     */
    static class TestNamespaceListener implements NamespaceListener {
        final CountDownLatch doneLatch = new CountDownLatch(1);
        final AtomicReference<Iterator<String>> resultHolder = new AtomicReference<>();

        TestNamespaceListener() {
        }

        /**
         * Stores the received iterator; counts the latch down only when it is non-empty.
         *
         * @param it iterator over the stream names carried by the notification
         */
        @Override
        public void onStreamsChanged(Iterator<String> it) {
            resultHolder.set(it);
            if (!it.hasNext()) {
                // Empty notification: keep waiters blocked.
                return;
            }
            doneLatch.countDown();
        }

        /** @return the iterator stored by the most recent notification, or null if none arrived */
        Iterator<String> getResult() {
            return resultHolder.get();
        }

        /** Blocks until a non-empty notification has been observed. */
        void waitForDone() throws InterruptedException {
            doneLatch.await();
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore$TestNamespaceListenerWithExpectedSize.class */
    /**
     * Namespace listener that ignores notifications until one carries at least
     * {@code expectedSize} streams, then stores them as a sorted set and releases waiters.
     */
    static class TestNamespaceListenerWithExpectedSize implements NamespaceListener {
        final int expectedSize;
        final CountDownLatch doneLatch = new CountDownLatch(1);
        final AtomicReference<Set<String>> resultHolder = new AtomicReference<>();

        /**
         * @param i minimum number of streams a notification must carry to be accepted
         */
        TestNamespaceListenerWithExpectedSize(int i) {
            this.expectedSize = i;
        }

        /** @return the sorted set of streams from the accepted notification, or null if none yet */
        Set<String> getResult() {
            return resultHolder.get();
        }

        /**
         * Drains the iterator and, when enough streams were delivered, publishes them and
         * counts the latch down.
         *
         * @param it iterator over the stream names carried by the notification
         */
        @Override
        public void onStreamsChanged(Iterator<String> it) {
            ArrayList<String> streams = Lists.newArrayList(it);
            if (streams.size() >= expectedSize) {
                resultHolder.set(Sets.newTreeSet(streams));
                doneLatch.countDown();
            }
        }

        /** Blocks until a notification with at least {@code expectedSize} streams was accepted. */
        void waitForDone() throws InterruptedException {
            doneLatch.await();
        }
    }

    /**
     * Builds the ZooKeeper client and scheduler, creates a federated namespace at a URI
     * named after the running test method, and instantiates the store under test on it.
     */
    @Override
    @Before
    public void setup() throws Exception {
        this.zkc = TestZooKeeperClientBuilder.newBuilder()
                .uri(createDLMURI("/"))
                .sessionTimeoutMs(zkSessionTimeoutMs)
                .build();
        this.scheduler = OrderedScheduler.newSchedulerBuilder()
                .name("test-zk-logmetadata-store")
                .numThreads(2)
                .build();

        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.addConfiguration(this.baseConf);

        // One namespace per test method keeps the tests isolated from each other.
        String namespacePath = "/" + this.runtime.getMethodName();
        this.uri = createDLMURI(namespacePath);
        FederatedZKLogMetadataStore.createFederatedNamespace(this.uri, this.zkc);
        this.metadataStore = new FederatedZKLogMetadataStore(conf, this.uri, this.zkc, this.scheduler);
    }

    /**
     * Releases the resources created in {@code setup()}: closes the ZooKeeper client and
     * shuts the scheduler down.
     *
     * <p>The scheduler shutdown sits in a {@code finally} block so that a failure while
     * closing the ZooKeeper client does not leave the scheduler running.
     *
     * @throws Exception if closing the ZooKeeper client or shutting down the scheduler fails
     */
    @Override
    @After
    public void teardown() throws Exception {
        try {
            if (null != this.zkc) {
                this.zkc.close();
            }
        } finally {
            if (null != this.scheduler) {
                this.scheduler.shutdown();
            }
        }
    }

    /**
     * Deletes the ZooKeeper node of a log by looking up the namespace URI it lives in and
     * removing {@code <namespace path>/<log name>}.
     *
     * @param str name of the log to delete; the log must have a known location
     * @throws Exception if the lookup or the ZooKeeper delete fails
     */
    private void deleteLog(String str) throws Exception {
        Optional<?> location = (Optional<?>) Utils.ioResult(this.metadataStore.getLogLocation(str));
        Assert.assertTrue(location.isPresent());
        URI namespaceUri = (URI) location.get();
        String logPath = namespaceUri.getPath() + "/" + str;
        // -1 is passed as the version argument (in the ZooKeeper API this matches any version).
        this.zkc.get().delete(logPath, -1);
    }

    /**
     * Creates a single log and checks its location, the lookup of an unknown log,
     * the listener notification and the log listing.
     */
    @Test(timeout = 60000)
    public void testBasicOperations() throws Exception {
        final String logName = "test-log-1";
        TestNamespaceListener listener = new TestNamespaceListener();
        this.metadataStore.registerNamespaceListener(listener);

        // The first log is expected to land in the root namespace URI.
        URI createdUri = (URI) Utils.ioResult(this.metadataStore.createLog(logName));
        Assert.assertEquals(this.uri, createdUri);

        Optional<?> location = (Optional<?>) Utils.ioResult(this.metadataStore.getLogLocation(logName));
        Assert.assertTrue(location.isPresent());
        Assert.assertEquals(this.uri, location.get());

        Optional<?> missing = (Optional<?>) Utils.ioResult(this.metadataStore.getLogLocation("non-existent-log"));
        Assert.assertFalse(missing.isPresent());

        // The listener must have been told about exactly the one log.
        listener.waitForDone();
        Iterator<String> notified = listener.getResult();
        Assert.assertTrue(notified.hasNext());
        Assert.assertEquals(logName, notified.next());
        Assert.assertFalse(notified.hasNext());

        // The listing must contain exactly the one log as well.
        Iterator<?> listed = (Iterator<?>) Utils.ioResult(this.metadataStore.getLogs(""));
        Assert.assertTrue(listed.hasNext());
        Assert.assertEquals(logName, listed.next());
        Assert.assertFalse(listed.hasNext());
    }

    /** Registers two listeners and checks that both are notified with equal stream sequences. */
    @Test(timeout = 60000)
    public void testMultipleListeners() throws Exception {
        TestNamespaceListener first = new TestNamespaceListener();
        TestNamespaceListener second = new TestNamespaceListener();
        this.metadataStore.registerNamespaceListener(first);
        this.metadataStore.registerNamespaceListener(second);

        URI createdUri = (URI) Utils.ioResult(this.metadataStore.createLog("test-multiple-listeners"));
        Assert.assertEquals(this.uri, createdUri);

        first.waitForDone();
        second.waitForDone();
        boolean sameStreams = Iterators.elementsEqual(first.getResult(), second.getResult());
        Assert.assertTrue(sameStreams);
    }

    /**
     * Creates logs alternately through two stores that share the same namespace and checks
     * that each log created by one store is located at the same URI by the other store.
     * Afterwards both stores must report two sub namespaces.
     *
     * <p>The second store uses its own ZooKeeper client, which is closed in a
     * {@code finally} block so the test does not leak a ZooKeeper session.
     */
    @Test(timeout = 60000)
    public void testCreateLog() throws Exception {
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.addConfiguration(this.baseConf);

        ZooKeeperClient anotherZkc = TestZooKeeperClientBuilder.newBuilder()
                .uri(this.uri)
                .sessionTimeoutMs(zkSessionTimeoutMs)
                .build();
        try {
            FederatedZKLogMetadataStore anotherMetadataStore =
                    new FederatedZKLogMetadataStore(conf, this.uri, anotherZkc, this.scheduler);

            final int numLogs = 2 * maxLogsPerSubnamespace;
            for (int i = 0; i < numLogs; i++) {
                // Alternate which store creates the log and which one looks it up.
                boolean even = (i % 2 == 0);
                FederatedZKLogMetadataStore createStore = even ? this.metadataStore : anotherMetadataStore;
                FederatedZKLogMetadataStore getStore = even ? anotherMetadataStore : this.metadataStore;

                String logName = "test-create-log-" + i;
                URI logUri = (URI) Utils.ioResult(createStore.createLog(logName));
                Optional<?> logLocation = (Optional<?>) Utils.ioResult(getStore.getLogLocation(logName));
                Assert.assertTrue("Log " + logName + " doesn't exist", logLocation.isPresent());
                Assert.assertEquals("Different log location " + logLocation.get() + " is found",
                        logUri, logLocation.get());
            }

            Assert.assertEquals(2L, this.metadataStore.getSubnamespaces().size());
            Assert.assertEquals(2L, anotherMetadataStore.getSubnamespaces().size());
        } finally {
            anotherZkc.close();
        }
    }

    /**
     * Places the same log name in two sub namespaces and checks that every subsequent
     * store operation fails with {@link UnexpectedException} while the store's
     * {@code duplicatedLogFound} flag is set.
     */
    @Test(timeout = 60000)
    public void testDuplicatedLogs() throws Exception {
        final String failMessage = "should throw exception when duplicated log found";
        final String existingLog = "test-log";
        final String duplicatedLog = "test-duplicated-logs";
        final String missingLog = "non-existent-log";

        // This configuration object is not used afterwards; the statement is kept as in the original.
        DistributedLogConfiguration unusedConf = new DistributedLogConfiguration();
        unusedConf.addConfiguration(this.baseConf);

        Utils.ioResult(this.metadataStore.createLog(existingLog));

        // Put the same log name into two different sub namespaces.
        URI subNs1 = (URI) Utils.ioResult(this.metadataStore.createSubNamespace());
        URI subNs2 = (URI) Utils.ioResult(this.metadataStore.createSubNamespace());
        this.metadataStore.createLogInNamespaceSync(subNs1, duplicatedLog);
        this.metadataStore.createLogInNamespaceSync(subNs2, duplicatedLog);

        // Creating a brand-new log must fail.
        try {
            Utils.ioResult(this.metadataStore.createLog(missingLog));
            Assert.fail(failMessage);
        } catch (UnexpectedException expected) {
            Assert.assertTrue(this.metadataStore.duplicatedLogFound.get());
        }

        // Looking up a log that exists exactly once must fail.
        try {
            Utils.ioResult(this.metadataStore.getLogLocation(existingLog));
            Assert.fail(failMessage);
        } catch (UnexpectedException expected) {
            Assert.assertTrue(this.metadataStore.duplicatedLogFound.get());
        }

        // Looking up a log that does not exist must fail.
        try {
            Utils.ioResult(this.metadataStore.getLogLocation(missingLog));
            Assert.fail(failMessage);
        } catch (UnexpectedException expected) {
            Assert.assertTrue(this.metadataStore.duplicatedLogFound.get());
        }

        // Looking up the duplicated log itself must fail.
        try {
            Utils.ioResult(this.metadataStore.getLogLocation(duplicatedLog));
            Assert.fail(failMessage);
        } catch (UnexpectedException expected) {
            Assert.assertTrue(this.metadataStore.duplicatedLogFound.get());
        }

        // Listing the logs must fail.
        try {
            Utils.ioResult(this.metadataStore.getLogs(""));
            Assert.fail(failMessage);
        } catch (UnexpectedException expected) {
            Assert.assertTrue(this.metadataStore.duplicatedLogFound.get());
        }
    }

    /**
     * Checks that a log's location is still resolved after the log has been removed
     * from the store's cache via {@code removeLogFromCache}.
     */
    @Test(timeout = 60000)
    public void testGetLogLocationWhenCacheMissed() throws Exception {
        final String logName = "test-get-location-when-cache-missed";

        URI createdUri = (URI) Utils.ioResult(this.metadataStore.createLog(logName));
        Assert.assertEquals(this.uri, createdUri);

        this.metadataStore.removeLogFromCache(logName);

        Optional<?> location = (Optional<?>) Utils.ioResult(this.metadataStore.getLogLocation(logName));
        Assert.assertTrue(location.isPresent());
        Assert.assertEquals(createdUri, location.get());
    }

    /**
     * Creating an existing log again must raise {@link LogExistsException} even after the
     * log has been removed from the store's cache via {@code removeLogFromCache}.
     */
    @Test(timeout = 60000, expected = LogExistsException.class)
    public void testCreateLogWhenCacheMissed() throws Exception {
        final String logName = "test-create-log-when-cache-missed";

        URI createdUri = (URI) Utils.ioResult(this.metadataStore.createLog(logName));
        Assert.assertEquals(this.uri, createdUri);

        this.metadataStore.removeLogFromCache(logName);
        // Second creation is the call expected to throw.
        Utils.ioResult(this.metadataStore.createLog(logName));
    }

    /** Creating the same log twice must raise {@link LogExistsException} on the second call. */
    @Test(timeout = 60000, expected = LogExistsException.class)
    public void testCreateLogWhenLogExists() throws Exception {
        final String logName = "test-create-log-when-log-exists";

        URI createdUri = (URI) Utils.ioResult(this.metadataStore.createLog(logName));
        Assert.assertEquals(this.uri, createdUri);

        // Second creation is the call expected to throw.
        Utils.ioResult(this.metadataStore.createLog(logName));
    }

    /**
     * Creates {@code i} logs through the store's {@code createLog}, named {@code str}
     * followed by an index starting at 0.
     *
     * @param i   number of logs to create
     * @param str prefix of every log name
     * @return sorted set of the created log names
     * @throws Exception if any creation fails
     */
    private Set<String> createLogs(int i, String str) throws Exception {
        Set<String> created = new TreeSet<>();
        int index = 0;
        while (index < i) {
            String logName = str + index;
            Utils.ioResult(this.metadataStore.createLog(logName));
            created.add(logName);
            index++;
        }
        return created;
    }

    /**
     * Creates {@code i} logs directly inside the given namespace via
     * {@code createLogInNamespaceSync}, named {@code str} followed by an index starting at 0.
     *
     * @param uri namespace URI to create the logs in
     * @param i   number of logs to create
     * @param str prefix of every log name
     * @return sorted set of the created log names
     * @throws Exception if any creation fails
     */
    private Set<String> createLogs(URI uri, int i, String str) throws Exception {
        Set<String> created = new TreeSet<>();
        int index = 0;
        while (index < i) {
            String logName = str + index;
            this.metadataStore.createLogInNamespaceSync(uri, logName);
            created.add(logName);
            index++;
        }
        return created;
    }

    /**
     * Creates 30 logs and polls {@code getLogs("")} every 20 ms until at least that many
     * are listed, then checks that every created log is in the listing.
     */
    @Test(timeout = 60000)
    public void testGetLogs() throws Exception {
        final int numLogs = 3 * maxLogsPerSubnamespace;
        Set<String> expectedLogs = createLogs(numLogs, "test-get-logs");

        Set<String> receivedLogs;
        while (true) {
            TimeUnit.MILLISECONDS.sleep(20L);
            Iterator<String> listed = (Iterator<String>) Utils.ioResult(this.metadataStore.getLogs(""));
            receivedLogs = Sets.newTreeSet(Lists.newArrayList(listed));
            if (receivedLogs.size() >= numLogs) {
                break;
            }
        }

        Assert.assertEquals(numLogs, receivedLogs.size());
        Assert.assertTrue(Sets.difference(expectedLogs, receivedLogs).isEmpty());
    }

    /**
     * Checks namespace notifications in two phases.
     *
     * <ol>
     *   <li>Create 30 logs and wait until a listener has seen all of them.</li>
     *   <li>Delete one randomly chosen log and check that a newly registered listener
     *       reports exactly the remaining 29 logs.</li>
     * </ol>
     *
     * <p>The final assertion compares the expected logs with the set reported
     * <em>after</em> the deletion. Comparing against the snapshot taken before the
     * deletion could never fail, because that snapshot is a superset of the expected logs.
     */
    @Test(timeout = 60000)
    public void testNamespaceListener() throws Exception {
        final int numLogs = 3 * maxLogsPerSubnamespace;
        final String prefix = "test-namespace-listener";

        // Phase 1: all created logs are reported.
        TestNamespaceListenerWithExpectedSize listener = new TestNamespaceListenerWithExpectedSize(numLogs);
        this.metadataStore.registerNamespaceListener(listener);
        Set<String> expectedLogs = createLogs(numLogs, prefix);
        listener.waitForDone();
        Set<String> receivedLogs = listener.getResult();
        Assert.assertEquals(numLogs, receivedLogs.size());
        Assert.assertTrue(Sets.difference(expectedLogs, receivedLogs).isEmpty());

        // Phase 2: delete one random log and observe the change through a fresh listener.
        String deletedLog = prefix + new Random(System.currentTimeMillis()).nextInt(numLogs);
        TestNamespaceListener deleteListener = new TestNamespaceListener();
        this.metadataStore.registerNamespaceListener(deleteListener);
        deleteLog(deletedLog);
        deleteListener.waitForDone();
        Set<String> logsAfterDeleted = Sets.newTreeSet(Lists.newArrayList(deleteListener.getResult()));
        Assert.assertEquals(numLogs - 1, logsAfterDeleted.size());
        expectedLogs.remove(deletedLog);
        Assert.assertEquals(expectedLogs, logsAfterDeleted);
    }

    /**
     * Fills three namespaces (the root with 9 logs, two sub namespaces with 10 each) and
     * checks that the next created log lands in the root namespace, while the one after
     * that lands in a URI that was not among the three existing ones.
     */
    @Test(timeout = 60000)
    public void testCreateLogPickingFirstAvailableSubNamespace() throws Exception {
        URI subNs1 = (URI) Utils.ioResult(this.metadataStore.createSubNamespace());
        URI subNs2 = (URI) Utils.ioResult(this.metadataStore.createSubNamespace());

        // Root namespace is left one log short of the limit; both sub namespaces are full.
        Set<String> logsInNs0 = createLogs(this.uri, maxLogsPerSubnamespace - 1, "test-ns0-");
        Set<String> logsInNs1 = createLogs(subNs1, maxLogsPerSubnamespace, "test-ns1-");
        Set<String> logsInNs2 = createLogs(subNs2, maxLogsPerSubnamespace, "test-ns2-");
        Set<String> allLogs = new TreeSet<>();
        allLogs.addAll(logsInNs0);
        allLogs.addAll(logsInNs1);
        allLogs.addAll(logsInNs2);

        // Poll until the store lists all 29 pre-created logs.
        final int preCreated = 3 * maxLogsPerSubnamespace - 1;
        Set<String> listedLogs;
        while (true) {
            TimeUnit.MILLISECONDS.sleep(20L);
            Iterator<String> listed = (Iterator<String>) Utils.ioResult(this.metadataStore.getLogs(""));
            listedLogs = Sets.newTreeSet(Lists.newArrayList(listed));
            if (listedLogs.size() >= preCreated) {
                break;
            }
        }

        TestNamespaceListenerWithExpectedSize listener = new TestNamespaceListenerWithExpectedSize(preCreated + 2);
        this.metadataStore.registerNamespaceListener(listener);

        Set<?> namespacesBefore = (Set<?>) Utils.ioResult(this.metadataStore.fetchSubNamespaces(null));
        Assert.assertEquals(3L, namespacesBefore.size());

        // The root namespace still has room, so the next log goes there.
        final String firstLog = "test-pick-first-available-ns";
        URI firstLogUri = (URI) Utils.ioResult(this.metadataStore.createLog(firstLog));
        allLogs.add(firstLog);
        Assert.assertEquals(this.uri, firstLogUri);
        Set<?> namespacesAfterFirst = (Set<?>) Utils.ioResult(this.metadataStore.fetchSubNamespaces(null));
        Assert.assertEquals(3L, namespacesAfterFirst.size());

        // Now every namespace is full; the next log must end up in a new one.
        final String secondLog = "test-create-new-ns";
        URI secondLogUri = (URI) Utils.ioResult(this.metadataStore.createLog(secondLog));
        allLogs.add(secondLog);
        Assert.assertFalse(namespacesAfterFirst.contains(secondLogUri));
        Set<?> namespacesAfterSecond = (Set<?>) Utils.ioResult(this.metadataStore.fetchSubNamespaces(null));
        Assert.assertEquals(4L, namespacesAfterSecond.size());

        listener.waitForDone();
        Set<String> receivedLogs = listener.getResult();
        Assert.assertEquals(31L, receivedLogs.size());
        Assert.assertEquals(allLogs, receivedLogs);
    }

    /**
     * Expires the ZooKeeper session of the store under test, creates one more log through
     * a second store, and checks that the original store's listener still ends up
     * reporting all 21 logs.
     *
     * <p>The second store uses its own ZooKeeper client, which is closed in a
     * {@code finally} block so the test does not leak a ZooKeeper session.
     */
    @Test(timeout = 60000)
    public void testZooKeeperSessionExpired() throws Exception {
        final int numLogs = 2 * maxLogsPerSubnamespace;
        Set<String> allLogs = createLogs(numLogs, "test-zookeeper-session-expired-");
        TestNamespaceListenerWithExpectedSize listener = new TestNamespaceListenerWithExpectedSize(numLogs + 1);
        this.metadataStore.registerNamespaceListener(listener);

        ZooKeeperClientUtils.expireSession(this.zkc, BKNamespaceDriver.getZKServersFromDLUri(this.uri), zkSessionTimeoutMs);

        final String testLogName = "test-log-name";
        allLogs.add(testLogName);

        DistributedLogConfiguration anotherConf = new DistributedLogConfiguration();
        anotherConf.addConfiguration(this.baseConf);
        ZooKeeperClient anotherZkc = TestZooKeeperClientBuilder.newBuilder()
                .uri(this.uri)
                .sessionTimeoutMs(zkSessionTimeoutMs)
                .build();
        try {
            FederatedZKLogMetadataStore anotherMetadataStore =
                    new FederatedZKLogMetadataStore(anotherConf, this.uri, anotherZkc, this.scheduler);
            Utils.ioResult(anotherMetadataStore.createLog(testLogName));

            // The listener is registered on the store whose session was expired.
            listener.waitForDone();
            Set<String> receivedLogs = listener.getResult();
            Assert.assertEquals(numLogs + 1, receivedLogs.size());
            Assert.assertEquals(allLogs, receivedLogs);
        } finally {
            anotherZkc.close();
        }
    }
}
