package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.admin;

import java.net.URI;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.DLMTestUtil;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.DLSN;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.LogRecord;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestDistributedLogBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.ZooKeeperClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.AsyncLogReader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.DistributedLogManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Utils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.functions.runtime.shaded.org.junit.After;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/admin/TestDistributedLogAdmin.class */
public class TestDistributedLogAdmin extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger((Class<?>) TestDistributedLogAdmin.class);
    private ZooKeeperClient zooKeeperClient;

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestDistributedLogBase
    @Before
    public void setup() throws Exception {
        this.zooKeeperClient = TestZooKeeperClientBuilder.newBuilder().uri(createDLMURI("/")).build();
        conf.setTraceReadAheadMetadataChanges(true);
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.TestDistributedLogBase
    @After
    public void teardown() throws Exception {
        this.zooKeeperClient.close();
    }

    public void testChangeSequenceNumber() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setLogSegmentSequenceNumberValidationEnabled(false);
        distributedLogConfiguration.setLogSegmentCacheEnabled(false);
        DistributedLogConfiguration distributedLogConfiguration2 = new DistributedLogConfiguration();
        distributedLogConfiguration2.addConfiguration(conf);
        distributedLogConfiguration2.setLogSegmentCacheEnabled(false);
        distributedLogConfiguration2.setLogSegmentSequenceNumberValidationEnabled(true);
        URI createDLMURI = createDLMURI("/change-sequence-number");
        this.zooKeeperClient.get().create(createDLMURI.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Namespace build = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(createDLMURI).build();
        Namespace build2 = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration2).uri(createDLMURI).build();
        DistributedLogManager openLog = build.openLog("change-sequence-number");
        DLMTestUtil.generateCompletedLogSegments(openLog, distributedLogConfiguration, 4L, 10L);
        DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(openLog, distributedLogConfiguration, 5L, 41L, false, 10L, true);
        openLog.close();
        DistributedLogManager openLog2 = build2.openLog("change-sequence-number");
        AsyncLogReader asyncLogReader = openLog2.getAsyncLogReader(DLSN.InitialDLSN);
        long j = 1;
        DLSN dlsn = DLSN.InitialDLSN;
        for (int i = 0; i < 40; i++) {
            LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext());
            Assert.assertNotNull(logRecordWithDLSN);
            DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
            Assert.assertEquals(j, logRecordWithDLSN.getTransactionId());
            j++;
            dlsn = logRecordWithDLSN.getDlsn();
        }
        LOG.info("Injecting bad log segment '3'");
        DistributedLogManager openLog3 = build.openLog("change-sequence-number");
        DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(openLog3, distributedLogConfiguration, 3L, 51L, true, 10L, false);
        LOG.info("Injected bad log segment '3'");
        try {
            Assert.fail("Should fail reading next record " + ((LogRecordWithDLSN) Utils.ioResult(asyncLogReader.readNext())) + " when there is a corrupted log segment");
        } catch (UnexpectedException e) {
        }
        LOG.info("Dryrun fix inprogress segment that has lower sequence number");
        DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(build, new DryrunLogSegmentMetadataStoreUpdater(distributedLogConfiguration, getLogSegmentMetadataStore(build)), "change-sequence-number", false, false);
        try {
            Utils.ioResult(openLog2.getAsyncLogReader(dlsn).readNext());
            Assert.fail("Should fail reading next when there is a corrupted log segment");
        } catch (UnexpectedException e2) {
        }
        LOG.info("Actual run fix inprogress segment that has lower sequence number");
        DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(build, LogSegmentMetadataStoreUpdater.createMetadataUpdater(distributedLogConfiguration, getLogSegmentMetadataStore(build)), "change-sequence-number", false, false);
        AsyncLogReader asyncLogReader2 = openLog2.getAsyncLogReader(dlsn);
        Utils.ioResult(asyncLogReader2.readNext());
        LogRecord logRecord = (LogRecord) Utils.ioResult(asyncLogReader2.readNext());
        Assert.assertNotNull(logRecord);
        DLMTestUtil.verifyLogRecord(logRecord);
        Assert.assertEquals(51L, logRecord.getTransactionId());
        long j2 = 51 + 1;
        for (int i2 = 1; i2 < 10; i2++) {
            LogRecord logRecord2 = (LogRecord) Utils.ioResult(asyncLogReader2.readNext());
            Assert.assertNotNull(logRecord2);
            DLMTestUtil.verifyLogRecord(logRecord2);
            Assert.assertEquals(j2, logRecord2.getTransactionId());
            j2++;
        }
        Utils.close(asyncLogReader2);
        openLog2.close();
        openLog3.close();
        build.close();
        build2.close();
    }
}
