package org.apache.bookkeeper.bookie;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.EntryMemTable;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.class */
class EntryMemTableWithParallelFlusher extends EntryMemTable {
    private static final Logger log = LoggerFactory.getLogger(EntryMemTableWithParallelFlusher.class);
    final OrderedExecutor flushExecutor;

    public EntryMemTableWithParallelFlusher(ServerConfiguration serverConfiguration, CheckpointSource checkpointSource, StatsLogger statsLogger) {
        super(serverConfiguration, checkpointSource, statsLogger);
        this.flushExecutor = OrderedExecutor.newBuilder().numThreads(serverConfiguration.getNumOfMemtableFlushThreads()).name("MemtableFlushThreads").build();
    }

    @Override // org.apache.bookkeeper.bookie.EntryMemTable
    long flushSnapshot(final SkipListFlusher skipListFlusher, CheckpointSource.Checkpoint checkpoint) throws IOException {
        final AtomicLong atomicLong = new AtomicLong();
        if (this.snapshot.compareTo(checkpoint) < 0) {
            synchronized (this) {
                EntryMemTable.EntrySkipList entrySkipList = this.snapshot;
                final Phaser phaser = new Phaser(1);
                final AtomicReference atomicReference = new AtomicReference();
                if (entrySkipList.compareTo(checkpoint) < 0) {
                    Map.Entry<EntryKey, EntryKeyValue> firstEntry = entrySkipList.firstEntry();
                    while (firstEntry != null) {
                        EntryKeyValue value = firstEntry.getValue();
                        long ledgerId = value.getLedgerId();
                        EntryKey entryKey = new EntryKey(ledgerId, 9223372036854775806L);
                        final ConcurrentNavigableMap<EntryKey, EntryKeyValue> subMap = entrySkipList.subMap(value, (EntryKeyValue) entryKey);
                        phaser.register();
                        this.flushExecutor.executeOrdered(ledgerId, new SafeRunnable() { // from class: org.apache.bookkeeper.bookie.EntryMemTableWithParallelFlusher.1
                            @Override // org.apache.bookkeeper.common.util.SafeRunnable
                            public void safeRun() {
                                try {
                                    boolean z = false;
                                    Iterator it = subMap.keySet().iterator();
                                    while (it.hasNext()) {
                                        EntryKeyValue entryKeyValue = (EntryKeyValue) ((EntryKey) it.next());
                                        atomicLong.addAndGet(entryKeyValue.getLength());
                                        long ledgerId2 = entryKeyValue.getLedgerId();
                                        if (!z) {
                                            try {
                                                skipListFlusher.process(ledgerId2, entryKeyValue.getEntryId(), entryKeyValue.getValueAsByteBuffer());
                                            } catch (Bookie.NoLedgerException e) {
                                                z = true;
                                            }
                                        }
                                    }
                                    phaser.arriveAndDeregister();
                                } catch (Exception e2) {
                                    EntryMemTableWithParallelFlusher.log.error("Got Exception while trying to flush process entryies: ", e2);
                                    atomicReference.set(e2);
                                    phaser.forceTermination();
                                }
                            }
                        });
                        firstEntry = entrySkipList.ceilingEntry(entryKey);
                    }
                    try {
                        if (phaser.arriveAndAwaitAdvance() < 0) {
                            log.error("Phaser is terminated while awaiting flushExecutor to complete the entry flushes", (Throwable) atomicReference.get());
                            throw new IOException("Failed to complete the flushSnapshotByParallelizing", (Throwable) atomicReference.get());
                        }
                        this.flushBytesCounter.add(atomicLong.get());
                        clearSnapshot(entrySkipList);
                    } catch (IllegalStateException e) {
                        log.error("Got IllegalStateException while awaiting on Phaser", e);
                        throw new IOException("Got IllegalStateException while awaiting on Phaser", e);
                    }
                }
            }
        }
        this.skipListSemaphore.release(atomicLong.intValue());
        return atomicLong.longValue();
    }

    @Override // org.apache.bookkeeper.bookie.EntryMemTable, java.lang.AutoCloseable
    public void close() throws Exception {
        this.flushExecutor.shutdown();
    }
}
