package org.apache.bookkeeper.client;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.14.3.1.jar:org/apache/bookkeeper/client/PendingReadOp.class */
public class PendingReadOp implements BookkeeperInternalCallbacks.ReadEntryCallback, SafeRunnable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) PendingReadOp.class);
    protected final List<LedgerEntryRequest> seq;
    LedgerHandle lh;
    final ClientContext clientCtx;
    long numPendingEntries;
    final long startEntryId;
    final long endEntryId;
    long requestTimeNanos;
    final boolean isRecoveryRead;
    boolean allowFailFast;
    private ScheduledFuture<?> speculativeTask = null;
    private final Set<BookieId> sentToHosts = new HashSet();
    boolean parallelRead = false;
    final AtomicBoolean complete = new AtomicBoolean(false);
    private final CompletableFuture<LedgerEntries> future = new CompletableFuture<>();
    final int requiredBookiesMissingEntryForRecovery = (getLedgerMetadata().getWriteQuorumSize() - getLedgerMetadata().getAckQuorumSize()) + 1;
    private final Set<BookieId> heardFromHosts = new HashSet();
    private final BitSet heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.14.3.1.jar:org/apache/bookkeeper/client/PendingReadOp$LedgerEntryRequest.class */
    public abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, AutoCloseable {
        final AtomicBoolean complete = new AtomicBoolean(false);
        int rc = 0;
        int firstError = 0;
        int numBookiesMissingEntry = 0;
        final List<BookieId> ensemble;
        final DistributionSchedule.WriteSet writeSet;
        final LedgerEntryImpl entryImpl;
        final long eId;

        LedgerEntryRequest(List<BookieId> list, long j, long j2) {
            this.entryImpl = LedgerEntryImpl.create(j, j2);
            this.ensemble = list;
            this.eId = j2;
            if (PendingReadOp.this.clientCtx.getConf().enableReorderReadSequence) {
                this.writeSet = PendingReadOp.this.clientCtx.getPlacementPolicy().reorderReadSequence(list, PendingReadOp.this.lh.getBookiesHealthInfo(), PendingReadOp.this.lh.getWriteSetForReadOperation(j2));
            } else {
                this.writeSet = PendingReadOp.this.lh.getWriteSetForReadOperation(j2);
            }
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.entryImpl.close();
        }

        abstract void read();

        boolean complete(int i, BookieId bookieId, ByteBuf byteBuf) {
            if (isComplete()) {
                return false;
            }
            try {
                ByteBuf verifyDigestAndReturnData = PendingReadOp.this.lh.macManager.verifyDigestAndReturnData(this.eId, byteBuf);
                if (this.complete.getAndSet(true)) {
                    return false;
                }
                this.rc = 0;
                this.entryImpl.setLength(byteBuf.getLong(24));
                this.entryImpl.setEntryBuf(verifyDigestAndReturnData);
                this.writeSet.recycle();
                return true;
            } catch (BKException.BKDigestMatchException e) {
                PendingReadOp.this.clientCtx.getClientStats().getReadOpDmCounter().inc();
                logErrorAndReattemptRead(i, bookieId, "Mac mismatch", -5);
                return false;
            }
        }

        boolean fail(int i) {
            if (!this.complete.compareAndSet(false, true)) {
                return false;
            }
            this.rc = i;
            PendingReadOp.this.submitCallback(i);
            this.writeSet.recycle();
            return true;
        }

        synchronized void logErrorAndReattemptRead(int i, BookieId bookieId, String str, int i2) {
            if (0 == this.firstError || -13 == this.firstError || -7 == this.firstError) {
                this.firstError = i2;
            } else if (-8 == this.firstError && -13 != i2 && -7 != i2) {
                this.firstError = i2;
            }
            if (-13 == i2 || -7 == i2) {
                this.numBookiesMissingEntry++;
                if (PendingReadOp.LOG.isDebugEnabled()) {
                    PendingReadOp.LOG.debug("No such entry found on bookie.  L{} E{} bookie: {}", Long.valueOf(PendingReadOp.this.lh.ledgerId), Long.valueOf(this.eId), bookieId);
                }
            } else if (PendingReadOp.LOG.isInfoEnabled()) {
                PendingReadOp.LOG.info("{} while reading L{} E{} from bookie: {}", str, Long.valueOf(PendingReadOp.this.lh.ledgerId), Long.valueOf(this.eId), bookieId);
            }
            PendingReadOp.this.lh.recordReadErrorOnBookie(i);
        }

        abstract BookieId maybeSendSpeculativeRead(BitSet bitSet);

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean isComplete() {
            return this.complete.get();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public int getRc() {
            return this.rc;
        }

        public String toString() {
            return String.format("L%d-E%d", Long.valueOf(PendingReadOp.this.lh.getId()), Long.valueOf(this.eId));
        }

        @Override // org.apache.bookkeeper.client.SpeculativeRequestExecutor
        public ListenableFuture<Boolean> issueSpeculativeRequest() {
            return PendingReadOp.this.clientCtx.getMainWorkerPool().submitOrdered(PendingReadOp.this.lh.getId(), new Callable<Boolean>() { // from class: org.apache.bookkeeper.client.PendingReadOp.LedgerEntryRequest.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Boolean call() throws Exception {
                    if (LedgerEntryRequest.this.isComplete() || null == LedgerEntryRequest.this.maybeSendSpeculativeRead(PendingReadOp.this.heardFromHostsBitSet)) {
                        return false;
                    }
                    if (PendingReadOp.LOG.isDebugEnabled()) {
                        PendingReadOp.LOG.debug("Send speculative read for {}. Hosts sent are {},  Hosts heard are {}, ensemble is {}.", this, PendingReadOp.this.sentToHosts, PendingReadOp.this.heardFromHostsBitSet, LedgerEntryRequest.this.ensemble);
                    }
                    return true;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.14.3.1.jar:org/apache/bookkeeper/client/PendingReadOp$ParallelReadRequest.class */
    public class ParallelReadRequest extends LedgerEntryRequest {
        int numPendings;

        ParallelReadRequest(List<BookieId> list, long j, long j2) {
            super(list, j, j2);
            this.numPendings = this.writeSet.size();
        }

        @Override // org.apache.bookkeeper.client.PendingReadOp.LedgerEntryRequest
        void read() {
            for (int i = 0; i < this.writeSet.size(); i++) {
                try {
                    PendingReadOp.this.sendReadTo(this.writeSet.get(i), this.ensemble.get(this.writeSet.get(i)), this);
                } catch (InterruptedException e) {
                    PendingReadOp.LOG.error("Interrupted reading entry {} : ", this, e);
                    Thread.currentThread().interrupt();
                    fail(-15);
                    return;
                }
            }
        }

        @Override // org.apache.bookkeeper.client.PendingReadOp.LedgerEntryRequest
        synchronized void logErrorAndReattemptRead(int i, BookieId bookieId, String str, int i2) {
            super.logErrorAndReattemptRead(i, bookieId, str, i2);
            this.numPendings--;
            if (PendingReadOp.this.isRecoveryRead && this.numBookiesMissingEntry >= PendingReadOp.this.requiredBookiesMissingEntryForRecovery) {
                fail(-13);
            } else if (this.numPendings == 0) {
                fail(this.firstError);
            }
        }

        @Override // org.apache.bookkeeper.client.PendingReadOp.LedgerEntryRequest
        BookieId maybeSendSpeculativeRead(BitSet bitSet) {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.14.3.1.jar:org/apache/bookkeeper/client/PendingReadOp$ReadContext.class */
    public static class ReadContext implements BookkeeperInternalCallbacks.ReadEntryCallbackCtx {
        final int bookieIndex;
        final BookieId to;
        final LedgerEntryRequest entry;
        long lac = -1;

        ReadContext(int i, BookieId bookieId, LedgerEntryRequest ledgerEntryRequest) {
            this.bookieIndex = i;
            this.to = bookieId;
            this.entry = ledgerEntryRequest;
        }

        @Override // org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx
        public void setLastAddConfirmed(long j) {
            this.lac = j;
        }

        @Override // org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx
        public long getLastAddConfirmed() {
            return this.lac;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.14.3.1.jar:org/apache/bookkeeper/client/PendingReadOp$SequenceReadRequest.class */
    public class SequenceReadRequest extends LedgerEntryRequest {
        static final int NOT_FOUND = -1;
        int nextReplicaIndexToReadFrom;
        final BitSet sentReplicas;
        final BitSet erroredReplicas;

        SequenceReadRequest(List<BookieId> list, long j, long j2) {
            super(list, j, j2);
            this.nextReplicaIndexToReadFrom = 0;
            this.sentReplicas = new BitSet(PendingReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
            this.erroredReplicas = new BitSet(PendingReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
        }

        private synchronized int getNextReplicaIndexToReadFrom() {
            return this.nextReplicaIndexToReadFrom;
        }

        private BitSet getSentToBitSet() {
            BitSet bitSet = new BitSet(this.ensemble.size());
            for (int i = 0; i < this.sentReplicas.length(); i++) {
                if (this.sentReplicas.get(i)) {
                    bitSet.set(this.writeSet.get(i));
                }
            }
            return bitSet;
        }

        private boolean readsOutstanding() {
            return this.sentReplicas.cardinality() - this.erroredReplicas.cardinality() > 0;
        }

        @Override // org.apache.bookkeeper.client.PendingReadOp.LedgerEntryRequest
        synchronized BookieId maybeSendSpeculativeRead(BitSet bitSet) {
            if (this.nextReplicaIndexToReadFrom >= PendingReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                return null;
            }
            BitSet sentToBitSet = getSentToBitSet();
            sentToBitSet.and(bitSet);
            if (sentToBitSet.cardinality() != 0) {
                return null;
            }
            PendingReadOp.this.clientCtx.getClientStats().getSpeculativeReadCounter().inc();
            return sendNextRead();
        }

        @Override // org.apache.bookkeeper.client.PendingReadOp.LedgerEntryRequest
        void read() {
            sendNextRead();
        }

        synchronized BookieId sendNextRead() {
            if (this.nextReplicaIndexToReadFrom >= PendingReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                fail(this.firstError);
                return null;
            }
            int i = this.nextReplicaIndexToReadFrom;
            int i2 = this.writeSet.get(this.nextReplicaIndexToReadFrom);
            this.nextReplicaIndexToReadFrom++;
            try {
                BookieId bookieId = this.ensemble.get(i2);
                PendingReadOp.this.sendReadTo(i2, bookieId, this);
                PendingReadOp.this.sentToHosts.add(bookieId);
                this.sentReplicas.set(i);
                return bookieId;
            } catch (InterruptedException e) {
                PendingReadOp.LOG.error("Interrupted reading entry " + this, (Throwable) e);
                Thread.currentThread().interrupt();
                fail(-15);
                return null;
            }
        }

        @Override // org.apache.bookkeeper.client.PendingReadOp.LedgerEntryRequest
        synchronized void logErrorAndReattemptRead(int i, BookieId bookieId, String str, int i2) {
            super.logErrorAndReattemptRead(i, bookieId, str, i2);
            int indexOf = this.writeSet.indexOf(i);
            if (indexOf == -1) {
                PendingReadOp.LOG.error("Received error from a host which is not in the ensemble {} {}.", bookieId, this.ensemble);
                return;
            }
            this.erroredReplicas.set(indexOf);
            if (PendingReadOp.this.isRecoveryRead && this.numBookiesMissingEntry >= PendingReadOp.this.requiredBookiesMissingEntryForRecovery) {
                fail(-13);
            } else {
                if (readsOutstanding()) {
                    return;
                }
                sendNextRead();
            }
        }

        @Override // org.apache.bookkeeper.client.PendingReadOp.LedgerEntryRequest
        boolean complete(int i, BookieId bookieId, ByteBuf byteBuf) {
            boolean complete = super.complete(i, bookieId, byteBuf);
            if (complete) {
                int nextReplicaIndexToReadFrom = getNextReplicaIndexToReadFrom();
                for (int i2 = 0; i2 < nextReplicaIndexToReadFrom - 1; i2++) {
                    PendingReadOp.this.clientCtx.getPlacementPolicy().registerSlowBookie(this.ensemble.get(this.writeSet.get(i2)), this.eId);
                }
            }
            return complete;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PendingReadOp(LedgerHandle ledgerHandle, ClientContext clientContext, long j, long j2, boolean z) {
        this.allowFailFast = false;
        this.seq = new ArrayList((int) ((j2 + 1) - j));
        this.lh = ledgerHandle;
        this.clientCtx = clientContext;
        this.startEntryId = j;
        this.endEntryId = j2;
        this.isRecoveryRead = z;
        this.allowFailFast = false;
        this.numPendingEntries = (j2 - j) + 1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<LedgerEntries> future() {
        return this.future;
    }

    protected LedgerMetadata getLedgerMetadata() {
        return this.lh.getLedgerMetadata();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cancelSpeculativeTask(boolean z) {
        if (this.speculativeTask != null) {
            this.speculativeTask.cancel(z);
            this.speculativeTask = null;
        }
    }

    public ScheduledFuture<?> getSpeculativeTask() {
        return this.speculativeTask;
    }

    PendingReadOp parallelRead(boolean z) {
        this.parallelRead = z;
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void allowFailFastOnUnwritableChannel() {
        this.allowFailFast = true;
    }

    public void submit() {
        this.clientCtx.getMainWorkerPool().executeOrdered(this.lh.ledgerId, this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initiate() {
        long j = this.startEntryId;
        long j2 = this.startEntryId;
        this.requestTimeNanos = MathUtils.nowInNano();
        List<BookieId> list = null;
        do {
            if (j2 == j) {
                list = getLedgerMetadata().getEnsembleAt(j2);
                j = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), j2);
            }
            this.seq.add(this.parallelRead ? new ParallelReadRequest(list, this.lh.ledgerId, j2) : new SequenceReadRequest(list, this.lh.ledgerId, j2));
            j2++;
        } while (j2 <= this.endEntryId);
        for (LedgerEntryRequest ledgerEntryRequest : this.seq) {
            ledgerEntryRequest.read();
            if (!this.parallelRead && this.clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) {
                this.speculativeTask = this.clientCtx.getConf().readSpeculativeRequestPolicy.get().initiateSpeculativeRequest(this.clientCtx.getScheduler(), ledgerEntryRequest);
            }
        }
    }

    @Override // org.apache.bookkeeper.common.util.SafeRunnable
    public void safeRun() {
        initiate();
    }

    void sendReadTo(int i, BookieId bookieId, LedgerEntryRequest ledgerEntryRequest) throws InterruptedException {
        if (this.lh.throttler != null) {
            this.lh.throttler.acquire();
        }
        if (this.isRecoveryRead) {
            this.clientCtx.getBookieClient().readEntry(bookieId, this.lh.ledgerId, ledgerEntryRequest.eId, this, new ReadContext(i, bookieId, ledgerEntryRequest), 5, this.lh.ledgerKey);
        } else {
            this.clientCtx.getBookieClient().readEntry(bookieId, this.lh.ledgerId, ledgerEntryRequest.eId, this, new ReadContext(i, bookieId, ledgerEntryRequest), 0);
        }
    }

    @Override // org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
    public void readEntryComplete(int i, long j, long j2, ByteBuf byteBuf, Object obj) {
        ReadContext readContext = (ReadContext) obj;
        LedgerEntryRequest ledgerEntryRequest = readContext.entry;
        if (i != 0) {
            ledgerEntryRequest.logErrorAndReattemptRead(readContext.bookieIndex, readContext.to, "Error: " + BKException.getMessage(i), i);
            return;
        }
        this.heardFromHosts.add(readContext.to);
        this.heardFromHostsBitSet.set(readContext.bookieIndex, true);
        byteBuf.retain();
        if (ledgerEntryRequest.complete(readContext.bookieIndex, readContext.to, byteBuf)) {
            if (!this.isRecoveryRead) {
                this.lh.updateLastConfirmed(readContext.getLastAddConfirmed(), 0L);
            }
            submitCallback(0);
        } else {
            byteBuf.release();
        }
        if (this.numPendingEntries < 0) {
            LOG.error("Read too many values for ledger {} : [{}, {}].", Long.valueOf(j), Long.valueOf(this.startEntryId), Long.valueOf(this.endEntryId));
        }
    }

    protected void submitCallback(int i) {
        if (0 == i) {
            this.numPendingEntries--;
            if (this.numPendingEntries != 0) {
                return;
            }
        }
        if (this.complete.compareAndSet(false, true)) {
            cancelSpeculativeTask(true);
            long elapsedNanos = MathUtils.elapsedNanos(this.requestTimeNanos);
            if (i == 0) {
                this.clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
                this.future.complete(LedgerEntriesImpl.create(Lists.transform(this.seq, ledgerEntryRequest -> {
                    return ledgerEntryRequest.entryImpl;
                })));
                return;
            }
            long j = -1;
            Integer num = null;
            Iterator<LedgerEntryRequest> it = this.seq.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                LedgerEntryRequest next = it.next();
                if (!next.isComplete()) {
                    j = next.eId;
                    num = Integer.valueOf(next.rc);
                    break;
                }
            }
            LOG.error("Read of ledger entry failed: L{} E{}-E{}, Sent to {}, Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})", Long.valueOf(this.lh.getId()), Long.valueOf(this.startEntryId), Long.valueOf(this.endEntryId), this.sentToHosts, this.heardFromHosts, this.heardFromHostsBitSet, BKException.getMessage(i), Long.valueOf(j), num);
            this.clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
            this.seq.forEach((v0) -> {
                v0.close();
            });
            this.future.completeExceptionally(BKException.create(i));
        }
    }
}
