package org.apache.bookkeeper.client;

import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.ReadLastConfirmedOp;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.class */
public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
    static final Logger LOG = LoggerFactory.getLogger(ParallelLedgerRecoveryTest.class);
    final BookKeeper.DigestType digestType;

    /* loaded from: input_file:org/apache/bookkeeper/client/ParallelLedgerRecoveryTest$DelayResponseBookie.class */
    /**
     * Bookie that lets a test hold back add acknowledgements and stall the read of one entry.
     *
     * <p>While add-delay is on, completed writes are parked in {@code delayQueue} instead of being
     * acknowledged; the visible code never drains that queue. While read-delay is on, a read of
     * the configured entry id first waits on the latch handed to {@link #delayRead}.
     */
    static class DelayResponseBookie extends Bookie {
        private final AtomicBoolean delayAddResponse = new AtomicBoolean(false);
        private final AtomicBoolean delayReadResponse = new AtomicBoolean(false);
        // -1234 is the initial "no entry selected" value; it is overwritten by delayRead().
        private final AtomicLong delayReadOnEntry = new AtomicLong(-1234L);
        private volatile CountDownLatch delayReadLatch;
        private final LinkedBlockingQueue<WriteCallbackEntry> delayQueue = new LinkedBlockingQueue<>();

        /** Captured arguments of one write completion, so it can be replayed later. */
        static final class WriteCallbackEntry {
            private final BookkeeperInternalCallbacks.WriteCallback cb;
            private final int rc;
            private final long ledgerId;
            private final long entryId;
            private final BookieSocketAddress addr;
            private final Object ctx;

            WriteCallbackEntry(BookkeeperInternalCallbacks.WriteCallback writeCallback, int i, long j, long j2, BookieSocketAddress bookieSocketAddress, Object obj) {
                cb = writeCallback;
                rc = i;
                ledgerId = j;
                entryId = j2;
                addr = bookieSocketAddress;
                ctx = obj;
            }

            /** Delivers the captured completion to the captured callback. */
            public void callback() {
                cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
            }
        }

        public DelayResponseBookie(ServerConfiguration serverConfiguration) throws IOException, KeeperException, InterruptedException, BookieException {
            super(serverConfiguration);
            this.delayReadLatch = null;
        }

        /**
         * Adds the entry through the parent bookie, intercepting the completion callback so that it
         * is either forwarded immediately or queued, depending on the add-delay flag at the time the
         * completion arrives.
         */
        public void addEntry(ByteBuf byteBuf, boolean z, final BookkeeperInternalCallbacks.WriteCallback writeCallback, Object obj, byte[] bArr) throws IOException, BookieException {
            BookkeeperInternalCallbacks.WriteCallback interceptor = new BookkeeperInternalCallbacks.WriteCallback() {
                public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
                    if (!DelayResponseBookie.this.delayAddResponse.get()) {
                        writeCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
                        return;
                    }
                    // Hold the acknowledgement back; the client never sees this add complete.
                    DelayResponseBookie.this.delayQueue.add(new WriteCallbackEntry(writeCallback, rc, ledgerId, entryId, addr, ctx));
                }
            };
            super.addEntry(byteBuf, z, interceptor, obj, bArr);
        }

        /**
         * Reads an entry through the parent bookie, first waiting on the configured latch when
         * read-delay is enabled and {@code j2} equals the entry id selected for delay.
         */
        public ByteBuf readEntry(long j, long j2) throws IOException, Bookie.NoLedgerException {
            ParallelLedgerRecoveryTest.LOG.info("ReadEntry {} - {}", Long.valueOf(j), Long.valueOf(j2));
            if (this.delayReadResponse.get() && this.delayReadOnEntry.get() == j2) {
                // Read the volatile field once so the null check and await() see the same latch.
                CountDownLatch latch = this.delayReadLatch;
                if (latch != null) {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            return super.readEntry(j, j2);
        }

        /** Turns queuing of add completions on ({@code true}) or off ({@code false}). */
        void delayAdd(boolean z) {
            this.delayAddResponse.set(z);
        }

        /**
         * Configures read delaying.
         *
         * @param z whether reads should be delayed at all
         * @param j entry id whose read is delayed
         * @param countDownLatch latch the delayed read waits on
         */
        void delayRead(boolean z, long j, CountDownLatch countDownLatch) {
            this.delayReadResponse.set(z);
            this.delayReadOnEntry.set(j);
            this.delayReadLatch = countDownLatch;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/client/ParallelLedgerRecoveryTest$TestLedgerManager.class */
    /**
     * {@link LedgerManager} decorator that forwards every call to a wrapped manager, except that
     * {@link #writeLedgerMetadata} can be held back: when a latch is installed via
     * {@link #setLatch}, the write is handed to a single-threaded executor that waits on that
     * latch before forwarding it.
     */
    public static class TestLedgerManager implements LedgerManager {
        final LedgerManager lm;
        // Latch gating metadata writes; null means writes are forwarded immediately.
        volatile CountDownLatch waitLatch = null;
        final ExecutorService executorService = Executors.newSingleThreadExecutor();

        TestLedgerManager(LedgerManager ledgerManager) {
            this.lm = ledgerManager;
        }

        /** Installs (or, with {@code null}, removes) the latch that gates later metadata writes. */
        void setLatch(CountDownLatch countDownLatch) {
            this.waitLatch = countDownLatch;
        }

        public void createLedgerMetadata(long j, LedgerMetadata ledgerMetadata, BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata> genericCallback) {
            this.lm.createLedgerMetadata(j, ledgerMetadata, genericCallback);
        }

        public void removeLedgerMetadata(long j, Version version, BookkeeperInternalCallbacks.GenericCallback<Void> genericCallback) {
            this.lm.removeLedgerMetadata(j, version, genericCallback);
        }

        public void readLedgerMetadata(long j, BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata> genericCallback) {
            this.lm.readLedgerMetadata(j, genericCallback);
        }

        public LedgerManager.LedgerRangeIterator getLedgerRanges() {
            return this.lm.getLedgerRanges();
        }

        /**
         * Forwards the metadata write to the wrapped manager, either immediately (no latch
         * installed) or from the executor thread once the latch that was installed at call time has
         * been released.
         */
        public void writeLedgerMetadata(final long j, final LedgerMetadata ledgerMetadata, final BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata> genericCallback) {
            // Snapshot the volatile field: the test may clear it right after triggering the write.
            final CountDownLatch countDownLatch = this.waitLatch;
            if (null != countDownLatch) {
                this.executorService.submit(new Runnable() { // from class: org.apache.bookkeeper.client.ParallelLedgerRecoveryTest.TestLedgerManager.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            countDownLatch.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            ParallelLedgerRecoveryTest.LOG.error("Interrupted on waiting latch : ", e);
                        }
                        // The write is still issued after an interrupt, so its callback fires.
                        TestLedgerManager.this.lm.writeLedgerMetadata(j, ledgerMetadata, genericCallback);
                    }
                });
            } else {
                this.lm.writeLedgerMetadata(j, ledgerMetadata, genericCallback);
            }
        }

        public void registerLedgerMetadataListener(long j, BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener) {
            this.lm.registerLedgerMetadataListener(j, ledgerMetadataListener);
        }

        public void unregisterLedgerMetadataListener(long j, BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener) {
            this.lm.unregisterLedgerMetadataListener(j, ledgerMetadataListener);
        }

        public void asyncProcessLedgers(BookkeeperInternalCallbacks.Processor<Long> processor, AsyncCallback.VoidCallback voidCallback, Object obj, int i, int i2) {
            this.lm.asyncProcessLedgers(processor, voidCallback, obj, i, i2);
        }

        /**
         * Closes the wrapped manager and shuts the executor down.
         *
         * @throws IOException if closing the wrapped manager fails; the executor is shut down
         *     regardless
         */
        public void close() throws IOException {
            try {
                this.lm.close();
            } finally {
                // Shut down even when the delegate's close() throws, so the worker thread is not leaked.
                this.executorService.shutdown();
            }
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/client/ParallelLedgerRecoveryTest$TestLedgerManagerFactory.class */
    /**
     * Ledger manager factory whose managers are the parent factory's managers wrapped in a
     * {@link TestLedgerManager}.
     */
    static class TestLedgerManagerFactory extends HierarchicalLedgerManagerFactory {
        TestLedgerManagerFactory() {
            // Nothing to set up beyond the parent factory.
        }

        /** Builds the parent's ledger manager and returns it wrapped for write interception. */
        public LedgerManager newLedgerManager() {
            LedgerManager delegate = super.newLedgerManager();
            return new TestLedgerManager(delegate);
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/client/ParallelLedgerRecoveryTest$TestMetadataBookieDriver.class */
    /**
     * Bookie-side ZooKeeper metadata driver that hands out a lazily created
     * {@link TestLedgerManagerFactory} instead of the stock factory.
     */
    static class TestMetadataBookieDriver extends ZKMetadataBookieDriver {
        TestMetadataBookieDriver() {
            // Nothing to set up beyond the parent driver.
        }

        /**
         * Returns the cached factory, creating and initializing it on first use.
         *
         * @throws MetadataException with code {@code METADATA_SERVICE_ERROR} when initialization
         *     fails with an {@link IOException}
         */
        public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException {
            if (this.lmFactory != null) {
                return this.lmFactory;
            }
            try {
                this.lmFactory = new TestLedgerManagerFactory().initialize(this.conf, this.layoutManager, 1);
            } catch (IOException ioe) {
                throw new MetadataException(Code.METADATA_SERVICE_ERROR, ioe);
            }
            return this.lmFactory;
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/client/ParallelLedgerRecoveryTest$TestMetadataClientDriver.class */
    /**
     * Client-side ZooKeeper metadata driver that hands out a lazily created
     * {@link TestLedgerManagerFactory} instead of the stock factory.
     */
    static class TestMetadataClientDriver extends ZKMetadataClientDriver {
        TestMetadataClientDriver() {
            // Nothing to set up beyond the parent driver.
        }

        /**
         * Returns the cached factory, creating and initializing it on first use.
         *
         * @throws MetadataException with code {@code METADATA_SERVICE_ERROR} when initialization
         *     fails with an {@link IOException}
         */
        public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException {
            if (this.lmFactory != null) {
                return this.lmFactory;
            }
            try {
                this.lmFactory = new TestLedgerManagerFactory().initialize(this.conf, this.layoutManager, 1);
            } catch (IOException ioe) {
                throw new MetadataException(Code.METADATA_SERVICE_ERROR, ioe);
            }
            return this.lmFactory;
        }
    }

    /**
     * Creates the test case, passing 3 to the cluster test base class (presumably the number of
     * bookies to start -- confirm against the base class) and fixing the digest type to CRC32.
     */
    public ParallelLedgerRecoveryTest() throws Exception {
        super(3);
        digestType = BookKeeper.DigestType.CRC32;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Swaps the "zk" metadata drivers and the ledger manager factory for the test variants, raises
     * the client read/add entry timeouts, and then starts the cluster through the base class.
     */
    @Override // org.apache.bookkeeper.test.BookKeeperClusterTestCase
    public void startBKCluster(String str) throws Exception {
        final String scheme = "zk";
        final int entryTimeout = 60000;

        // Route "zk" metadata access through the drivers that hand out TestLedgerManagerFactory.
        MetadataDrivers.registerClientDriver(scheme, TestMetadataClientDriver.class, true);
        MetadataDrivers.registerBookieDriver(scheme, TestMetadataBookieDriver.class, true);

        baseConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class);
        baseClientConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class);
        baseClientConf.setReadEntryTimeout(entryTimeout);
        baseClientConf.setAddEntryTimeout(entryTimeout);

        super.startBKCluster(str);
    }

    /**
     * Tears the cluster down and re-registers the stock ZooKeeper metadata drivers for the "zk"
     * scheme, undoing the registrations made in {@code startBKCluster}.
     *
     * <p>The drivers are restored whether or not the base-class tear-down succeeds.
     */
    @Override // org.apache.bookkeeper.test.BookKeeperClusterTestCase
    @After
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            // Single cleanup path instead of duplicating it in the try body and a catch-all handler.
            MetadataDrivers.registerClientDriver("zk", ZKMetadataClientDriver.class, true);
            MetadataDrivers.registerBookieDriver("zk", ZKMetadataBookieDriver.class, true);
        }
    }

    /**
     * Recovery with parallel recovery reads enabled and a recovery read batch size of 1, with no
     * competing metadata update while the recovery's metadata write is held back.
     */
    @Test
    public void testRecoverBeforeWriteMetadata1() throws Exception {
        rereadDuringRecovery(true, 1, false, false);
    }

    /**
     * Recovery with parallel recovery reads enabled and a recovery read batch size of 3, with no
     * competing metadata update while the recovery's metadata write is held back.
     */
    @Test
    public void testRecoverBeforeWriteMetadata2() throws Exception {
        rereadDuringRecovery(true, 3, false, false);
    }

    /**
     * Recovery with parallel recovery reads disabled and a recovery read batch size of 1, with no
     * competing metadata update while the recovery's metadata write is held back.
     */
    @Test
    public void testRecoverBeforeWriteMetadata3() throws Exception {
        rereadDuringRecovery(false, 1, false, false);
    }

    /**
     * Recovery with parallel recovery reads disabled and a recovery read batch size of 3, with no
     * competing metadata update while the recovery's metadata write is held back.
     */
    @Test
    public void testRecoverBeforeWriteMetadata4() throws Exception {
        rereadDuringRecovery(false, 3, false, false);
    }

    /**
     * Recovery with parallel recovery reads enabled and batch size 1, while a second no-recovery
     * handle rewrites the ledger metadata before the recovery's own metadata write is released.
     */
    @Test
    public void testRereadDuringRecovery1() throws Exception {
        rereadDuringRecovery(true, 1, true, false);
    }

    /**
     * Recovery with parallel recovery reads enabled and batch size 3, while a second no-recovery
     * handle rewrites the ledger metadata before the recovery's own metadata write is released.
     */
    @Test
    public void testRereadDuringRecovery2() throws Exception {
        rereadDuringRecovery(true, 3, true, false);
    }

    /**
     * Recovery with parallel recovery reads disabled and batch size 1, while a second no-recovery
     * handle rewrites the ledger metadata before the recovery's own metadata write is released.
     */
    @Test
    public void testRereadDuringRecovery3() throws Exception {
        rereadDuringRecovery(false, 1, true, false);
    }

    /**
     * Recovery with parallel recovery reads disabled and batch size 3, while a second no-recovery
     * handle rewrites the ledger metadata before the recovery's own metadata write is released.
     */
    @Test
    public void testRereadDuringRecovery4() throws Exception {
        rereadDuringRecovery(false, 3, true, false);
    }

    /**
     * Parallel recovery reads enabled, batch size 1, with a competing metadata update.
     *
     * <p>NOTE(review): the arguments are identical to {@code testRereadDuringRecovery1}, so the
     * branch of {@code rereadDuringRecovery} that opens the ledger with recovery (last flag
     * {@code true}) is not exercised here -- confirm whether {@code true} was intended.
     */
    @Test
    public void testConcurrentRecovery1() throws Exception {
        rereadDuringRecovery(true, 1, true, false);
    }

    /**
     * Parallel recovery reads enabled, batch size 3, with a competing metadata update.
     *
     * <p>NOTE(review): the arguments are identical to {@code testRereadDuringRecovery2}, so the
     * branch of {@code rereadDuringRecovery} that opens the ledger with recovery (last flag
     * {@code true}) is not exercised here -- confirm whether {@code true} was intended.
     */
    @Test
    public void testConcurrentRecovery2() throws Exception {
        rereadDuringRecovery(true, 3, true, false);
    }

    /**
     * Parallel recovery reads disabled, batch size 1, with a competing metadata update.
     *
     * <p>NOTE(review): the arguments are identical to {@code testRereadDuringRecovery3}, so the
     * branch of {@code rereadDuringRecovery} that opens the ledger with recovery (last flag
     * {@code true}) is not exercised here -- confirm whether {@code true} was intended.
     */
    @Test
    public void testConcurrentRecovery3() throws Exception {
        rereadDuringRecovery(false, 1, true, false);
    }

    /**
     * Parallel recovery reads disabled, batch size 3, with a competing metadata update.
     *
     * <p>NOTE(review): the arguments are identical to {@code testRereadDuringRecovery4}, so the
     * branch of {@code rereadDuringRecovery} that opens the ledger with recovery (last flag
     * {@code true}) is not exercised here -- confirm whether {@code true} was intended.
     */
    @Test
    public void testConcurrentRecovery4() throws Exception {
        rereadDuringRecovery(false, 3, true, false);
    }

    /**
     * Shared scenario: write entries to a ledger, start recovering it through a no-recovery handle
     * while that recovery's metadata write is held back, optionally change the ledger metadata
     * behind its back, then release the write and verify that recovery still closes the ledger
     * with every entry readable.
     *
     * @param z value passed to {@code setEnableParallelRecoveryRead}
     * @param i value passed to {@code setRecoveryReadBatchSize}
     * @param z2 whether to modify the ledger metadata while the recovery's write is held back
     * @param z3 when {@code z2} is set: {@code true} opens the ledger with recovery and closes it,
     *     {@code false} marks the ledger in recovery through a second no-recovery handle
     * @throws Exception on any client failure or interruption
     */
    private void rereadDuringRecovery(boolean z, int i, boolean z2, boolean z3) throws Exception {
        ClientConfiguration newConf = new ClientConfiguration();
        newConf.addConfiguration(this.baseClientConf);
        newConf.setEnableParallelRecoveryRead(z);
        newConf.setRecoveryReadBatchSize(i);
        BookKeeper newBk = new BookKeeper(newConf);
        TestLedgerManager tlm = (TestLedgerManager) newBk.getUnderlyingLedgerManager();
        final byte[] passwd = "".getBytes(Charsets.UTF_8);
        final LedgerHandle lh = newBk.createLedger(this.numBookies, 2, 2, this.digestType, passwd);

        // Put the first two ensemble bookies to sleep until their latches are released below.
        CountDownLatch sleepLatch1 = new CountDownLatch(1);
        CountDownLatch sleepLatch2 = new CountDownLatch(1);
        sleepBookie((BookieSocketAddress) lh.getLedgerMetadata().currentEnsemble.get(0), sleepLatch1);
        sleepBookie((BookieSocketAddress) lh.getLedgerMetadata().currentEnsemble.get(1), sleepLatch2);

        int numEntries = (this.numBookies * 3) + 1;
        final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
        final CountDownLatch addDone = new CountDownLatch(1);
        for (int n = 0; n < numEntries; n++) {
            lh.asyncAddEntry(("" + n).getBytes(Charsets.UTF_8), new AsyncCallback.AddCallback() { // from class: org.apache.bookkeeper.client.ParallelLedgerRecoveryTest.1
                public void addComplete(int rc, LedgerHandle ledgerHandle, long entryId, Object ctx) {
                    // Finish on the first failure (leaving the counter non-zero) or once all adds succeeded.
                    if (0 != rc || numPendingAdds.decrementAndGet() == 0) {
                        addDone.countDown();
                    }
                }
            }, (Object) null);
        }
        sleepLatch1.countDown();
        sleepLatch2.countDown();
        // A timeout is caught by the assertion below: the counter only reaches 0 when all adds succeeded.
        addDone.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(0L, numPendingAdds.get());
        LOG.info("Added {} entries to ledger {}.", Integer.valueOf(numEntries), Long.valueOf(lh.getId()));
        long ledgerLength = lh.getLength();

        LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(), this.digestType, passwd);
        Assert.assertEquals(-1L, recoverLh.getLastAddPushed());
        Assert.assertEquals(-1L, recoverLh.getLastAddConfirmed());
        Assert.assertEquals(0L, recoverLh.getLength());
        LOG.info("OpenLedgerNoRecovery {}.", Long.valueOf(lh.getId()));

        // Hold back the metadata write issued by the recovery started next.
        CountDownLatch metadataLatch = new CountDownLatch(1);
        tlm.setLatch(metadataLatch);
        final CountDownLatch recoverLatch = new CountDownLatch(1);
        final AtomicBoolean success = new AtomicBoolean(false);
        recoverLh.recover(new BookkeeperInternalCallbacks.GenericCallback<Void>() { // from class: org.apache.bookkeeper.client.ParallelLedgerRecoveryTest.2
            public void operationComplete(int rc, Void result) {
                ParallelLedgerRecoveryTest.LOG.info("Recovering ledger {} completed : {}.", Long.valueOf(lh.getId()), Integer.valueOf(rc));
                success.set(0 == rc);
                recoverLatch.countDown();
            }
        });
        // Later metadata writes (the competing update below) go straight through.
        tlm.setLatch(null);

        if (z2) {
            if (z3) {
                LOG.info("OpenLedger {} to close.", Long.valueOf(lh.getId()));
                newBk.openLedger(lh.getId(), this.digestType, passwd).close();
            } else {
                LOG.info("OpenLedgerNoRecovery {} again.", Long.valueOf(lh.getId()));
                LedgerHandle newRecoverLh = newBk.openLedgerNoRecovery(lh.getId(), this.digestType, passwd);
                Assert.assertEquals(-1L, newRecoverLh.getLastAddPushed());
                Assert.assertEquals(-1L, newRecoverLh.getLastAddConfirmed());
                newRecoverLh.getLedgerMetadata().markLedgerInRecovery();
                final CountDownLatch updateLatch = new CountDownLatch(1);
                // Sentinel (0x12345) that the callback must overwrite with the real return code.
                final AtomicInteger updateRc = new AtomicInteger(0x12345);
                newRecoverLh.writeLedgerConfig(new BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>() { // from class: org.apache.bookkeeper.client.ParallelLedgerRecoveryTest.3
                    public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
                        updateRc.set(rc);
                        updateLatch.countDown();
                    }
                });
                updateLatch.await();
                Assert.assertEquals(0L, updateRc.get());
                newRecoverLh.close();
                LOG.info("Updated ledger manager {}.", newRecoverLh.getLedgerMetadata());
            }
        }

        metadataLatch.countDown();
        LOG.info("Resume metadata update.");
        // A timeout is caught by the assertion below: the flag is only set by a successful callback.
        recoverLatch.await(20L, TimeUnit.SECONDS);
        Assert.assertTrue(success.get());
        Assert.assertEquals(numEntries - 1, recoverLh.getLastAddPushed());
        Assert.assertEquals(numEntries - 1, recoverLh.getLastAddConfirmed());
        Assert.assertEquals(ledgerLength, recoverLh.getLength());
        Assert.assertTrue(recoverLh.getLedgerMetadata().isClosed());

        // Every entry must be readable in order and carry the payload that was written.
        Enumeration<LedgerEntry> entries = recoverLh.readEntries(0L, numEntries - 1);
        int numReads = 0;
        while (entries.hasMoreElements()) {
            LedgerEntry entry = entries.nextElement();
            Assert.assertEquals(numReads, entry.getEntryId());
            Assert.assertEquals(numReads, Integer.parseInt(new String(entry.getEntry(), Charsets.UTF_8)));
            numReads++;
        }
        Assert.assertEquals(numEntries, numReads);
        recoverLh.close();
        newBk.close();
    }

    /**
     * Recovery of a ledger whose bookie holds an out-of-sequence entry.
     *
     * <p>Ten entries are added normally, then entry 14 (carrying a last-add-confirmed of 8) is
     * written straight to the bookie. A fresh client must see lac 8 before recovery and lac 9 with
     * closed metadata afterwards, and the recovery callback must fire exactly once.
     */
    @Test
    public void testRecoveryOnEntryGap() throws Exception {
        final long gapEntryId = 14L;
        final long gapLac = 8L;

        byte[] passwd = "recovery-on-entry-gap".getBytes(Charsets.UTF_8);
        LedgerHandle lh = this.bkc.createLedger(1, 1, 1, BookKeeper.DigestType.CRC32, passwd);
        for (int i = 0; i < 10; i++) {
            lh.addEntry(("recovery-on-entry-gap-" + i).getBytes(Charsets.UTF_8));
        }

        // Write the gap entry directly through the bookie client, bypassing the ledger handle.
        byte[] gapData = "recovery-on-entry-gap-gap".getBytes(Charsets.UTF_8);
        ByteBufList toSend = lh.macManager.computeDigestAndPackageForSending(gapEntryId, gapLac, lh.getLength() + 100, Unpooled.wrappedBuffer(gapData, 0, gapData.length));
        final CountDownLatch addLatch = new CountDownLatch(1);
        final AtomicBoolean addSuccess = new AtomicBoolean(false);
        LOG.info("Add entry {} with lac = {}", gapEntryId, gapLac);
        lh.bk.getBookieClient().addEntry((BookieSocketAddress) lh.getLedgerMetadata().currentEnsemble.get(0), lh.getId(), lh.ledgerKey, gapEntryId, toSend, new BookkeeperInternalCallbacks.WriteCallback() { // from class: org.apache.bookkeeper.client.ParallelLedgerRecoveryTest.4
            public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
                addSuccess.set(0 == rc);
                addLatch.countDown();
            }
        }, 0, 0, false, WriteFlag.NONE);
        addLatch.await();
        Assert.assertTrue("add entry 14 should succeed", addSuccess.get());

        ClientConfiguration newConf = new ClientConfiguration();
        newConf.addConfiguration(this.baseClientConf);
        newConf.setEnableParallelRecoveryRead(true);
        newConf.setRecoveryReadBatchSize(10);
        BookKeeper newBk = new BookKeeper(newConf);
        try {
            final LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, passwd);
            Assert.assertEquals("wrong lac found", gapLac, recoverLh.getLastAddConfirmed());

            final CountDownLatch recoverLatch = new CountDownLatch(1);
            final AtomicLong newLac = new AtomicLong(-1L);
            final AtomicBoolean isMetadataClosed = new AtomicBoolean(false);
            final AtomicInteger numSuccessCalls = new AtomicInteger(0);
            final AtomicInteger numFailureCalls = new AtomicInteger(0);
            recoverLh.recover(new BookkeeperInternalCallbacks.GenericCallback<Void>() { // from class: org.apache.bookkeeper.client.ParallelLedgerRecoveryTest.5
                public void operationComplete(int rc, Void result) {
                    if (0 == rc) {
                        newLac.set(recoverLh.getLastAddConfirmed());
                        isMetadataClosed.set(recoverLh.getLedgerMetadata().isClosed());
                        numSuccessCalls.incrementAndGet();
                    } else {
                        numFailureCalls.incrementAndGet();
                    }
                    recoverLatch.countDown();
                }
            });
            recoverLatch.await();
            Assert.assertEquals("wrong lac found", 9L, newLac.get());
            Assert.assertTrue("metadata isn't closed after recovery", isMetadataClosed.get());
            // Leave time for a spurious second callback before checking the counters.
            Thread.sleep(5000L);
            Assert.assertEquals("recovery callback should be triggered only once", 1L, numSuccessCalls.get());
            Assert.assertEquals("recovery callback should be triggered only once", 0L, numFailureCalls.get());
        } finally {
            // The recovering client was previously never closed.
            newBk.close();
        }
    }

    /**
     * Recovery racing with the writer closing its ledger handle.
     *
     * <p>A writer, a recovering reader and a plain reader share one ledger on a single
     * {@link DelayResponseBookie}. Two adds are acknowledged, three more are held back by the
     * bookie, and recovery is started with both its read of entry 3 and its metadata write stalled.
     * The writer then closes at lac 1; once the metadata write is released, recovery must succeed
     * and both readers must still observe lac 1.
     */
    @Test
    public void testRecoveryWhenClosingLedgerHandle() throws Exception {
        byte[] passwd = "recovery-when-closing-ledger-handle".getBytes(Charsets.UTF_8);
        ClientConfiguration newConf = new ClientConfiguration();
        newConf.addConfiguration(this.baseClientConf);
        newConf.setEnableParallelRecoveryRead(true);
        newConf.setRecoveryReadBatchSize(1);
        newConf.setAddEntryTimeout(9999999);
        newConf.setReadEntryTimeout(9999999);

        // One client each for the writer, the recovering reader and the plain reader.
        BookKeeper writerBk = new BookKeeper(newConf);
        LedgerHandle writerLh = writerBk.createLedger(1, 1, 1, this.digestType, passwd);
        BookKeeper recoverBk = new BookKeeper(newConf);
        final LedgerHandle recoverLh = recoverBk.openLedgerNoRecovery(writerLh.getId(), this.digestType, passwd);
        TestLedgerManager recoverLm = (TestLedgerManager) recoverBk.getUnderlyingLedgerManager();
        BookKeeper readerBk = new BookKeeper(newConf);
        LedgerHandle readerLh = readerBk.openLedgerNoRecovery(writerLh.getId(), this.digestType, passwd);
        LOG.info("Create ledger {}", Long.valueOf(writerLh.getId()));

        // Replace the ledger's only bookie with one whose responses can be delayed.
        ServerConfiguration bookieConf = killBookie((BookieSocketAddress) writerLh.getLedgerMetadata().currentEnsemble.get(0));
        bookieConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
        DelayResponseBookie delayBookie = new DelayResponseBookie(bookieConf);
        this.bs.add(startBookie(bookieConf, delayBookie));
        this.bsConfs.add(bookieConf);

        writerLh.addEntry("entry-0".getBytes(Charsets.UTF_8));
        writerLh.addEntry("entry-1".getBytes(Charsets.UTF_8));
        Assert.assertEquals(0L, readerLh.readLastConfirmed());
        Assert.assertEquals(0L, recoverLh.readLastConfirmed());

        // Entries 2..4 are written but their acknowledgements are parked in the bookie's queue.
        final CountDownLatch addLatch = new CountDownLatch(3);
        final AtomicInteger numAddFailures = new AtomicInteger(0);
        delayBookie.delayAdd(true);
        for (int n = 2; n < 5; n++) {
            writerLh.asyncAddEntry(("entry-" + n).getBytes(Charsets.UTF_8), new AsyncCallback.AddCallback() { // from class: org.apache.bookkeeper.client.ParallelLedgerRecoveryTest.6
                public void addComplete(int rc, LedgerHandle ledgerHandle, long entryId, Object ctx) {
                    if (0 != rc) {
                        numAddFailures.incrementAndGet();
                    }
                    addLatch.countDown();
                }
            }, (Object) null);
        }
        while (delayBookie.delayQueue.size() < 3) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(1L, readerLh.readLastConfirmed());
        Assert.assertEquals(1L, recoverLh.readLastConfirmed());

        // Stall recovery twice: on its read of entry 3 and on its metadata write.
        CountDownLatch readLatch = new CountDownLatch(1);
        delayBookie.delayAdd(false);
        delayBookie.delayRead(true, 3L, readLatch);
        CountDownLatch metadataLatch = new CountDownLatch(1);
        recoverLm.setLatch(metadataLatch);
        final CountDownLatch recoverLatch = new CountDownLatch(1);
        final AtomicBoolean recoverSuccess = new AtomicBoolean(false);
        recoverLh.recover(new BookkeeperInternalCallbacks.GenericCallback<Void>() { // from class: org.apache.bookkeeper.client.ParallelLedgerRecoveryTest.7
            public void operationComplete(int rc, Void result) {
                ParallelLedgerRecoveryTest.LOG.info("Recovering ledger {} completed : {}", Long.valueOf(recoverLh.getId()), Integer.valueOf(rc));
                recoverSuccess.set(0 == rc);
                recoverLatch.countDown();
            }
        });
        Thread.sleep(2000L);
        readLatch.countDown();
        Assert.assertEquals(1L, readerLh.readLastConfirmed());

        // The writer closes while recovery's metadata write is still held back.
        writerLh.close();
        Assert.assertEquals(1L, writerLh.getLastAddConfirmed());
        metadataLatch.countDown();
        recoverLatch.await();
        Assert.assertTrue(recoverSuccess.get());
        Assert.assertEquals(1L, recoverLh.getLastAddConfirmed());

        // -1234 placeholders must be overwritten by the callback before they are asserted on.
        final AtomicLong lacHolder = new AtomicLong(-1234L);
        final AtomicInteger rcHolder = new AtomicInteger(-1234);
        final CountDownLatch doneLatch = new CountDownLatch(1);
        ReadLastConfirmedOp.LastConfirmedDataCallback lacCallback = new ReadLastConfirmedOp.LastConfirmedDataCallback() { // from class: org.apache.bookkeeper.client.ParallelLedgerRecoveryTest.8
            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData recoveryData) {
                rcHolder.set(rc);
                lacHolder.set(recoveryData.getLastAddConfirmed());
                doneLatch.countDown();
            }
        };
        new ReadLastConfirmedOp(readerLh, lacCallback).initiate();
        doneLatch.await();
        Assert.assertEquals(0L, rcHolder.get());
        Assert.assertEquals(1L, lacHolder.get());

        writerBk.close();
        recoverBk.close();
        readerBk.close();
    }
}
