package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.LongVersion;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.DistributedLogManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.metadata.LogMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.DLUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Rule;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.apache.pulsar.functions.runtime.shaded.org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/TestLogSegmentsZK.class */
/**
 * Tests covering how log segment creation and completion interact with the
 * max log segment sequence number stored in ZooKeeper at the stream's log
 * segments path.
 *
 * <p>Each test builds its own namespace under a URI derived from the test
 * method name, so the tests do not share stream state.
 */
public class TestLogSegmentsZK extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestLogSegmentsZK.class);

    /** Number of log segments each test writes before inspecting the metadata. */
    private static final int NUM_SEGMENTS = 3;

    @Rule
    public final TestName testName = new TestName();

    /**
     * Reads the max log segment sequence number recorded for a stream.
     *
     * @param zooKeeperClient client used to read the znode
     * @param uri namespace URI the stream lives under
     * @param str stream name
     * @param distributedLogConfiguration configuration supplying the unpartitioned stream name
     * @return the stored value, versioned with the znode's data version
     * @throws Exception if the znode cannot be read
     */
    private static MaxLogSegmentSequenceNo getMaxLogSegmentSequenceNo(ZooKeeperClient zooKeeperClient, URI uri, String str, DistributedLogConfiguration distributedLogConfiguration) throws Exception {
        String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, str, distributedLogConfiguration.getUnpartitionedStreamName());
        // getData() fills in this Stat, so the version must be read from it afterwards.
        Stat stat = new Stat();
        byte[] data = zooKeeperClient.get().getData(logSegmentsPath, false, stat);
        return new MaxLogSegmentSequenceNo(new Versioned<byte[]>(data, new LongVersion(stat.getVersion())));
    }

    /**
     * Overwrites the data stored at a stream's log segments path.
     *
     * @param zooKeeperClient client used to write the znode
     * @param uri namespace URI the stream lives under
     * @param str stream name
     * @param distributedLogConfiguration configuration supplying the unpartitioned stream name
     * @param bArr raw bytes to store
     * @throws Exception if the znode cannot be written
     */
    private static void updateMaxLogSegmentSequenceNo(ZooKeeperClient zooKeeperClient, URI uri, String str, DistributedLogConfiguration distributedLogConfiguration, byte[] bArr) throws Exception {
        String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, str, distributedLogConfiguration.getUnpartitionedStreamName());
        // -1 is passed as the expected version, so the write is not conditional on the current one.
        zooKeeperClient.get().setData(logSegmentsPath, bArr, -1);
    }

    /** Builds the namespace URI for the currently running test method. */
    private URI createURI() throws Exception {
        return createDLMURI("/" + this.testName.getMethodName());
    }

    /** Creates the configuration shared by every test in this class. */
    private static DistributedLogConfiguration newConf() {
        return new DistributedLogConfiguration()
                .setLockTimeout(99999L)
                .setOutputBufferSize(0)
                .setImmediateFlushEnabled(true)
                .setEnableLedgerAllocatorPool(true)
                .setLedgerAllocatorPoolName("test");
    }

    /** Writes {@code numSegments} log segments of one record each and completes every one. */
    private static void writeLogSegments(DistributedLogManager dlm, int numSegments) throws Exception {
        for (int i = 0; i < numSegments; i++) {
            BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            writer.write(DLMTestUtil.getLogRecordInstance(i));
            writer.closeAndComplete();
        }
    }

    /** Asserts the max log segment sequence number currently stored for the stream. */
    private void assertMaxLogSegmentSequenceNo(long expected, Namespace namespace, URI uri, String streamName, DistributedLogConfiguration conf) throws Exception {
        Assert.assertEquals(expected, getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf).getSequenceNumber());
    }

    /** Opens a fresh manager and asserts that starting a log segment throws {@link UnexpectedException}. */
    private static void assertStartLogSegmentFails(Namespace namespace, String streamName) throws Exception {
        DistributedLogManager dlm = namespace.openLog(streamName);
        try {
            dlm.startLogSegmentNonPartitioned();
            Assert.fail("Should fail with unexpected exceptions");
        } catch (UnexpectedException expected) {
            // Expected: the stored max sequence number could not be used.
        } finally {
            dlm.close();
        }
    }

    /** Best-effort close for cleanup paths; a failure is logged rather than thrown. */
    private static void closeQuietly(DistributedLogManager dlm) {
        try {
            dlm.close();
        } catch (Exception e) {
            // Broad catch is deliberate: cleanup must not replace the test's own outcome.
            LOG.warn("Failed to close log manager during test cleanup", e);
        }
    }

    /** The stored max sequence number starts at 0 and reaches 3 after three completed segments. */
    @Test(timeout = 60000)
    public void testCreateLogSegmentOnPrecreatedStream() throws Exception {
        URI uri = createURI();
        String streamName = this.testName.getMethodName();
        DistributedLogConfiguration conf = newConf();
        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
        try {
            namespace.createLog(streamName);
            assertMaxLogSegmentSequenceNo(0L, namespace, uri, streamName, conf);
            DistributedLogManager dlm = namespace.openLog(streamName);
            try {
                writeLogSegments(dlm, NUM_SEGMENTS);
                assertMaxLogSegmentSequenceNo(3L, namespace, uri, streamName, conf);
            } finally {
                dlm.close();
            }
        } finally {
            namespace.close();
        }
    }

    /** Starting a log segment must fail when the stored max sequence number is empty or unparseable. */
    @Test(timeout = 60000)
    public void testCreateLogSegmentMissingMaxSequenceNumber() throws Exception {
        URI uri = createURI();
        String streamName = this.testName.getMethodName();
        DistributedLogConfiguration conf = newConf();
        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
        try {
            namespace.createLog(streamName);
            assertMaxLogSegmentSequenceNo(0L, namespace, uri, streamName, conf);
            DistributedLogManager dlm = namespace.openLog(streamName);
            try {
                writeLogSegments(dlm, NUM_SEGMENTS);
                assertMaxLogSegmentSequenceNo(3L, namespace, uri, streamName, conf);

                // Case 1: the stored value is empty.
                updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, new byte[0]);
                assertStartLogSegmentFails(namespace, streamName);

                // Case 2: the stored value is not a serialized sequence number.
                updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, "invalid-max".getBytes(StandardCharsets.UTF_8));
                assertStartLogSegmentFails(namespace, streamName);
            } finally {
                dlm.close();
            }
        } finally {
            namespace.close();
        }
    }

    /**
     * Creating a new segment must fail with {@link DLIllegalStateException} when the stored max
     * sequence number (99) does not match the existing segments, and the three existing segments
     * must remain listed with sequence numbers 1, 2 and 3.
     */
    @Test(timeout = 60000)
    public void testCreateLogSegmentUnmatchMaxSequenceNumber() throws Exception {
        URI uri = createURI();
        String streamName = this.testName.getMethodName();
        DistributedLogConfiguration conf = newConf();
        Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
        try {
            namespace.createLog(streamName);
            assertMaxLogSegmentSequenceNo(0L, namespace, uri, streamName, conf);
            DistributedLogManager dlm = namespace.openLog(streamName);
            try {
                writeLogSegments(dlm, NUM_SEGMENTS);
                assertMaxLogSegmentSequenceNo(3L, namespace, uri, streamName, conf);
                updateMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf, DLUtils.serializeLogSegmentSequenceNumber(99L));

                DistributedLogManager writerDlm = namespace.openLog(streamName);
                try {
                    BKSyncLogWriter writer = (BKSyncLogWriter) writerDlm.startLogSegmentNonPartitioned();
                    writer.write(DLMTestUtil.getLogRecordInstance(4L));
                    writer.closeAndComplete();
                    Assert.fail("Should fail creating new log segment when encountered unmatch max ledger sequence number");
                } catch (DLIllegalStateException expected) {
                    // Expected: the stored max sequence number does not match the segments.
                } finally {
                    // Close on every path, including unexpected failures.
                    writerDlm.close();
                }

                DistributedLogManager readerDlm = namespace.openLog(streamName);
                try {
                    List<LogSegmentMetadata> logSegments = readerDlm.getLogSegments();
                    Assert.assertEquals(3L, logSegments.size());
                    Assert.assertEquals(1L, logSegments.get(0).getLogSegmentSequenceNumber());
                    Assert.assertEquals(2L, logSegments.get(1).getLogSegmentSequenceNumber());
                    Assert.assertEquals(3L, logSegments.get(2).getLogSegmentSequenceNumber());
                } finally {
                    // Closed exactly once, whether or not the assertions pass.
                    readerDlm.close();
                }
            } finally {
                dlm.close();
            }
        } finally {
            namespace.close();
        }
    }

    /**
     * Expects {@code closeAndComplete()} on the first manager's writer to fail with an
     * {@link IOException} while a second manager is open on the same stream.
     */
    @Test(timeout = 60000)
    public void testCompleteLogSegmentConflicts() throws Exception {
        URI uri = createURI();
        String streamName = this.testName.getMethodName();
        Namespace namespace = NamespaceBuilder.newBuilder().conf(newConf()).uri(uri).build();
        try {
            namespace.createLog(streamName);
            DistributedLogManager dlm1 = namespace.openLog(streamName);
            DistributedLogManager dlm2 = namespace.openLog(streamName);
            try {
                BKSyncLogWriter writer = (BKSyncLogWriter) dlm1.startLogSegmentNonPartitioned();
                writer.write(DLMTestUtil.getLogRecordInstance(1L));
                // NOTE(review): dlm2 is opened but nothing is invoked on it here, although the
                // failure message below expects a competing completion. Presumably a call that
                // makes dlm2 recover/complete the segment belongs at this point - TODO confirm
                // against the upstream test before relying on this case.
                try {
                    writer.closeAndComplete();
                    Assert.fail("Should fail closeAndComplete since other people already completed it.");
                } catch (IOException expected) {
                    // Expected: the completion is rejected.
                }
            } finally {
                // Best-effort: the failed completion may leave the managers unable to close cleanly.
                closeQuietly(dlm2);
                closeQuietly(dlm1);
            }
        } finally {
            namespace.close();
        }
    }
}
