package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.LogWriter;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.io.AsyncCloseable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.logsegment.LogSegmentFilter;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Utils;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Rule;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.apache.pulsar.functions.runtime.shaded.org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/TestBKLogReadHandler.class */
/**
 * Tests for {@link BKLogReadHandler}: looking up the first log record, counting log records
 * from a given {@link DLSN}, and locking a stream with and without subscriber ids.
 *
 * <p>Every test uses its own method name (via the {@link TestName} rule) as the log name, so
 * tests do not share log state with each other.
 */
public class TestBKLogReadHandler extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestBKLogReadHandler.class);

    @Rule
    public TestName runtime = new TestName();

    /**
     * Writes {@code numSegments} log segments of {@code numEntriesPerSegment} large records each
     * to the log called {@code name}. Transaction ids start at 1 and increase by one per record
     * across all segments.
     *
     * @param name log to write to
     * @param numSegments number of log segments to write
     * @param numEntriesPerSegment number of records written into each segment
     * @throws Exception if creating the manager, writing, or closing fails
     */
    private void prepareLogSegmentsNonPartitioned(String name, int numSegments, int numEntriesPerSegment)
            throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, name);
        try {
            long txid = 1L;
            for (int segment = 0; segment < numSegments; segment++) {
                LogWriter writer = dlm.startLogSegmentNonPartitioned();
                try {
                    for (int entry = 0; entry < numEntriesPerSegment; entry++) {
                        writer.write(DLMTestUtil.getLargeLogRecordInstance(txid));
                        txid++;
                    }
                } finally {
                    // Each writer is closed before the next segment is started.
                    writer.close();
                }
            }
        } finally {
            dlm.close();
        }
    }

    /**
     * Waits for and returns the record count reported by {@code handler} starting at {@code from}.
     *
     * @param handler read handler to query
     * @param from position to start counting from
     * @return the count the handler reports
     * @throws Exception whatever the underlying future failed with
     */
    private static long recordCountFrom(BKLogReadHandler handler, DLSN from) throws Exception {
        return Utils.ioResult(handler.asyncGetLogRecordCount(from));
    }

    /**
     * Starts closing {@code handler} (without waiting for the close to finish) and then closes
     * {@code dlm}. Either argument may be {@code null}, in which case it is skipped, so this can
     * be called from a {@code finally} block before everything was created.
     *
     * @param handler read handler to close, or {@code null}
     * @param dlm manager to close, or {@code null}
     * @throws Exception if closing the manager fails
     */
    private static void release(BKLogReadHandler handler, BKDistributedLogManager dlm) throws Exception {
        if (handler != null) {
            handler.asyncClose();
        }
        if (dlm != null) {
            dlm.close();
        }
    }

    /**
     * Shared body of the truncation tests: writes 3 segments of 10 records, truncates the log at
     * {@code truncateAt}, and asserts the DLSN of the first record seen by a read handler that
     * was created before the truncation.
     *
     * @param truncateAt position passed to the writer's {@code truncate}
     * @param expectedFirst DLSN the first log record is expected to have afterwards
     * @throws Exception on any setup, I/O or close failure
     */
    private void assertFirstDlsnAfterTruncation(DLSN truncateAt, DLSN expectedFirst) throws Exception {
        String name = runtime.getMethodName();
        prepareLogSegmentsNonPartitioned(name, 3, 10);
        BKDistributedLogManager dlm = createNewDLM(conf, name);
        try {
            BKLogReadHandler handler = dlm.createReadHandler();
            AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
            try {
                boolean truncated = Utils.ioResult(writer.truncate(truncateAt));
                Assert.assertTrue(truncated);
                LogRecordWithDLSN first = Utils.ioResult(handler.asyncGetFirstLogRecord());
                Assert.assertEquals(expectedFirst, first.getDlsn());
            } finally {
                Utils.close(writer);
            }
        } finally {
            dlm.close();
        }
    }

    /** First and last DLSN are reported correctly while the only log segment is still open. */
    @Test(timeout = 60000)
    public void testGetFirstDLSNWithOpenLedger() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(0);

        final int numEntries = 10;
        BKDistributedLogManager dlm = createNewDLM(confLocal, name);
        try {
            AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
            try {
                long txid = 1L;
                List<CompletableFuture<DLSN>> writes = new ArrayList<>(numEntries);
                for (int entry = 0; entry < numEntries; entry++) {
                    writes.add(writer.write(DLMTestUtil.getLogRecordInstance(txid)));
                    txid++;
                }
                Utils.ioResult(FutureUtils.collect(writes));

                // Follow the user records with a control record.
                // NOTE(review): presumably this commits the preceding records so they become
                // visible to readers - confirm against the writer implementation.
                LogRecord controlRecord = new LogRecord(txid, DistributedLogConstants.CONTROL_RECORD_CONTENT);
                controlRecord.setControl();
                Utils.ioResult(writer.write(controlRecord));

                Assert.assertEquals(new DLSN(1L, 9L, 0L), dlm.getLastDLSN());
                Assert.assertEquals(new DLSN(1L, 0L, 0L), Utils.ioResult(dlm.getFirstDLSNAsync()));
            } finally {
                Utils.close(writer);
            }
        } finally {
            dlm.close();
        }
    }

    /** Asking for the first record of a log that was never written fails with LogNotFoundException. */
    @Test(timeout = 60000)
    public void testGetFirstDLSNNoLogSegments() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
        try {
            BKLogReadHandler handler = dlm.createReadHandler();
            try {
                Utils.ioResult(handler.asyncGetFirstLogRecord());
                Assert.fail("should have thrown exception");
            } catch (LogNotFoundException expected) {
                // Expected: nothing was ever written to this log.
            }
        } finally {
            dlm.close();
        }
    }

    /** The first record of a log with completed segments is at DLSN (1, 0, 0). */
    @Test(timeout = 60000)
    public void testGetFirstDLSNWithLogSegments() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
        try {
            DLMTestUtil.generateCompletedLogSegments(dlm, conf, 3L, 3L);
            // Any exception propagates and fails the test with its full stack trace.
            LogRecordWithDLSN first = Utils.ioResult(dlm.createReadHandler().asyncGetFirstLogRecord());
            Assert.assertEquals(new DLSN(1L, 0L, 0L), first.getDlsn());
        } finally {
            dlm.close();
        }
    }

    /** Truncating exactly at the start of segment 2 makes (2, 0, 0) the first record. */
    @Test(timeout = 60000)
    public void testGetFirstDLSNAfterCleanTruncation() throws Exception {
        assertFirstDlsnAfterTruncation(new DLSN(2L, 0L, 0L), new DLSN(2L, 0L, 0L));
    }

    /** Truncating in the middle of segment 2 still reports (2, 0, 0) as the first record. */
    @Test(timeout = 60000)
    public void testGetFirstDLSNAfterPartialTruncation() throws Exception {
        assertFirstDlsnAfterTruncation(new DLSN(2L, 5L, 0L), new DLSN(2L, 0L, 0L));
    }

    /** Counting records of a log that was never written fails with LogNotFoundException. */
    @Test(timeout = 60000)
    public void testGetLogRecordCountEmptyLedger() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
        try {
            BKLogReadHandler handler = dlm.createReadHandler();
            try {
                Utils.ioResult(handler.asyncGetLogRecordCount(DLSN.InitialDLSN));
                Assert.fail("log is empty, should have returned log empty ex");
            } catch (LogNotFoundException expected) {
                // Expected: nothing was ever written to this log.
            }
        } finally {
            dlm.close();
        }
    }

    /** Counting from the initial DLSN returns every record: 11 segments x 3 records = 33. */
    @Test(timeout = 60000)
    public void testGetLogRecordCountTotalCount() throws Exception {
        String name = runtime.getMethodName();
        prepareLogSegmentsNonPartitioned(name, 11, 3);
        BKDistributedLogManager dlm = createNewDLM(conf, name);
        try {
            Assert.assertEquals(33L, recordCountFrom(dlm.createReadHandler(), DLSN.InitialDLSN));
        } finally {
            dlm.close();
        }
    }

    /** Counting from the first entry of a later segment skips all earlier segments. */
    @Test(timeout = 60000)
    public void testGetLogRecordCountAtLedgerBoundary() throws Exception {
        String name = runtime.getMethodName();
        prepareLogSegmentsNonPartitioned(name, 11, 3);
        BKDistributedLogManager dlm = createNewDLM(conf, name);
        try {
            BKLogReadHandler handler = dlm.createReadHandler();
            Assert.assertEquals(30L, recordCountFrom(handler, new DLSN(2L, 0L, 0L)));
            Assert.assertEquals(27L, recordCountFrom(handler, new DLSN(3L, 0L, 0L)));
        } finally {
            dlm.close();
        }
    }

    /** Counting from a segment number beyond the last written segment returns 0. */
    @Test(timeout = 60000)
    public void testGetLogRecordCountPastEnd() throws Exception {
        String name = runtime.getMethodName();
        prepareLogSegmentsNonPartitioned(name, 11, 3);
        BKDistributedLogManager dlm = createNewDLM(conf, name);
        try {
            Assert.assertEquals(0L, recordCountFrom(dlm.createReadHandler(), new DLSN(12L, 0L, 0L)));
        } finally {
            dlm.close();
        }
    }

    /** Counting from the very last record returns 1. */
    @Test(timeout = 60000)
    public void testGetLogRecordCountLastRecord() throws Exception {
        String name = runtime.getMethodName();
        prepareLogSegmentsNonPartitioned(name, 11, 3);
        BKDistributedLogManager dlm = createNewDLM(conf, name);
        try {
            Assert.assertEquals(1L, recordCountFrom(dlm.createReadHandler(), new DLSN(11L, 2L, 0L)));
        } finally {
            dlm.close();
        }
    }

    /** Counting from the middle of a segment includes the rest of it plus all later segments. */
    @Test(timeout = 60000)
    public void testGetLogRecordCountInteriorRecords() throws Exception {
        String name = runtime.getMethodName();
        prepareLogSegmentsNonPartitioned(name, 5, 10);
        BKDistributedLogManager dlm = createNewDLM(conf, name);
        try {
            BKLogReadHandler handler = dlm.createReadHandler();
            // 5 remaining in segment 3 + 10 each in segments 4 and 5.
            Assert.assertEquals(25L, recordCountFrom(handler, new DLSN(3L, 5L, 0L)));
            // 5 remaining in segment 2 + 10 each in segments 3, 4 and 5.
            Assert.assertEquals(35L, recordCountFrom(handler, new DLSN(2L, 5L, 0L)));
        } finally {
            dlm.close();
        }
    }

    /**
     * Only 15 records are counted after generating segments with arguments (5, 5) and (0, 10).
     * NOTE(review): the two int arguments look like (control record count, user record count),
     * judging by the expected totals here and in the all-control-records test - confirm against
     * DLMTestUtil.
     */
    @Test(timeout = 60000)
    public void testGetLogRecordCountWithControlRecords() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
        try {
            long txid = 1L;
            txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 5, txid);
            DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 0, 10, txid);
            Assert.assertEquals(15L, recordCountFrom(dlm.createReadHandler(), new DLSN(1L, 0L, 0L)));
        } finally {
            dlm.close();
        }
    }

    /** Segments generated with arguments (5, 0) and (10, 0) yield a record count of 0. */
    @Test(timeout = 60000)
    public void testGetLogRecordCountWithAllControlRecords() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
        try {
            long txid = 1L;
            txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 0, txid);
            DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 10, 0, txid);
            Assert.assertEquals(0L, recordCountFrom(dlm.createReadHandler(), new DLSN(1L, 0L, 0L)));
        } finally {
            dlm.close();
        }
    }

    /** With three records written to a single, still in-progress segment the count is 2. */
    @Test(timeout = 60000)
    public void testGetLogRecordCountWithSingleInProgressLedger() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
        try {
            AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
            try {
                long txid = 1L;
                Utils.ioResult(writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
                Utils.ioResult(writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
                Utils.ioResult(writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));

                BKLogReadHandler handler = dlm.createReadHandler();
                List<LogSegmentMetadata> segments = Utils.ioResult(
                        handler.readLogSegmentsFromStore(
                                LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null))
                        .getValue();
                Assert.assertEquals(1L, segments.size());
                Assert.assertTrue(segments.get(0).isInProgress());

                // Three records were written but the expected count is 2.
                // NOTE(review): presumably the most recent record of an in-progress segment is
                // not yet visible to the count - confirm against BKLogReadHandler.
                Assert.assertEquals(2L, recordCountFrom(handler, new DLSN(1L, 0L, 0L)));
            } finally {
                Utils.close(writer);
            }
        } finally {
            dlm.close();
        }
    }

    /** A completed segment of 5 records plus 3 records in an in-progress segment counts as 7. */
    @Test(timeout = 60000)
    public void testGetLogRecordCountWithCompletedAndInprogressLedgers() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
        try {
            long txid = 1L;
            txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 0, 5, txid);
            AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
            try {
                // Keep transaction ids increasing: continue right after the completed segment
                // instead of restarting from 1.
                Utils.ioResult(writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
                Utils.ioResult(writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
                Utils.ioResult(writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));

                BKLogReadHandler handler = dlm.createReadHandler();
                List<LogSegmentMetadata> segments = Utils.ioResult(
                        handler.readLogSegmentsFromStore(
                                LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null))
                        .getValue();
                Assert.assertEquals(2L, segments.size());
                Assert.assertFalse(segments.get(0).isInProgress());
                Assert.assertTrue(segments.get(1).isInProgress());

                Assert.assertEquals(7L, recordCountFrom(handler, new DLSN(1L, 0L, 0L)));
            } finally {
                Utils.close(writer);
            }
        } finally {
            dlm.close();
        }
    }

    /** Locking a stream whose log does not exist fails, with or without a subscriber id. */
    @Test(timeout = 60000)
    public void testLockStreamWithMissingLog() throws Exception {
        BKDistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
        try {
            try {
                Utils.ioResult(dlm.createReadHandler().lockStream());
                Assert.fail("Should fail lock stream if log not found");
            } catch (LogNotFoundException expected) {
                // Expected: the log was never created.
            }
            try {
                Utils.ioResult(dlm.createReadHandler(Optional.of("test-subscriber")).lockStream());
                Assert.fail("Subscriber should fail lock stream if log not found");
            } catch (LogNotFoundException expected) {
                // Expected: the log was never created.
            }
        } finally {
            dlm.close();
        }
    }

    /** Handlers with no subscriber id, "s1" and "s2" can all lock the same stream. */
    @Test(timeout = 60000)
    public void testLockStreamDifferentSubscribers() throws Exception {
        String name = runtime.getMethodName();
        BKDistributedLogManager dlm = null;
        BKDistributedLogManager dlmS1 = null;
        BKDistributedLogManager dlmS2 = null;
        BKLogReadHandler handler = null;
        BKLogReadHandler handlerS1 = null;
        BKLogReadHandler handlerS2 = null;
        try {
            dlm = createNewDLM(conf, name);
            DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 0, 5, 1L);
            handler = dlm.createReadHandler();
            Utils.ioResult(handler.lockStream());

            dlmS1 = createNewDLM(conf, name);
            handlerS1 = dlmS1.createReadHandler(Optional.of("s1"));
            Utils.ioResult(handlerS1.lockStream());

            dlmS2 = createNewDLM(conf, name);
            handlerS2 = dlmS2.createReadHandler(Optional.of("s2"));
            Utils.ioResult(handlerS2.lockStream());
        } finally {
            release(handler, dlm);
            release(handlerS1, dlmS1);
            release(handlerS2, dlmS2);
        }
    }

    /** A second handler using subscriber id "s1" cannot lock the stream while the first holds it. */
    @Test(timeout = 60000)
    public void testLockStreamSameSubscriber() throws Exception {
        String name = runtime.getMethodName();
        BKDistributedLogManager dlm = null;
        BKDistributedLogManager dlmS1 = null;
        BKDistributedLogManager dlmS1Again = null;
        BKLogReadHandler handler = null;
        BKLogReadHandler handlerS1 = null;
        BKLogReadHandler handlerS1Again = null;
        try {
            dlm = createNewDLM(conf, name);
            DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 0, 5, 1L);
            handler = dlm.createReadHandler();
            Utils.ioResult(handler.lockStream());

            dlmS1 = createNewDLM(conf, name);
            handlerS1 = dlmS1.createReadHandler(Optional.of("s1"));
            Utils.ioResult(handlerS1.lockStream());

            dlmS1Again = createNewDLM(conf, name);
            handlerS1Again = dlmS1Again.createReadHandler(Optional.of("s1"));
            try {
                Utils.ioResult(handlerS1Again.lockStream(), 10000L, TimeUnit.MILLISECONDS);
                Assert.fail("Should fail lock stream using same subscriber id");
            } catch (TimeoutException | OwnershipAcquireFailedException expected) {
                // Expected: the lock attempt either times out waiting or is rejected outright.
            }
        } finally {
            release(handler, dlm);
            release(handlerS1, dlmS1);
            release(handlerS1Again, dlmS1Again);
        }
    }
}
