package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
import com.google.common.collect.Queues;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ConnectionHandler;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.9.0-rc-202106242205.jar:org/apache/pulsar/client/impl/ConsumerImpl.class */
public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
    private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
    final long consumerId;
    private volatile int availablePermits;
    protected volatile MessageId lastDequeuedMessageId;
    private volatile MessageId lastMessageIdInBroker;
    private final long subscribeTimeout;
    private final int partitionIndex;
    private final boolean hasParentConsumer;
    private final int receiverQueueRefillThreshold;
    private final UnAckedMessageTracker unAckedMessageTracker;
    private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
    private final NegativeAcksTracker negativeAcksTracker;
    protected final ConsumerStatsRecorder stats;
    private final int priorityLevel;
    private final SubscriptionMode subscriptionMode;
    private volatile BatchMessageIdImpl startMessageId;
    private volatile BatchMessageIdImpl seekMessageId;
    private final AtomicBoolean duringSeek;
    private final BatchMessageIdImpl initialStartMessageId;
    private final long startMessageRollbackDurationInSec;
    private volatile boolean hasReachedEndOfTopic;
    private final MessageCrypto msgCrypto;
    private final Map<String, String> metadata;
    private final boolean readCompacted;
    private final boolean resetIncludeHead;
    private final SubscriptionInitialPosition subscriptionInitialPosition;
    private final ConnectionHandler connectionHandler;
    private final TopicName topicName;
    private final String topicNameWithoutPartition;
    private final Map<MessageIdImpl, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;
    private final DeadLetterPolicy deadLetterPolicy;
    private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
    private volatile Producer<T> retryLetterProducer;
    private final ReadWriteLock createProducerLock;
    protected volatile boolean paused;
    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap;
    private int pendingChunkedMessageCount;
    protected long expireTimeOfIncompleteChunkedMessageMillis;
    private boolean expireChunkMessageTaskScheduled;
    private final int maxPendingChunkedMessage;
    private final boolean autoAckOldestChunkedMessageOnQueueFull;
    private final BlockingQueue<String> pendingChunkedMessageUuidQueue;
    private final boolean createTopicIfDoesNotExist;
    private final boolean poolMessages;
    private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration;
    private final ExecutorService internalPinnedExecutor;
    private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConsumerImpl.class, "availablePermits");
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConsumerImpl.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.9.0-rc-202106242205.jar:org/apache/pulsar/client/impl/ConsumerImpl$ChunkedMessageCtx.class */
    public static class ChunkedMessageCtx {
        protected int totalChunks;
        protected ByteBuf chunkedMsgBuffer;
        protected int lastChunkedMessageId;
        protected MessageIdImpl[] chunkedMessageIds;
        protected long receivedTime;
        private final Recycler.Handle<ChunkedMessageCtx> recyclerHandle;
        private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() { // from class: org.apache.pulsar.client.impl.ConsumerImpl.ChunkedMessageCtx.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // io.netty.util.Recycler
            public ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
                return new ChunkedMessageCtx(handle);
            }
        };

        /* JADX INFO: Access modifiers changed from: package-private */
        public static ChunkedMessageCtx get(int i, ByteBuf byteBuf) {
            ChunkedMessageCtx chunkedMessageCtx = RECYCLER.get();
            chunkedMessageCtx.totalChunks = i;
            chunkedMessageCtx.chunkedMsgBuffer = byteBuf;
            chunkedMessageCtx.chunkedMessageIds = new MessageIdImpl[i];
            chunkedMessageCtx.receivedTime = System.currentTimeMillis();
            return chunkedMessageCtx;
        }

        private ChunkedMessageCtx(Recycler.Handle<ChunkedMessageCtx> handle) {
            this.totalChunks = -1;
            this.lastChunkedMessageId = -1;
            this.receivedTime = 0L;
            this.recyclerHandle = handle;
        }

        public void recycle() {
            this.totalChunks = -1;
            this.chunkedMsgBuffer = null;
            this.lastChunkedMessageId = -1;
            this.recyclerHandle.recycle(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.9.0-rc-202106242205.jar:org/apache/pulsar/client/impl/ConsumerImpl$GetLastMessageIdResponse.class */
    public static final class GetLastMessageIdResponse {
        final MessageId lastMessageId;
        final MessageId markDeletePosition;

        GetLastMessageIdResponse(MessageId messageId, MessageId messageId2) {
            this.lastMessageId = messageId;
            this.markDeletePosition = messageId2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, ConsumerConfigurationData<T> consumerConfigurationData, ExecutorProvider executorProvider, int i, boolean z, CompletableFuture<Consumer<T>> completableFuture, MessageId messageId, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors, boolean z2) {
        return newConsumerImpl(pulsarClientImpl, str, consumerConfigurationData, executorProvider, i, z, completableFuture, messageId, schema, consumerInterceptors, z2, 0L);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, ConsumerConfigurationData<T> consumerConfigurationData, ExecutorProvider executorProvider, int i, boolean z, CompletableFuture<Consumer<T>> completableFuture, MessageId messageId, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors, boolean z2, long j) {
        return consumerConfigurationData.getReceiverQueueSize() == 0 ? new ZeroQueueConsumerImpl(pulsarClientImpl, str, consumerConfigurationData, executorProvider, i, z, completableFuture, messageId, schema, consumerInterceptors, z2) : new ConsumerImpl<>(pulsarClientImpl, str, consumerConfigurationData, executorProvider, i, z, completableFuture, messageId, j, schema, consumerInterceptors, z2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, ConsumerConfigurationData<T> consumerConfigurationData, ExecutorProvider executorProvider, int i, boolean z, CompletableFuture<Consumer<T>> completableFuture, MessageId messageId, long j, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors, boolean z2) {
        super(pulsarClientImpl, str, consumerConfigurationData, consumerConfigurationData.getReceiverQueueSize(), executorProvider, completableFuture, schema, consumerInterceptors);
        MessageCryptoBc messageCryptoBc;
        this.availablePermits = 0;
        this.lastDequeuedMessageId = MessageId.earliest;
        this.lastMessageIdInBroker = MessageId.earliest;
        this.createProducerLock = new ReentrantReadWriteLock();
        this.chunkedMessagesMap = new ConcurrentOpenHashMap<>();
        this.pendingChunkedMessageCount = 0;
        this.expireTimeOfIncompleteChunkedMessageMillis = 0L;
        this.expireChunkMessageTaskScheduled = false;
        this.clientCnxUsedForConsumerRegistration = new AtomicReference<>();
        this.consumerId = pulsarClientImpl.newConsumerId();
        this.subscriptionMode = consumerConfigurationData.getSubscriptionMode();
        this.startMessageId = messageId != null ? new BatchMessageIdImpl((MessageIdImpl) messageId) : null;
        this.initialStartMessageId = this.startMessageId;
        this.startMessageRollbackDurationInSec = j;
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
        this.subscribeTimeout = System.currentTimeMillis() + pulsarClientImpl.getConfiguration().getOperationTimeoutMs();
        this.partitionIndex = i;
        this.hasParentConsumer = z;
        this.receiverQueueRefillThreshold = consumerConfigurationData.getReceiverQueueSize() / 2;
        this.priorityLevel = consumerConfigurationData.getPriorityLevel();
        this.readCompacted = consumerConfigurationData.isReadCompacted();
        this.subscriptionInitialPosition = consumerConfigurationData.getSubscriptionInitialPosition();
        this.negativeAcksTracker = new NegativeAcksTracker(this, consumerConfigurationData);
        this.resetIncludeHead = consumerConfigurationData.isResetIncludeHead();
        this.createTopicIfDoesNotExist = z2;
        this.maxPendingChunkedMessage = consumerConfigurationData.getMaxPendingChunkedMessage();
        this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue();
        this.expireTimeOfIncompleteChunkedMessageMillis = consumerConfigurationData.getExpireTimeOfIncompleteChunkedMessageMillis();
        this.autoAckOldestChunkedMessageOnQueueFull = consumerConfigurationData.isAutoAckOldestChunkedMessageOnQueueFull();
        this.poolMessages = consumerConfigurationData.isPoolMessages();
        this.internalPinnedExecutor = pulsarClientImpl.getInternalExecutorService();
        if (pulsarClientImpl.getConfiguration().getStatsIntervalSeconds() > 0) {
            this.stats = new ConsumerStatsRecorderImpl(pulsarClientImpl, consumerConfigurationData, this);
        } else {
            this.stats = ConsumerStatsDisabled.INSTANCE;
        }
        this.duringSeek = new AtomicBoolean(false);
        if (consumerConfigurationData.getAckTimeoutMillis() == 0) {
            this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
        } else if (consumerConfigurationData.getTickDurationMillis() > 0) {
            this.unAckedMessageTracker = new UnAckedMessageTracker(pulsarClientImpl, this, consumerConfigurationData.getAckTimeoutMillis(), Math.min(consumerConfigurationData.getTickDurationMillis(), consumerConfigurationData.getAckTimeoutMillis()));
        } else {
            this.unAckedMessageTracker = new UnAckedMessageTracker(pulsarClientImpl, this, consumerConfigurationData.getAckTimeoutMillis());
        }
        if (consumerConfigurationData.getCryptoKeyReader() == null) {
            this.msgCrypto = null;
        } else if (consumerConfigurationData.getMessageCrypto() != null) {
            this.msgCrypto = consumerConfigurationData.getMessageCrypto();
        } else {
            try {
                messageCryptoBc = new MessageCryptoBc(String.format("[%s] [%s]", str, this.subscription), false);
            } catch (Exception e) {
                log.error("MessageCryptoBc may not included in the jar. e:", (Throwable) e);
                messageCryptoBc = null;
            }
            this.msgCrypto = messageCryptoBc;
        }
        if (consumerConfigurationData.getProperties().isEmpty()) {
            this.metadata = Collections.emptyMap();
        } else {
            this.metadata = Collections.unmodifiableMap(new HashMap(consumerConfigurationData.getProperties()));
        }
        this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder().setInitialTime(pulsarClientImpl.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMax(pulsarClientImpl.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMandatoryStop(0L, TimeUnit.MILLISECONDS).create(), this);
        this.topicName = TopicName.get(str);
        if (this.topicName.isPersistent()) {
            this.acknowledgmentsGroupingTracker = new PersistentAcknowledgmentsGroupingTracker(this, consumerConfigurationData, pulsarClientImpl.eventLoopGroup());
        } else {
            this.acknowledgmentsGroupingTracker = NonPersistentAcknowledgmentGroupingTracker.of();
        }
        if (consumerConfigurationData.getDeadLetterPolicy() != null) {
            this.possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap();
            if (StringUtils.isNotBlank(consumerConfigurationData.getDeadLetterPolicy().getDeadLetterTopic())) {
                this.deadLetterPolicy = DeadLetterPolicy.builder().maxRedeliverCount(consumerConfigurationData.getDeadLetterPolicy().getMaxRedeliverCount()).deadLetterTopic(consumerConfigurationData.getDeadLetterPolicy().getDeadLetterTopic()).build();
            } else {
                this.deadLetterPolicy = DeadLetterPolicy.builder().maxRedeliverCount(consumerConfigurationData.getDeadLetterPolicy().getMaxRedeliverCount()).deadLetterTopic(String.format("%s-%s-DLQ", str, this.subscription)).build();
            }
            if (StringUtils.isNotBlank(consumerConfigurationData.getDeadLetterPolicy().getRetryLetterTopic())) {
                this.deadLetterPolicy.setRetryLetterTopic(consumerConfigurationData.getDeadLetterPolicy().getRetryLetterTopic());
            } else {
                this.deadLetterPolicy.setRetryLetterTopic(String.format("%s-%s-RETRY", str, this.subscription));
            }
        } else {
            this.deadLetterPolicy = null;
            this.possibleSendToDeadLetterTopicMessages = null;
        }
        this.topicNameWithoutPartition = this.topicName.getPartitionedTopicName();
        grabCnx();
    }

    public ConnectionHandler getConnectionHandler() {
        return this.connectionHandler;
    }

    public UnAckedMessageTracker getUnAckedMessageTracker() {
        return this.unAckedMessageTracker;
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> unsubscribeAsync() {
        if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (isConnected()) {
            setState(HandlerState.State.Closing);
            long newRequestId = this.client.newRequestId();
            cnx().sendRequestWithId(Commands.newUnsubscribe(this.consumerId, newRequestId), newRequestId).thenRun(() -> {
                closeConsumerTasks();
                deregisterFromClientCnx();
                this.client.cleanupConsumer(this);
                log.info("[{}][{}] Successfully unsubscribed from topic", this.topic, this.subscription);
                setState(HandlerState.State.Closed);
                completableFuture.complete(null);
            }).exceptionally(th -> {
                log.error("[{}][{}] Failed to unsubscribe: {}", this.topic, this.subscription, th.getCause().getMessage());
                setState(HandlerState.State.Ready);
                completableFuture.completeExceptionally(PulsarClientException.wrap(th.getCause(), String.format("Failed to unsubscribe the subscription %s of topic %s", this.topicName.toString(), this.subscription)));
                return null;
            });
        } else {
            completableFuture.completeExceptionally(new PulsarClientException(String.format("The client is not connected to the broker when unsubscribing the subscription %s of the topic %s", this.subscription, this.topicName.toString())));
        }
        return completableFuture;
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Message<T> internalReceive() throws PulsarClientException {
        try {
            Message<T> take = this.incomingMessages.take();
            messageProcessed(take);
            return beforeConsume(take);
        } catch (InterruptedException e) {
            this.stats.incrementNumReceiveFailed();
            throw PulsarClientException.unwrap(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public CompletableFuture<Message<T>> internalReceiveAsync() {
        CompletableFutureCancellationHandler completableFutureCancellationHandler = new CompletableFutureCancellationHandler();
        CompletableFuture<Message<T>> createFuture = completableFutureCancellationHandler.createFuture();
        this.internalPinnedExecutor.execute(() -> {
            Message<T> poll = this.incomingMessages.poll();
            if (poll == null) {
                this.pendingReceives.add(createFuture);
                completableFutureCancellationHandler.setCancelAction(() -> {
                    this.pendingReceives.remove(createFuture);
                });
            }
            if (poll != null) {
                messageProcessed(poll);
                createFuture.complete(beforeConsume(poll));
            }
        });
        return createFuture;
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Message<T> internalReceive(int i, TimeUnit timeUnit) throws PulsarClientException {
        try {
            Message<T> poll = this.incomingMessages.poll(i, timeUnit);
            if (poll == null) {
                return null;
            }
            messageProcessed(poll);
            return beforeConsume(poll);
        } catch (InterruptedException e) {
            HandlerState.State state = getState();
            if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
                return null;
            }
            this.stats.incrementNumReceiveFailed();
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Messages<T> internalBatchReceive() throws PulsarClientException {
        try {
            return internalBatchReceiveAsync().get();
        } catch (InterruptedException | ExecutionException e) {
            HandlerState.State state = getState();
            if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
                return null;
            }
            this.stats.incrementNumBatchReceiveFailed();
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
        CompletableFutureCancellationHandler completableFutureCancellationHandler = new CompletableFutureCancellationHandler();
        CompletableFuture<Messages<T>> createFuture = completableFutureCancellationHandler.createFuture();
        this.internalPinnedExecutor.execute(() -> {
            if (this.pendingBatchReceives == null) {
                this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
            }
            if (!hasEnoughMessagesForBatchReceive()) {
                ConsumerBase.OpBatchReceive<T> of = ConsumerBase.OpBatchReceive.of(createFuture);
                this.pendingBatchReceives.add(of);
                completableFutureCancellationHandler.setCancelAction(() -> {
                    this.pendingBatchReceives.remove(of);
                });
                return;
            }
            MessagesImpl<T> newMessagesImpl = getNewMessagesImpl();
            Message<T> peek = this.incomingMessages.peek();
            while (true) {
                Message<T> message = peek;
                if (message == null || !newMessagesImpl.canAdd(message)) {
                    break;
                }
                Message<T> poll = this.incomingMessages.poll();
                if (poll != null) {
                    messageProcessed(poll);
                    newMessagesImpl.add(beforeConsume(poll));
                }
                peek = this.incomingMessages.peek();
            }
            createFuture.complete(newMessagesImpl);
        });
        return createFuture;
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, CommandAck.AckType ackType, Map<String, Long> map, TransactionImpl transactionImpl) {
        Preconditions.checkArgument(messageId instanceof MessageIdImpl);
        if (getState() == HandlerState.State.Ready || getState() == HandlerState.State.Connecting) {
            return transactionImpl != null ? doTransactionAcknowledgeForResponse(messageId, ackType, null, map, new TxnID(transactionImpl.getTxnIdMostBits(), transactionImpl.getTxnIdLeastBits())) : this.acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, map);
        }
        this.stats.incrementNumAcksFailed();
        PulsarClientException pulsarClientException = new PulsarClientException("Consumer not ready. State: " + getState());
        if (CommandAck.AckType.Individual.equals(ackType)) {
            onAcknowledge(messageId, pulsarClientException);
        } else if (CommandAck.AckType.Cumulative.equals(ackType)) {
            onAcknowledgeCumulative(messageId, pulsarClientException);
        }
        return FutureUtil.failedFuture(pulsarClientException);
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected CompletableFuture<Void> doAcknowledge(List<MessageId> list, CommandAck.AckType ackType, Map<String, Long> map, TransactionImpl transactionImpl) {
        return this.acknowledgmentsGroupingTracker.addListAcknowledgment(list, ackType, map);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public CompletableFuture<Void> doReconsumeLater(Message<?> message, CommandAck.AckType ackType, Map<String, Long> map, long j, TimeUnit timeUnit) {
        MessageId messageId = message.getMessageId();
        if (messageId == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId"));
        }
        if (messageId instanceof TopicMessageIdImpl) {
            messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
        }
        Preconditions.checkArgument(messageId instanceof MessageIdImpl);
        if (getState() != HandlerState.State.Ready && getState() != HandlerState.State.Connecting) {
            this.stats.incrementNumAcksFailed();
            PulsarClientException pulsarClientException = new PulsarClientException("Consumer not ready. State: " + getState());
            if (CommandAck.AckType.Individual.equals(ackType)) {
                onAcknowledge(messageId, pulsarClientException);
            } else if (CommandAck.AckType.Cumulative.equals(ackType)) {
                onAcknowledgeCumulative(messageId, pulsarClientException);
            }
            return FutureUtil.failedFuture(pulsarClientException);
        }
        if (j < 0) {
            j = 0;
        }
        if (this.retryLetterProducer == null) {
            this.createProducerLock.writeLock().lock();
            try {
                try {
                    if (this.retryLetterProducer == null) {
                        this.retryLetterProducer = this.client.newProducer(this.schema).topic(this.deadLetterPolicy.getRetryLetterTopic()).enableBatching(false).blockIfQueueFull(false).create();
                    }
                } catch (Exception e) {
                    log.error("Create retry letter producer exception with topic: {}", this.deadLetterPolicy.getRetryLetterTopic(), e);
                    this.createProducerLock.writeLock().unlock();
                }
            } finally {
                this.createProducerLock.writeLock().unlock();
            }
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (this.retryLetterProducer != null) {
            try {
                MessageImpl<?> messageImpl = getMessageImpl(message);
                SortedMap<String, String> propertiesMap = getPropertiesMap(message, getOriginMessageIdStr(message), getOriginTopicNameStr(message));
                int i = 1;
                if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                    i = Integer.parseInt(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) + 1;
                }
                propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(i));
                propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(timeUnit.toMillis(j)));
                if (i <= this.deadLetterPolicy.getMaxRedeliverCount() || !StringUtils.isNotBlank(this.deadLetterPolicy.getDeadLetterTopic())) {
                    TypedMessageBuilder<T> properties = this.retryLetterProducer.newMessage().value(messageImpl.getValue()).properties(propertiesMap);
                    if (j > 0) {
                        properties.deliverAfter(j, timeUnit);
                    }
                    if (message.hasKey()) {
                        properties.key(message.getKey());
                    }
                    properties.send();
                    return doAcknowledge(messageId, ackType, map, (TransactionImpl) null);
                }
                initDeadLetterProducerIfNeeded();
                MessageId messageId2 = messageId;
                this.deadLetterProducer.thenAccept(producer -> {
                    producer.newMessage(Schema.AUTO_PRODUCE_BYTES(messageImpl.getReaderSchema().get())).value(messageImpl.getData()).properties(propertiesMap).sendAsync().thenAccept(messageId3 -> {
                        doAcknowledge(messageId2, ackType, (Map<String, Long>) map, (TransactionImpl) null).thenAccept(r4 -> {
                            completableFuture.complete(null);
                        }).exceptionally(th -> {
                            completableFuture.completeExceptionally(th);
                            return null;
                        });
                    }).exceptionally(th -> {
                        completableFuture.completeExceptionally(th);
                        return null;
                    });
                }).exceptionally(th -> {
                    completableFuture.completeExceptionally(th);
                    this.deadLetterProducer = null;
                    return null;
                });
            } catch (Exception e2) {
                log.error("Send to retry letter topic exception with topic: {}, messageId: {}", this.retryLetterProducer.getTopic(), messageId, e2);
                Set<MessageId> singleton = Collections.singleton(messageId);
                this.unAckedMessageTracker.remove(messageId);
                redeliverUnacknowledgedMessages(singleton);
            }
        }
        MessageId messageId3 = messageId;
        completableFuture.exceptionally(th2 -> {
            Set<MessageId> singleton2 = Collections.singleton(messageId3);
            this.unAckedMessageTracker.remove(messageId3);
            redeliverUnacknowledgedMessages(singleton2);
            return null;
        });
        return completableFuture;
    }

    private SortedMap<String, String> getPropertiesMap(Message<?> message, String str, String str2) {
        TreeMap treeMap = new TreeMap();
        if (message.getProperties() != null) {
            treeMap.putAll(message.getProperties());
        }
        treeMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, str2);
        treeMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, str);
        return treeMap;
    }

    private String getOriginMessageIdStr(Message<?> message) {
        if (message instanceof TopicMessageImpl) {
            return ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString();
        }
        if (message instanceof MessageImpl) {
            return message.getMessageId().toString();
        }
        return null;
    }

    private String getOriginTopicNameStr(Message<?> message) {
        if (message instanceof TopicMessageImpl) {
            return ((TopicMessageIdImpl) message.getMessageId()).getTopicName();
        }
        if (message instanceof MessageImpl) {
            return message.getTopicName();
        }
        return null;
    }

    private MessageImpl<?> getMessageImpl(Message<?> message) {
        if (message instanceof TopicMessageImpl) {
            return (MessageImpl) ((TopicMessageImpl) message).getMessage();
        }
        if (message instanceof MessageImpl) {
            return (MessageImpl) message;
        }
        return null;
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void negativeAcknowledge(MessageId messageId) {
        this.negativeAcksTracker.add(messageId);
        this.unAckedMessageTracker.remove(messageId);
    }

    @Override // org.apache.pulsar.client.impl.ConnectionHandler.Connection
    public void connectionOpened(ClientCnx clientCnx) {
        int size;
        if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
            setState(HandlerState.State.Closed);
            closeConsumerTasks();
            deregisterFromClientCnx();
            this.client.cleanupConsumer(this);
            failPendingReceive();
            clearReceiverQueue();
            return;
        }
        setClientCnx(clientCnx);
        log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}", this.topic, this.subscription, clientCnx.ctx().channel(), Long.valueOf(this.consumerId));
        long newRequestId = this.client.newRequestId();
        if (this.duringSeek.get()) {
            this.acknowledgmentsGroupingTracker.flushAndClean();
        }
        synchronized (this) {
            size = this.incomingMessages.size();
            this.startMessageId = clearReceiverQueue();
            if (this.possibleSendToDeadLetterTopicMessages != null) {
                this.possibleSendToDeadLetterTopicMessages.clear();
            }
        }
        boolean z = this.subscriptionMode == SubscriptionMode.Durable;
        MessageIdData messageIdData = null;
        if (z) {
            messageIdData = null;
        } else if (this.startMessageId != null) {
            messageIdData = new MessageIdData().setLedgerId(this.startMessageId.getLedgerId()).setEntryId(this.startMessageId.getEntryId()).setBatchIndex(this.startMessageId.getBatchIndex());
        }
        SchemaInfo schemaInfo = this.schema.getSchemaInfo();
        if (schemaInfo != null && (SchemaType.BYTES == schemaInfo.getType() || SchemaType.NONE == schemaInfo.getType())) {
            schemaInfo = null;
        }
        clientCnx.sendRequestWithId(Commands.newSubscribe(this.topic, this.subscription, this.consumerId, newRequestId, getSubType(), this.priorityLevel, this.consumerName, z, messageIdData, this.metadata, this.readCompacted, this.conf.isReplicateSubscriptionState(), CommandSubscribe.InitialPosition.valueOf(this.subscriptionInitialPosition.getValue()), (this.startMessageRollbackDurationInSec <= 0 || this.startMessageId == null || !this.startMessageId.equals(this.initialStartMessageId)) ? 0L : this.startMessageRollbackDurationInSec, schemaInfo, this.createTopicIfDoesNotExist, this.conf.getKeySharedPolicy()), newRequestId).thenRun(() -> {
            synchronized (this) {
                if (!changeToReadyState()) {
                    setState(HandlerState.State.Closed);
                    deregisterFromClientCnx();
                    this.client.cleanupConsumer(this);
                    clientCnx.channel().close();
                    return;
                }
                consumerIsReconnectedToBroker(clientCnx, size);
                resetBackoff();
                if ((this.subscribeFuture.complete(this) && this.hasParentConsumer && z) || this.conf.getReceiverQueueSize() == 0) {
                    return;
                }
                increaseAvailablePermits(clientCnx, this.conf.getReceiverQueueSize());
            }
        }).exceptionally(th -> {
            deregisterFromClientCnx();
            if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
                clientCnx.channel().close();
                return null;
            }
            log.warn("[{}][{}] Failed to subscribe to topic on {}", this.topic, this.subscription, clientCnx.channel().remoteAddress());
            if ((th.getCause() instanceof PulsarClientException) && PulsarClientException.isRetriableError(th.getCause()) && System.currentTimeMillis() < this.subscribeTimeout) {
                reconnectLater(th.getCause());
                return null;
            }
            if (!this.subscribeFuture.isDone()) {
                setState(HandlerState.State.Failed);
                closeConsumerTasks();
                this.subscribeFuture.completeExceptionally(PulsarClientException.wrap(th, String.format("Failed to subscribe the topic %s with subscription name %s when connecting to the broker", this.topicName.toString(), this.subscription)));
                this.client.cleanupConsumer(this);
                return null;
            }
            if (!(th.getCause() instanceof PulsarClientException.TopicDoesNotExistException)) {
                reconnectLater(th.getCause());
                return null;
            }
            setState(HandlerState.State.Failed);
            closeConsumerTasks();
            this.client.cleanupConsumer(this);
            log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", this.topic, this.subscription, clientCnx.channel().remoteAddress());
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void consumerIsReconnectedToBroker(ClientCnx clientCnx, int i) {
        log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", this.topic, this.subscription, clientCnx.channel().remoteAddress(), Long.valueOf(this.consumerId));
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
    }

    private BatchMessageIdImpl clearReceiverQueue() {
        ArrayList arrayList = new ArrayList(this.incomingMessages.size());
        this.incomingMessages.drainTo(arrayList);
        resetIncomingMessageSize();
        if (this.duringSeek.compareAndSet(true, false)) {
            return this.seekMessageId;
        }
        if (this.subscriptionMode == SubscriptionMode.Durable) {
            return this.startMessageId;
        }
        if (arrayList.isEmpty()) {
            return !this.lastDequeuedMessageId.equals(MessageId.earliest) ? new BatchMessageIdImpl((MessageIdImpl) this.lastDequeuedMessageId) : this.startMessageId;
        }
        MessageIdImpl messageIdImpl = (MessageIdImpl) ((Message) arrayList.get(0)).getMessageId();
        BatchMessageIdImpl batchMessageIdImpl = messageIdImpl instanceof BatchMessageIdImpl ? new BatchMessageIdImpl(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), messageIdImpl.getPartitionIndex(), ((BatchMessageIdImpl) messageIdImpl).getBatchIndex() - 1) : new BatchMessageIdImpl(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId() - 1, messageIdImpl.getPartitionIndex(), -1);
        arrayList.forEach((v0) -> {
            v0.release();
        });
        return batchMessageIdImpl;
    }

    private void sendFlowPermitsToBroker(ClientCnx clientCnx, int i) {
        if (clientCnx == null || i <= 0) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Adding {} additional permits", this.topic, this.subscription, Integer.valueOf(i));
        }
        if (log.isDebugEnabled()) {
            clientCnx.ctx().writeAndFlush(Commands.newFlow(this.consumerId, i)).addListener2(future -> {
                if (future.isSuccess()) {
                    log.debug("Consumer {} sent {} permits to broker", Long.valueOf(this.consumerId), Integer.valueOf(i));
                } else {
                    log.debug("Consumer {} failed to send {} permits to broker: {}", Long.valueOf(this.consumerId), Integer.valueOf(i), future.cause().getMessage());
                }
            });
        } else {
            clientCnx.ctx().writeAndFlush(Commands.newFlow(this.consumerId, i), clientCnx.ctx().voidPromise());
        }
    }

    @Override // org.apache.pulsar.client.impl.ConnectionHandler.Connection
    public void connectionFailed(PulsarClientException pulsarClientException) {
        boolean z = !PulsarClientException.isRetriableError(pulsarClientException);
        boolean z2 = System.currentTimeMillis() > this.subscribeTimeout;
        if ((z || z2) && this.subscribeFuture.completeExceptionally(pulsarClientException)) {
            setState(HandlerState.State.Failed);
            if (z) {
                log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}", this.topic, Long.valueOf(this.consumerId), pulsarClientException);
            } else {
                log.info("[{}] Consumer creation failed for consumer {} after timeout", this.topic, Long.valueOf(this.consumerId));
            }
            closeConsumerTasks();
            deregisterFromClientCnx();
            this.client.cleanupConsumer(this);
        }
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> closeAsync() {
        if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
            closeConsumerTasks();
            return CompletableFuture.completedFuture(null);
        }
        if (!isConnected()) {
            log.info("[{}] [{}] Closed Consumer (not connected)", this.topic, this.subscription);
            setState(HandlerState.State.Closed);
            closeConsumerTasks();
            deregisterFromClientCnx();
            this.client.cleanupConsumer(this);
            return CompletableFuture.completedFuture(null);
        }
        this.stats.getStatTimeout().ifPresent((v0) -> {
            v0.cancel();
        });
        setState(HandlerState.State.Closing);
        closeConsumerTasks();
        long newRequestId = this.client.newRequestId();
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        ClientCnx cnx = cnx();
        if (null == cnx) {
            cleanupAtClose(completableFuture, null);
        } else {
            cnx.sendRequestWithId(Commands.newCloseConsumer(this.consumerId, newRequestId), newRequestId).handle((producerResponse, th) -> {
                boolean z = !cnx.ctx().channel().isActive();
                if (z && th != null) {
                    log.debug("Exception ignored in closing consumer", th);
                }
                cleanupAtClose(completableFuture, z ? null : th);
                return null;
            });
        }
        return completableFuture;
    }

    private void cleanupAtClose(CompletableFuture<Void> completableFuture, Throwable th) {
        log.info("[{}] [{}] Closed consumer", this.topic, this.subscription);
        setState(HandlerState.State.Closed);
        closeConsumerTasks();
        if (th != null) {
            completableFuture.completeExceptionally(th);
        } else {
            completableFuture.complete(null);
        }
        deregisterFromClientCnx();
        this.client.cleanupConsumer(this);
        failPendingReceive();
    }

    private void closeConsumerTasks() {
        this.unAckedMessageTracker.close();
        if (this.possibleSendToDeadLetterTopicMessages != null) {
            this.possibleSendToDeadLetterTopicMessages.clear();
        }
        this.acknowledgmentsGroupingTracker.close();
        if (this.batchReceiveTimeout != null) {
            this.batchReceiveTimeout.cancel();
        }
        this.stats.getStatTimeout().ifPresent((v0) -> {
            v0.cancel();
        });
    }

    private void failPendingReceive() {
        this.internalPinnedExecutor.execute(() -> {
            if (this.pinnedExecutor == null || this.pinnedExecutor.isShutdown()) {
                return;
            }
            failPendingReceives(this.pendingReceives);
            failPendingBatchReceives(this.pendingBatchReceives);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void activeConsumerChanged(boolean z) {
        if (this.consumerEventListener == null) {
            return;
        }
        this.pinnedExecutor.execute(() -> {
            if (z) {
                this.consumerEventListener.becameActive(this, this.partitionIndex);
            } else {
                this.consumerEventListener.becameInactive(this, this.partitionIndex);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void messageReceived(MessageIdData messageIdData, int i, List<Long> list, ByteBuf byteBuf, ClientCnx clientCnx) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received message: {}/{}", this.topic, this.subscription, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId()));
        }
        if (!verifyChecksum(byteBuf, messageIdData)) {
            discardCorruptedMessage(messageIdData, clientCnx, CommandAck.ValidationError.ChecksumMismatch);
            return;
        }
        try {
            MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(byteBuf);
            int numMessagesInBatch = parseMessageMetadata.getNumMessagesInBatch();
            boolean z = (parseMessageMetadata.hasNumChunksFromMsg() ? parseMessageMetadata.getNumChunksFromMsg() : 0) > 1 && this.conf.getSubscriptionType() != SubscriptionType.Shared;
            MessageIdImpl messageIdImpl = new MessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), getPartitionIndex());
            if (this.acknowledgmentsGroupingTracker.isDuplicate(messageIdImpl)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}", this.topic, this.subscription, this.consumerName, messageIdImpl);
                }
                increaseAvailablePermits(clientCnx, numMessagesInBatch);
                return;
            }
            ByteBuf decryptPayloadIfNeeded = decryptPayloadIfNeeded(messageIdData, parseMessageMetadata, byteBuf, clientCnx);
            boolean isMessageUndecryptable = isMessageUndecryptable(parseMessageMetadata);
            if (decryptPayloadIfNeeded == null) {
                return;
            }
            ByteBuf retain = (isMessageUndecryptable || z) ? decryptPayloadIfNeeded.retain() : uncompressPayloadIfNeeded(messageIdData, parseMessageMetadata, decryptPayloadIfNeeded, clientCnx, true);
            decryptPayloadIfNeeded.release();
            if (retain == null) {
                return;
            }
            if (isMessageUndecryptable || (numMessagesInBatch == 1 && !parseMessageMetadata.hasNumMessagesInBatch())) {
                if (z) {
                    retain = processMessageChunk(retain, parseMessageMetadata, messageIdImpl, messageIdData, clientCnx);
                    if (retain == null) {
                        return;
                    }
                }
                if (isSameEntry(messageIdData) && isPriorEntryIndex(messageIdData.getEntryId())) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", this.subscription, this.consumerName, this.startMessageId);
                    }
                    retain.release();
                    return;
                } else {
                    MessageImpl create = MessageImpl.create(this.topicName.toString(), messageIdImpl, parseMessageMetadata, retain, createEncryptionContext(parseMessageMetadata), clientCnx, this.schema, i, this.poolMessages);
                    retain.release();
                    this.internalPinnedExecutor.execute(() -> {
                        if (this.deadLetterPolicy != null && this.possibleSendToDeadLetterTopicMessages != null && i >= this.deadLetterPolicy.getMaxRedeliverCount()) {
                            this.possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) create.getMessageId(), Collections.singletonList(create));
                        }
                        if (peekPendingReceive() != null) {
                            notifyPendingReceivedCallback(create, null);
                        } else if (enqueueMessageAndCheckBatchReceive(create) && hasPendingBatchReceive()) {
                            notifyPendingBatchReceivedCallBack();
                        }
                    });
                }
            } else {
                receiveIndividualMessagesFromBatch(parseMessageMetadata, i, list, retain, messageIdData, clientCnx);
                retain.release();
            }
            this.internalPinnedExecutor.execute(() -> {
                tryTriggerListener();
            });
        } catch (Throwable th) {
            discardCorruptedMessage(messageIdData, clientCnx, CommandAck.ValidationError.ChecksumMismatch);
        }
    }

    private void tryTriggerListener() {
        if (this.listener != null) {
            triggerListener();
        }
    }

    private boolean isTxnMessage(MessageMetadata messageMetadata) {
        return messageMetadata.hasTxnidMostBits() && messageMetadata.hasTxnidLeastBits();
    }

    private ByteBuf processMessageChunk(ByteBuf byteBuf, MessageMetadata messageMetadata, MessageIdImpl messageIdImpl, MessageIdData messageIdData, ClientCnx clientCnx) {
        if (!this.expireChunkMessageTaskScheduled && this.expireTimeOfIncompleteChunkedMessageMillis > 0) {
            this.pinnedExecutor.scheduleAtFixedRate(() -> {
                removeExpireIncompleteChunkedMessages();
            }, this.expireTimeOfIncompleteChunkedMessageMillis, this.expireTimeOfIncompleteChunkedMessageMillis, TimeUnit.MILLISECONDS);
            this.expireChunkMessageTaskScheduled = true;
        }
        if (messageMetadata.getChunkId() == 0) {
            ByteBuf directBuffer = Unpooled.directBuffer(messageMetadata.getTotalChunkMsgSize(), messageMetadata.getTotalChunkMsgSize());
            int numChunksFromMsg = messageMetadata.getNumChunksFromMsg();
            this.chunkedMessagesMap.computeIfAbsent(messageMetadata.getUuid(), str -> {
                return ChunkedMessageCtx.get(numChunksFromMsg, directBuffer);
            });
            this.pendingChunkedMessageCount++;
            if (this.maxPendingChunkedMessage > 0 && this.pendingChunkedMessageCount > this.maxPendingChunkedMessage) {
                removeOldestPendingChunkedMessage();
            }
            this.pendingChunkedMessageUuidQueue.add(messageMetadata.getUuid());
        }
        ChunkedMessageCtx chunkedMessageCtx = this.chunkedMessagesMap.get(messageMetadata.getUuid());
        if (chunkedMessageCtx != null && chunkedMessageCtx.chunkedMsgBuffer != null && messageMetadata.getChunkId() == chunkedMessageCtx.lastChunkedMessageId + 1 && messageMetadata.getChunkId() < messageMetadata.getTotalChunkMsgSize()) {
            chunkedMessageCtx.chunkedMessageIds[messageMetadata.getChunkId()] = messageIdImpl;
            chunkedMessageCtx.chunkedMsgBuffer.writeBytes(byteBuf);
            chunkedMessageCtx.lastChunkedMessageId = messageMetadata.getChunkId();
            if (messageMetadata.getChunkId() != messageMetadata.getNumChunksFromMsg() - 1) {
                byteBuf.release();
                increaseAvailablePermits(clientCnx);
                return null;
            }
            if (log.isDebugEnabled()) {
                log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}", Integer.valueOf(messageMetadata.getChunkId()), Integer.valueOf(messageMetadata.getNumChunksFromMsg()), messageIdImpl, Long.valueOf(messageMetadata.getSequenceId()));
            }
            this.chunkedMessagesMap.remove(messageMetadata.getUuid());
            this.unAckedChunkedMessageIdSequenceMap.put(messageIdImpl, chunkedMessageCtx.chunkedMessageIds);
            this.pendingChunkedMessageCount--;
            byteBuf.release();
            ByteBuf byteBuf2 = chunkedMessageCtx.chunkedMsgBuffer;
            chunkedMessageCtx.recycle();
            ByteBuf uncompressPayloadIfNeeded = uncompressPayloadIfNeeded(messageIdData, messageMetadata, byteBuf2, clientCnx, false);
            byteBuf2.release();
            return uncompressPayloadIfNeeded;
        }
        Logger logger = log;
        Object[] objArr = new Object[4];
        objArr[0] = messageIdImpl;
        objArr[1] = chunkedMessageCtx != null ? Integer.valueOf(chunkedMessageCtx.lastChunkedMessageId) : null;
        objArr[2] = Integer.valueOf(messageMetadata.getChunkId());
        objArr[3] = Integer.valueOf(messageMetadata.getTotalChunkMsgSize());
        logger.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}, total-chunks {}", objArr);
        if (chunkedMessageCtx != null) {
            if (chunkedMessageCtx.chunkedMsgBuffer != null) {
                ReferenceCountUtil.safeRelease(chunkedMessageCtx.chunkedMsgBuffer);
            }
            chunkedMessageCtx.recycle();
        }
        this.chunkedMessagesMap.remove(messageMetadata.getUuid());
        byteBuf.release();
        increaseAvailablePermits(clientCnx);
        if (this.expireTimeOfIncompleteChunkedMessageMillis <= 0 || System.currentTimeMillis() <= messageMetadata.getPublishTime() + this.expireTimeOfIncompleteChunkedMessageMillis) {
            trackMessage(messageIdImpl);
            return null;
        }
        doAcknowledge(messageIdImpl, CommandAck.AckType.Individual, Collections.emptyMap(), (TransactionImpl) null);
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyPendingReceivedCallback(Message<T> message, Exception exc) {
        CompletableFuture<Message<T>> pollPendingReceive;
        if (this.pendingReceives.isEmpty() || (pollPendingReceive = pollPendingReceive()) == null) {
            return;
        }
        if (exc != null) {
            this.pinnedExecutor.execute(() -> {
                pollPendingReceive.completeExceptionally(exc);
            });
            return;
        }
        if (message == null) {
            IllegalStateException illegalStateException = new IllegalStateException("received message can't be null");
            this.pinnedExecutor.execute(() -> {
                pollPendingReceive.completeExceptionally(illegalStateException);
            });
        } else if (this.conf.getReceiverQueueSize() == 0) {
            trackMessage((Message<?>) message);
            interceptAndComplete(message, pollPendingReceive);
        } else {
            messageProcessed(message);
            interceptAndComplete(message, pollPendingReceive);
        }
    }

    private void interceptAndComplete(Message<T> message, CompletableFuture<Message<T>> completableFuture) {
        completePendingReceive(completableFuture, beforeConsume(message));
    }

    void receiveIndividualMessagesFromBatch(MessageMetadata messageMetadata, int i, List<Long> list, ByteBuf byteBuf, MessageIdData messageIdData, ClientCnx clientCnx) {
        int numMessagesInBatch = messageMetadata.getNumMessagesInBatch();
        MessageIdImpl messageIdImpl = new MessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), getPartitionIndex());
        ArrayList arrayList = null;
        if (this.deadLetterPolicy != null && i >= this.deadLetterPolicy.getMaxRedeliverCount()) {
            arrayList = new ArrayList();
        }
        BatchMessageAcker newAcker = BatchMessageAcker.newAcker(numMessagesInBatch);
        BitSetRecyclable bitSetRecyclable = null;
        if (list != null && list.size() > 0) {
            bitSetRecyclable = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(list));
        }
        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
        int i2 = 0;
        for (int i3 = 0; i3 < numMessagesInBatch; i3++) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] processing message num - {} in batch", this.subscription, this.consumerName, Integer.valueOf(i3));
                }
                ByteBuf deSerializeSingleMessageInBatch = Commands.deSerializeSingleMessageInBatch(byteBuf, singleMessageMetadata, i3, numMessagesInBatch);
                if (isSameEntry(messageIdData) && isPriorBatchIndex(i3)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", this.subscription, this.consumerName, this.startMessageId);
                    }
                    deSerializeSingleMessageInBatch.release();
                    i2++;
                } else if (singleMessageMetadata.isCompactedOut()) {
                    deSerializeSingleMessageInBatch.release();
                    i2++;
                } else if (bitSetRecyclable == null || bitSetRecyclable.get(i3)) {
                    MessageImpl<T> create = MessageImpl.create(this.topicName.toString(), new BatchMessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), getPartitionIndex(), i3, numMessagesInBatch, newAcker), messageMetadata, singleMessageMetadata, deSerializeSingleMessageInBatch, createEncryptionContext(messageMetadata), clientCnx, this.schema, i, this.poolMessages);
                    if (arrayList != null) {
                        arrayList.add(create);
                    }
                    this.internalPinnedExecutor.execute(() -> {
                        if (peekPendingReceive() != null) {
                            notifyPendingReceivedCallback(create, null);
                        } else if (enqueueMessageAndCheckBatchReceive(create) && hasPendingBatchReceive()) {
                            notifyPendingBatchReceivedCallBack();
                        }
                        deSerializeSingleMessageInBatch.release();
                    });
                } else {
                    deSerializeSingleMessageInBatch.release();
                    i2++;
                }
            } catch (IOException e) {
                log.warn("[{}] [{}] unable to obtain message in batch", this.subscription, this.consumerName);
                discardCorruptedMessage(messageIdData, clientCnx, CommandAck.ValidationError.BatchDeSerializeError);
            }
        }
        if (bitSetRecyclable != null) {
            bitSetRecyclable.recycle();
        }
        if (arrayList != null && this.possibleSendToDeadLetterTopicMessages != null) {
            this.possibleSendToDeadLetterTopicMessages.put(messageIdImpl, arrayList);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", this.subscription, this.consumerName, Integer.valueOf(this.incomingMessages.size()), Integer.valueOf(this.incomingMessages.remainingCapacity()));
        }
        if (i2 > 0) {
            increaseAvailablePermits(clientCnx, i2);
        }
    }

    private boolean isPriorEntryIndex(long j) {
        return this.resetIncludeHead ? j < this.startMessageId.getEntryId() : j <= this.startMessageId.getEntryId();
    }

    private boolean isPriorBatchIndex(long j) {
        return this.resetIncludeHead ? j < ((long) this.startMessageId.getBatchIndex()) : j <= ((long) this.startMessageId.getBatchIndex());
    }

    private boolean isSameEntry(MessageIdData messageIdData) {
        return this.startMessageId != null && messageIdData.getLedgerId() == this.startMessageId.getLedgerId() && messageIdData.getEntryId() == this.startMessageId.getEntryId();
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected synchronized void messageProcessed(Message<?> message) {
        ClientCnx cnx = cnx();
        ClientCnx cnx2 = ((MessageImpl) message).getCnx();
        this.lastDequeuedMessageId = message.getMessageId();
        if (cnx2 == cnx) {
            increaseAvailablePermits(cnx);
            this.stats.updateNumMsgsReceived(message);
            trackMessage(message);
        }
        decreaseIncomingMessageSize(message);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void trackMessage(Message<?> message) {
        if (message != null) {
            trackMessage(message.getMessageId());
        }
    }

    protected void trackMessage(MessageId messageId) {
        if (this.conf.getAckTimeoutMillis() <= 0 || !(messageId instanceof MessageIdImpl)) {
            return;
        }
        MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
        if (messageIdImpl instanceof BatchMessageIdImpl) {
            messageIdImpl = new MessageIdImpl(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), getPartitionIndex());
        }
        if (this.hasParentConsumer) {
            this.unAckedMessageTracker.remove(messageIdImpl);
        } else {
            this.unAckedMessageTracker.add(messageIdImpl);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void increaseAvailablePermits(ClientCnx clientCnx) {
        increaseAvailablePermits(clientCnx, 1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void increaseAvailablePermits(ClientCnx clientCnx, int i) {
        int addAndGet = AVAILABLE_PERMITS_UPDATER.addAndGet(this, i);
        while (true) {
            int i2 = addAndGet;
            if (i2 < this.receiverQueueRefillThreshold || this.paused) {
                return;
            }
            if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, i2, 0)) {
                sendFlowPermitsToBroker(clientCnx, i2);
                return;
            }
            addAndGet = AVAILABLE_PERMITS_UPDATER.get(this);
        }
    }

    public void increaseAvailablePermits(int i) {
        increaseAvailablePermits(cnx(), i);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void pause() {
        this.paused = true;
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void resume() {
        if (this.paused) {
            this.paused = false;
            increaseAvailablePermits(cnx(), 0);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public long getLastDisconnectedTimestamp() {
        return this.connectionHandler.lastConnectionClosedTimestamp;
    }

    private ByteBuf decryptPayloadIfNeeded(MessageIdData messageIdData, MessageMetadata messageMetadata, ByteBuf byteBuf, ClientCnx clientCnx) {
        if (messageMetadata.getEncryptionKeysCount() == 0) {
            return byteBuf.retain();
        }
        if (this.conf.getCryptoKeyReader() == null) {
            switch (this.conf.getCryptoFailureAction()) {
                case CONSUME:
                    log.warn("[{}][{}][{}] CryptoKeyReader interface is not implemented. Consuming encrypted message.", this.topic, this.subscription, this.consumerName);
                    return byteBuf.retain();
                case DISCARD:
                    log.warn("[{}][{}][{}] Skipping decryption since CryptoKeyReader interface is not implemented and config is set to discard", this.topic, this.subscription, this.consumerName);
                    discardMessage(messageIdData, clientCnx, CommandAck.ValidationError.DecryptionError);
                    return null;
                case FAIL:
                    MessageIdImpl messageIdImpl = new MessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), this.partitionIndex);
                    log.error("[{}][{}][{}][{}] Message delivery failed since CryptoKeyReader interface is not implemented to consume encrypted message", this.topic, this.subscription, this.consumerName, messageIdImpl);
                    this.unAckedMessageTracker.add(messageIdImpl);
                    return null;
            }
        }
        int maxOutputSize = this.msgCrypto.getMaxOutputSize(byteBuf.readableBytes());
        ByteBuf buffer = PulsarByteBufAllocator.DEFAULT.buffer(maxOutputSize);
        ByteBuffer nioBuffer = buffer.nioBuffer(0, maxOutputSize);
        if (this.msgCrypto.decrypt(() -> {
            return messageMetadata;
        }, byteBuf.nioBuffer(), nioBuffer, this.conf.getCryptoKeyReader())) {
            buffer.writerIndex(nioBuffer.limit());
            return buffer;
        }
        buffer.release();
        switch (this.conf.getCryptoFailureAction()) {
            case CONSUME:
                log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since config is set to consume.", this.topic, this.subscription, this.consumerName, messageIdData);
                return byteBuf.retain();
            case DISCARD:
                log.warn("[{}][{}][{}][{}] Discarding message since decryption failed and config is set to discard", this.topic, this.subscription, this.consumerName, messageIdData);
                discardMessage(messageIdData, clientCnx, CommandAck.ValidationError.DecryptionError);
                return null;
            case FAIL:
                MessageIdImpl messageIdImpl2 = new MessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), this.partitionIndex);
                log.error("[{}][{}][{}][{}] Message delivery failed since unable to decrypt incoming message", this.topic, this.subscription, this.consumerName, messageIdImpl2);
                this.unAckedMessageTracker.add(messageIdImpl2);
                return null;
            default:
                return null;
        }
    }

    private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageIdData, MessageMetadata messageMetadata, ByteBuf byteBuf, ClientCnx clientCnx, boolean z) {
        CompressionType compression = messageMetadata.getCompression();
        CompressionCodec compressionCodec = CompressionCodecProvider.getCompressionCodec(compression);
        int uncompressedSize = messageMetadata.getUncompressedSize();
        int readableBytes = byteBuf.readableBytes();
        if (z && readableBytes > ClientCnx.getMaxMessageSize()) {
            log.error("[{}][{}] Got corrupted payload message size {} at {}", this.topic, this.subscription, Integer.valueOf(readableBytes), messageIdData);
            discardCorruptedMessage(messageIdData, clientCnx, CommandAck.ValidationError.UncompressedSizeCorruption);
            return null;
        }
        try {
            return compressionCodec.decode(byteBuf, uncompressedSize);
        } catch (IOException e) {
            log.error("[{}][{}] Failed to decompress message with {} at {}: {}", this.topic, this.subscription, compression, messageIdData, e.getMessage(), e);
            discardCorruptedMessage(messageIdData, clientCnx, CommandAck.ValidationError.DecompressionError);
            return null;
        }
    }

    private boolean verifyChecksum(ByteBuf byteBuf, MessageIdData messageIdData) {
        int readChecksum;
        int computeChecksum;
        if (!Commands.hasChecksum(byteBuf) || (readChecksum = Commands.readChecksum(byteBuf)) == (computeChecksum = Crc32cIntChecksum.computeChecksum(byteBuf))) {
            return true;
        }
        log.error("[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}", this.topic, this.subscription, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId()), Long.toHexString(readChecksum), Integer.toHexString(computeChecksum));
        return false;
    }

    private void discardCorruptedMessage(MessageIdData messageIdData, ClientCnx clientCnx, CommandAck.ValidationError validationError) {
        log.error("[{}][{}] Discarding corrupted message at {}:{}", this.topic, this.subscription, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId()));
        discardMessage(messageIdData, clientCnx, validationError);
    }

    private void discardMessage(MessageIdData messageIdData, ClientCnx clientCnx, CommandAck.ValidationError validationError) {
        clientCnx.ctx().writeAndFlush(Commands.newAck(this.consumerId, messageIdData.getLedgerId(), messageIdData.getEntryId(), null, CommandAck.AckType.Individual, validationError, Collections.emptyMap(), -1L), clientCnx.ctx().voidPromise());
        increaseAvailablePermits(clientCnx);
        this.stats.incrementNumReceiveFailed();
    }

    @Override // org.apache.pulsar.client.impl.HandlerState
    String getHandlerName() {
        return this.subscription;
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public boolean isConnected() {
        return getClientCnx() != null && getState() == HandlerState.State.Ready;
    }

    int getPartitionIndex() {
        return this.partitionIndex;
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public int getAvailablePermits() {
        return AVAILABLE_PERMITS_UPDATER.get(this);
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public int numMessagesInQueue() {
        return this.incomingMessages.size();
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void redeliverUnacknowledgedMessages() {
        int size;
        ClientCnx cnx = cnx();
        if (!isConnected() || cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
            if (cnx == null || getState() == HandlerState.State.Connecting) {
                log.warn("[{}] Client Connection needs to be established for redelivery of unacknowledged messages", this);
                return;
            } else {
                log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
                cnx.ctx().close();
                return;
            }
        }
        synchronized (this) {
            size = this.incomingMessages.size();
            clearIncomingMessages();
            this.unAckedMessageTracker.clear();
        }
        cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(this.consumerId), cnx.ctx().voidPromise());
        if (size > 0) {
            increaseAvailablePermits(cnx, size);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", this.subscription, this.topic, this.consumerName, Integer.valueOf(size));
        }
    }

    public int clearIncomingMessagesAndGetMessageNumber() {
        int size = this.incomingMessages.size();
        this.incomingMessages.forEach((v0) -> {
            v0.release();
        });
        clearIncomingMessages();
        this.unAckedMessageTracker.clear();
        return size;
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public void redeliverUnacknowledgedMessages(Set<MessageId> set) {
        if (set.isEmpty()) {
            return;
        }
        Preconditions.checkArgument(set.stream().findFirst().get() instanceof MessageIdImpl);
        if (this.conf.getSubscriptionType() != SubscriptionType.Shared && this.conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
            redeliverUnacknowledgedMessages();
            return;
        }
        ClientCnx cnx = cnx();
        if (!isConnected() || cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
            if (cnx == null || getState() == HandlerState.State.Connecting) {
                log.warn("[{}] Client Connection needs to be established for redelivery of unacknowledged messages", this);
                return;
            } else {
                log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
                cnx.ctx().close();
                return;
            }
        }
        int removeExpiredMessagesFromQueue = removeExpiredMessagesFromQueue(set);
        Iterables.partition((Iterable) set.stream().map(messageId -> {
            return (MessageIdImpl) messageId;
        }).collect(Collectors.toSet()), 1000).forEach(list -> {
            getRedeliveryMessageIdData(list).thenAccept(list -> {
                if (list.isEmpty()) {
                    return;
                }
                cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(this.consumerId, list), cnx.ctx().voidPromise());
            });
        });
        if (removeExpiredMessagesFromQueue > 0) {
            increaseAvailablePermits(cnx, removeExpiredMessagesFromQueue);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", this.subscription, this.topic, this.consumerName, Integer.valueOf(removeExpiredMessagesFromQueue));
        }
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected void completeOpBatchReceive(ConsumerBase.OpBatchReceive<T> opBatchReceive) {
        notifyPendingBatchReceivedCallBack(opBatchReceive);
    }

    private CompletableFuture<List<MessageIdData>> getRedeliveryMessageIdData(List<MessageIdImpl> list) {
        if (list == null || list.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        ArrayList arrayList = new ArrayList(list.size());
        ArrayList arrayList2 = new ArrayList(list.size());
        list.forEach(messageIdImpl -> {
            CompletableFuture<Boolean> processPossibleToDLQ = processPossibleToDLQ(messageIdImpl);
            arrayList2.add(processPossibleToDLQ);
            processPossibleToDLQ.thenAccept(bool -> {
                if (bool.booleanValue()) {
                    return;
                }
                arrayList.add(new MessageIdData().setPartition(messageIdImpl.getPartitionIndex()).setLedgerId(messageIdImpl.getLedgerId()).setEntryId(messageIdImpl.getEntryId()));
            });
        });
        return FutureUtil.waitForAll(arrayList2).thenCompose(r3 -> {
            return CompletableFuture.completedFuture(arrayList);
        });
    }

    private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl messageIdImpl) {
        List<MessageImpl<T>> list = null;
        if (this.possibleSendToDeadLetterTopicMessages != null) {
            if (messageIdImpl instanceof BatchMessageIdImpl) {
                messageIdImpl = new MessageIdImpl(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), getPartitionIndex());
            }
            list = this.possibleSendToDeadLetterTopicMessages.get(messageIdImpl);
        }
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        if (list != null) {
            initDeadLetterProducerIfNeeded();
            List<MessageImpl<T>> list2 = list;
            MessageIdImpl messageIdImpl2 = messageIdImpl;
            this.deadLetterProducer.thenAcceptAsync(producer -> {
                Iterator it = list2.iterator();
                while (it.hasNext()) {
                    MessageImpl messageImpl = (MessageImpl) it.next();
                    producer.newMessage(Schema.AUTO_PRODUCE_BYTES(messageImpl.getReaderSchema().get())).value(messageImpl.getData()).properties(getPropertiesMap(messageImpl, getOriginMessageIdStr(messageImpl), getOriginTopicNameStr(messageImpl))).sendAsync().thenAccept(messageId -> {
                        this.possibleSendToDeadLetterTopicMessages.remove(messageIdImpl2);
                        acknowledgeAsync(messageIdImpl2).whenComplete((r10, th) -> {
                            if (th != null) {
                                log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original topic but send to the DLQ successfully.", this.topicName, this.subscription, this.consumerName, messageIdImpl2, th);
                            } else {
                                completableFuture.complete(true);
                            }
                        });
                    }).exceptionally(th -> {
                        log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", this.topicName, this.subscription, this.consumerName, messageIdImpl2, th);
                        completableFuture.complete(false);
                        return null;
                    });
                }
            }).exceptionally(th -> {
                log.error("Dead letter producer exception with topic: {}", this.deadLetterPolicy.getDeadLetterTopic(), th);
                this.deadLetterProducer = null;
                completableFuture.complete(false);
                return null;
            });
        } else {
            completableFuture.complete(false);
        }
        return completableFuture;
    }

    private void initDeadLetterProducerIfNeeded() {
        if (this.deadLetterProducer == null) {
            this.createProducerLock.writeLock().lock();
            try {
                if (this.deadLetterProducer == null) {
                    this.deadLetterProducer = this.client.newProducer(Schema.AUTO_PRODUCE_BYTES(this.schema)).topic(this.deadLetterPolicy.getDeadLetterTopic()).blockIfQueueFull(false).createAsync();
                }
            } finally {
                this.createProducerLock.writeLock().unlock();
            }
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void seek(MessageId messageId) throws PulsarClientException {
        try {
            seekAsync(messageId).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void seek(long j) throws PulsarClientException {
        try {
            seekAsync(j).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void seek(Function<String, Object> function) throws PulsarClientException {
        try {
            seekAsync(function).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
        if (function == null) {
            return FutureUtil.failedFuture(new PulsarClientException("Function must be set"));
        }
        Object apply = function.apply(this.topic);
        return apply == null ? CompletableFuture.completedFuture(null) : apply instanceof MessageId ? seekAsync((MessageId) apply) : apply.getClass().getTypeName().equals(Long.class.getTypeName()) ? seekAsync(((Long) apply).longValue()) : FutureUtil.failedFuture(new PulsarClientException("Only support seek by messageId or timestamp"));
    }

    private Optional<CompletableFuture<Void>> seekAsyncCheckState(String str) {
        return (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) ? Optional.of(FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException(String.format("The consumer %s was already closed when seeking the subscription %s of the topic %s to %s", this.consumerName, this.subscription, this.topicName.toString(), str)))) : !isConnected() ? Optional.of(FutureUtil.failedFuture(new PulsarClientException(String.format("The client is not connected to the broker when seeking the subscription %s of the topic %s to %s", this.subscription, this.topicName.toString(), str)))) : Optional.empty();
    }

    private CompletableFuture<Void> seekAsyncInternal(long j, ByteBuf byteBuf, MessageId messageId, String str) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        ClientCnx cnx = cnx();
        log.info("[{}][{}] Seek subscription to {}", this.topic, this.subscription, str);
        cnx.sendRequestWithId(byteBuf, j).thenRun(() -> {
            log.info("[{}][{}] Successfully reset subscription to {}", this.topic, this.subscription, str);
            this.acknowledgmentsGroupingTracker.flushAndClean();
            this.seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
            this.duringSeek.set(true);
            this.lastDequeuedMessageId = MessageId.earliest;
            clearIncomingMessages();
            completableFuture.complete(null);
        }).exceptionally(th -> {
            log.error("[{}][{}] Failed to reset subscription: {}", this.topic, this.subscription, th.getCause().getMessage());
            completableFuture.completeExceptionally(PulsarClientException.wrap(th.getCause(), String.format("Failed to seek the subscription %s of the topic %s to %s", this.subscription, this.topicName.toString(), str)));
            return null;
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> seekAsync(long j) {
        String format = String.format("the timestamp %d", Long.valueOf(j));
        return seekAsyncCheckState(format).orElseGet(() -> {
            long newRequestId = this.client.newRequestId();
            return seekAsyncInternal(newRequestId, Commands.newSeek(this.consumerId, newRequestId, j), MessageId.earliest, format);
        });
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        String format = String.format("the message %s", messageId.toString());
        return seekAsyncCheckState(format).orElseGet(() -> {
            ByteBuf newSeek;
            long newRequestId = this.client.newRequestId();
            if (messageId instanceof BatchMessageIdImpl) {
                BatchMessageIdImpl batchMessageIdImpl = (BatchMessageIdImpl) messageId;
                BitSetRecyclable create = BitSetRecyclable.create();
                create.set(0, batchMessageIdImpl.getBatchSize());
                create.clear(0, Math.max(batchMessageIdImpl.getBatchIndex(), 0));
                long[] longArray = create.toLongArray();
                create.recycle();
                newSeek = Commands.newSeek(this.consumerId, newRequestId, batchMessageIdImpl.getLedgerId(), batchMessageIdImpl.getEntryId(), longArray);
            } else {
                MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
                newSeek = Commands.newSeek(this.consumerId, newRequestId, messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), new long[0]);
            }
            return seekAsyncInternal(newRequestId, newSeek, messageId, format);
        });
    }

    public boolean hasMessageAvailable() throws PulsarClientException {
        try {
            return hasMessageAvailableAsync().get().booleanValue();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        if (this.lastDequeuedMessageId == MessageId.earliest) {
            if (this.startMessageId.equals(MessageId.latest)) {
                CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync = internalGetLastMessageIdAsync();
                if (this.resetIncludeHead) {
                    internalGetLastMessageIdAsync = internalGetLastMessageIdAsync.thenCompose(getLastMessageIdResponse -> {
                        return seekAsync(getLastMessageIdResponse.lastMessageId).thenApply(r3 -> {
                            return getLastMessageIdResponse;
                        });
                    });
                }
                internalGetLastMessageIdAsync.thenAccept(getLastMessageIdResponse2 -> {
                    MessageIdImpl convertToMessageIdImpl = MessageIdImpl.convertToMessageIdImpl(getLastMessageIdResponse2.lastMessageId);
                    MessageIdImpl convertToMessageIdImpl2 = MessageIdImpl.convertToMessageIdImpl(getLastMessageIdResponse2.markDeletePosition);
                    if (convertToMessageIdImpl2 != null) {
                        int result = ComparisonChain.start().compare(convertToMessageIdImpl2.getLedgerId(), convertToMessageIdImpl.getLedgerId()).compare(convertToMessageIdImpl2.getEntryId(), convertToMessageIdImpl.getEntryId()).result();
                        if (convertToMessageIdImpl.getEntryId() < 0) {
                            completableFuture.complete(false);
                            return;
                        } else {
                            completableFuture.complete(Boolean.valueOf(this.resetIncludeHead ? result <= 0 : result < 0));
                            return;
                        }
                    }
                    if (convertToMessageIdImpl == null || convertToMessageIdImpl.getEntryId() < 0) {
                        completableFuture.complete(false);
                    } else {
                        completableFuture.complete(Boolean.valueOf(this.resetIncludeHead));
                    }
                }).exceptionally(th -> {
                    log.error("[{}][{}] Failed getLastMessageId command", this.topic, this.subscription, th);
                    completableFuture.completeExceptionally(th.getCause());
                    return null;
                });
                return completableFuture;
            }
            if (hasMoreMessages(this.lastMessageIdInBroker, this.startMessageId, this.resetIncludeHead)) {
                completableFuture.complete(true);
                return completableFuture;
            }
            getLastMessageIdAsync().thenAccept(messageId -> {
                this.lastMessageIdInBroker = messageId;
                if (hasMoreMessages(this.lastMessageIdInBroker, this.startMessageId, this.resetIncludeHead)) {
                    completableFuture.complete(true);
                } else {
                    completableFuture.complete(false);
                }
            }).exceptionally(th2 -> {
                log.error("[{}][{}] Failed getLastMessageId command", this.topic, this.subscription);
                completableFuture.completeExceptionally(th2.getCause());
                return null;
            });
        } else {
            if (hasMoreMessages(this.lastMessageIdInBroker, this.lastDequeuedMessageId, false)) {
                completableFuture.complete(true);
                return completableFuture;
            }
            getLastMessageIdAsync().thenAccept(messageId2 -> {
                this.lastMessageIdInBroker = messageId2;
                if (hasMoreMessages(this.lastMessageIdInBroker, this.lastDequeuedMessageId, false)) {
                    completableFuture.complete(true);
                } else {
                    completableFuture.complete(false);
                }
            }).exceptionally(th3 -> {
                log.error("[{}][{}] Failed getLastMessageId command", this.topic, this.subscription);
                completableFuture.completeExceptionally(th3.getCause());
                return null;
            });
        }
        return completableFuture;
    }

    private boolean hasMoreMessages(MessageId messageId, MessageId messageId2, boolean z) {
        if (!z || messageId.compareTo(messageId2) < 0 || ((MessageIdImpl) messageId).getEntryId() == -1) {
            return (z || messageId.compareTo(messageId2) <= 0 || ((MessageIdImpl) messageId).getEntryId() == -1) ? false : true;
        }
        return true;
    }

    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public CompletableFuture<MessageId> getLastMessageIdAsync() {
        return internalGetLastMessageIdAsync().thenApply(getLastMessageIdResponse -> {
            return getLastMessageIdResponse.lastMessageId;
        });
    }

    public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
        if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException(String.format("The consumer %s was already closed when the subscription %s of the topic %s getting the last message id", this.consumerName, this.subscription, this.topicName.toString())));
        }
        AtomicLong atomicLong = new AtomicLong(this.client.getConfiguration().getOperationTimeoutMs());
        Backoff create = new BackoffBuilder().setInitialTime(100L, TimeUnit.MILLISECONDS).setMax(atomicLong.get() * 2, TimeUnit.MILLISECONDS).setMandatoryStop(0L, TimeUnit.MILLISECONDS).create();
        CompletableFuture<GetLastMessageIdResponse> completableFuture = new CompletableFuture<>();
        internalGetLastMessageIdAsync(create, atomicLong, completableFuture);
        return completableFuture;
    }

    private void internalGetLastMessageIdAsync(Backoff backoff, AtomicLong atomicLong, CompletableFuture<GetLastMessageIdResponse> completableFuture) {
        ClientCnx cnx = cnx();
        if (!isConnected() || cnx == null) {
            long min = Math.min(backoff.next(), atomicLong.get());
            if (min <= 0) {
                completableFuture.completeExceptionally(new PulsarClientException.TimeoutException(String.format("The subscription %s of the topic %s could not get the last message id withing configured timeout", this.subscription, this.topicName.toString())));
                return;
            } else {
                this.pinnedExecutor.schedule(() -> {
                    log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms", this.topic, getHandlerName(), Long.valueOf(min));
                    atomicLong.addAndGet(-min);
                    internalGetLastMessageIdAsync(backoff, atomicLong, completableFuture);
                }, min, TimeUnit.MILLISECONDS);
                return;
            }
        }
        if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) {
            completableFuture.completeExceptionally(new PulsarClientException.NotSupportedException(String.format("The command `GetLastMessageId` is not supported for the protocol version %d. The consumer is %s, topic %s, subscription %s", Integer.valueOf(cnx.getRemoteEndpointProtocolVersion()), this.consumerName, this.topicName.toString(), this.subscription)));
            return;
        }
        long newRequestId = this.client.newRequestId();
        ByteBuf newGetLastMessageId = Commands.newGetLastMessageId(this.consumerId, newRequestId);
        log.info("[{}][{}] Get topic last message Id", this.topic, this.subscription);
        cnx.sendGetLastMessageId(newGetLastMessageId, newRequestId).thenAccept(commandGetLastMessageIdResponse -> {
            MessageIdData lastMessageId = commandGetLastMessageIdResponse.getLastMessageId();
            MessageIdImpl messageIdImpl = null;
            if (commandGetLastMessageIdResponse.hasConsumerMarkDeletePosition()) {
                messageIdImpl = new MessageIdImpl(commandGetLastMessageIdResponse.getConsumerMarkDeletePosition().getLedgerId(), commandGetLastMessageIdResponse.getConsumerMarkDeletePosition().getEntryId(), -1);
            }
            log.info("[{}][{}] Successfully getLastMessageId {}:{}", this.topic, this.subscription, Long.valueOf(lastMessageId.getLedgerId()), Long.valueOf(lastMessageId.getEntryId()));
            completableFuture.complete(new GetLastMessageIdResponse(lastMessageId.getBatchIndex() <= 0 ? new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), lastMessageId.getPartition()) : new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), lastMessageId.getPartition(), lastMessageId.getBatchIndex()), messageIdImpl));
        }).exceptionally(th -> {
            log.error("[{}][{}] Failed getLastMessageId command", this.topic, this.subscription);
            completableFuture.completeExceptionally(PulsarClientException.wrap(th.getCause(), String.format("The subscription %s of the topic %s gets the last message id was failed", this.subscription, this.topicName.toString())));
            return null;
        });
    }

    private MessageIdImpl getMessageIdImpl(Message<?> message) {
        MessageIdImpl messageIdImpl = (MessageIdImpl) message.getMessageId();
        if (messageIdImpl instanceof BatchMessageIdImpl) {
            messageIdImpl = new MessageIdImpl(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), getPartitionIndex());
        }
        return messageIdImpl;
    }

    private boolean isMessageUndecryptable(MessageMetadata messageMetadata) {
        return messageMetadata.getEncryptionKeysCount() > 0 && this.conf.getCryptoKeyReader() == null && this.conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME;
    }

    private Optional<EncryptionContext> createEncryptionContext(MessageMetadata messageMetadata) {
        EncryptionContext encryptionContext = null;
        if (messageMetadata.getEncryptionKeysCount() > 0) {
            encryptionContext = new EncryptionContext();
            Map<String, EncryptionContext.EncryptionKey> map = (Map) messageMetadata.getEncryptionKeysList().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, encryptionKeys -> {
                return new EncryptionContext.EncryptionKey(encryptionKeys.getValue(), (Map) encryptionKeys.getMetadatasList().stream().collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, (v0) -> {
                    return v0.getValue();
                })));
            }));
            byte[] encryptionParam = messageMetadata.getEncryptionParam();
            Optional<Integer> ofNullable = Optional.ofNullable(messageMetadata.hasNumMessagesInBatch() ? Integer.valueOf(messageMetadata.getNumMessagesInBatch()) : null);
            encryptionContext.setKeys(map);
            encryptionContext.setParam(encryptionParam);
            if (messageMetadata.hasEncryptionAlgo()) {
                encryptionContext.setAlgorithm(messageMetadata.getEncryptionAlgo());
            }
            encryptionContext.setCompressionType(CompressionCodecProvider.convertFromWireProtocol(messageMetadata.getCompression()));
            encryptionContext.setUncompressedMessageSize(messageMetadata.getUncompressedSize());
            encryptionContext.setBatchSize(ofNullable);
        }
        return Optional.ofNullable(encryptionContext);
    }

    private int removeExpiredMessagesFromQueue(Set<MessageId> set) {
        int i = 0;
        Message<T> peek = this.incomingMessages.peek();
        if (peek != null) {
            if (!set.contains(getMessageIdImpl(peek))) {
                return 0;
            }
            Message<T> poll = this.incomingMessages.poll();
            while (true) {
                Message<?> message = poll;
                if (message == null) {
                    break;
                }
                decreaseIncomingMessageSize(message);
                i++;
                MessageIdImpl messageIdImpl = getMessageIdImpl(message);
                if (!set.contains(messageIdImpl)) {
                    set.add(messageIdImpl);
                    break;
                }
                message.release();
                poll = this.incomingMessages.poll();
            }
        }
        return i;
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public ConsumerStatsRecorder getStats() {
        return this.stats;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setTerminated() {
        log.info("[{}] [{}] [{}] Consumer has reached the end of topic", this.subscription, this.topic, this.consumerName);
        this.hasReachedEndOfTopic = true;
        if (this.listener != null) {
            this.listener.reachedEndOfTopic(this);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public boolean hasReachedEndOfTopic() {
        return this.hasReachedEndOfTopic;
    }

    public int hashCode() {
        return Objects.hash(this.topic, this.subscription, this.consumerName);
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        return (obj instanceof ConsumerImpl) && this.consumerId == ((ConsumerImpl) obj).consumerId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClientCnx cnx() {
        return this.connectionHandler.cnx();
    }

    void resetBackoff() {
        this.connectionHandler.resetBackoff();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void connectionClosed(ClientCnx clientCnx) {
        this.connectionHandler.connectionClosed(clientCnx);
    }

    @VisibleForTesting
    public ClientCnx getClientCnx() {
        return this.connectionHandler.cnx();
    }

    void setClientCnx(ClientCnx clientCnx) {
        if (clientCnx != null) {
            this.connectionHandler.setClientCnx(clientCnx);
            clientCnx.registerConsumer(this.consumerId, this);
            if (this.conf.isAckReceiptEnabled() && !Commands.peerSupportsAckReceipt(clientCnx.getRemoteEndpointProtocolVersion())) {
                log.warn("Server don't support ack for receipt! ProtoVersion >=17 support! nowVersion : {}", Integer.valueOf(clientCnx.getRemoteEndpointProtocolVersion()));
            }
        }
        ClientCnx andSet = this.clientCnxUsedForConsumerRegistration.getAndSet(clientCnx);
        if (andSet == null || andSet == clientCnx) {
            return;
        }
        andSet.removeConsumer(this.consumerId);
    }

    void deregisterFromClientCnx() {
        setClientCnx(null);
    }

    void reconnectLater(Throwable th) {
        this.connectionHandler.reconnectLater(th);
    }

    void grabCnx() {
        this.connectionHandler.grabCnx();
    }

    public String getTopicNameWithoutPartition() {
        return this.topicNameWithoutPartition;
    }

    private void removeOldestPendingChunkedMessage() {
        ChunkedMessageCtx chunkedMessageCtx = null;
        String str = null;
        while (chunkedMessageCtx == null && !this.pendingChunkedMessageUuidQueue.isEmpty()) {
            str = this.pendingChunkedMessageUuidQueue.poll();
            chunkedMessageCtx = StringUtils.isNotBlank(str) ? this.chunkedMessagesMap.get(str) : null;
        }
        removeChunkMessage(str, chunkedMessageCtx, this.autoAckOldestChunkedMessageOnQueueFull);
    }

    protected void removeExpireIncompleteChunkedMessages() {
        if (this.expireTimeOfIncompleteChunkedMessageMillis <= 0) {
            return;
        }
        while (true) {
            String peek = this.pendingChunkedMessageUuidQueue.peek();
            if (peek == null) {
                return;
            }
            ChunkedMessageCtx chunkedMessageCtx = StringUtils.isNotBlank(peek) ? this.chunkedMessagesMap.get(peek) : null;
            if (chunkedMessageCtx == null || System.currentTimeMillis() <= chunkedMessageCtx.receivedTime + this.expireTimeOfIncompleteChunkedMessageMillis) {
                return;
            }
            this.pendingChunkedMessageUuidQueue.remove(peek);
            removeChunkMessage(peek, chunkedMessageCtx, true);
        }
    }

    private void removeChunkMessage(String str, ChunkedMessageCtx chunkedMessageCtx, boolean z) {
        if (chunkedMessageCtx == null) {
            return;
        }
        this.chunkedMessagesMap.remove(str);
        if (chunkedMessageCtx.chunkedMessageIds != null) {
            for (MessageIdImpl messageIdImpl : chunkedMessageCtx.chunkedMessageIds) {
                if (messageIdImpl != null) {
                    if (z) {
                        log.info("Removing chunk message-id {}", messageIdImpl);
                        doAcknowledge(messageIdImpl, CommandAck.AckType.Individual, Collections.emptyMap(), (TransactionImpl) null);
                    } else {
                        trackMessage(messageIdImpl);
                    }
                }
            }
        }
        if (chunkedMessageCtx.chunkedMsgBuffer != null) {
            chunkedMessageCtx.chunkedMsgBuffer.release();
        }
        chunkedMessageCtx.recycle();
        this.pendingChunkedMessageCount--;
    }

    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, CommandAck.AckType ackType, CommandAck.ValidationError validationError, Map<String, Long> map, TxnID txnID) {
        ByteBuf newAck;
        long newRequestId = this.client.newRequestId();
        if (messageId instanceof BatchMessageIdImpl) {
            BatchMessageIdImpl batchMessageIdImpl = (BatchMessageIdImpl) messageId;
            BitSetRecyclable create = BitSetRecyclable.create();
            long ledgerId = batchMessageIdImpl.getLedgerId();
            long entryId = batchMessageIdImpl.getEntryId();
            if (ackType == CommandAck.AckType.Cumulative) {
                batchMessageIdImpl.ackCumulative();
                create.set(0, batchMessageIdImpl.getBatchSize());
                create.clear(0, batchMessageIdImpl.getBatchIndex() + 1);
            } else {
                create.set(0, batchMessageIdImpl.getBatchSize());
                create.clear(batchMessageIdImpl.getBatchIndex());
            }
            newAck = Commands.newAck(this.consumerId, ledgerId, entryId, create, ackType, validationError, map, txnID.getLeastSigBits(), txnID.getMostSigBits(), newRequestId, batchMessageIdImpl.getBatchSize());
            create.recycle();
        } else {
            MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
            newAck = Commands.newAck(this.consumerId, messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), null, ackType, validationError, map, txnID.getLeastSigBits(), txnID.getMostSigBits(), newRequestId);
        }
        if (ackType == CommandAck.AckType.Cumulative) {
            this.unAckedMessageTracker.removeMessagesTill(messageId);
        } else {
            this.unAckedMessageTracker.remove(messageId);
        }
        return cnx().newAckForReceipt(newAck, newRequestId);
    }

    public Map<MessageIdImpl, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopicMessages() {
        return this.possibleSendToDeadLetterTopicMessages;
    }
}
