package org.apache.pulsar.client.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import io.netty.util.Timeout;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.10.0-rc-202112052205.jar:org/apache/pulsar/client/impl/ConsumerBase.class */
public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
    protected final String subscription;
    protected final ConsumerConfigurationData<T> conf;
    protected final String consumerName;
    protected final CompletableFuture<Consumer<T>> subscribeFuture;
    protected final MessageListener<T> listener;
    protected final ConsumerEventListener consumerEventListener;
    protected final ExecutorProvider executorProvider;
    protected final ScheduledExecutorService externalPinnedExecutor;
    protected final ScheduledExecutorService internalPinnedExecutor;
    final BlockingQueue<Message<T>> incomingMessages;
    protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
    protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
    protected int maxReceiverQueueSize;
    protected final Schema<T> schema;
    protected final ConsumerInterceptors<T> interceptors;
    protected final BatchReceivePolicy batchReceivePolicy;
    protected final ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
    protected volatile long incomingMessagesSize;
    protected volatile Timeout batchReceiveTimeout;
    protected final Lock reentrantLock;
    private final AtomicInteger executorQueueSize;
    private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ConsumerBase.class, "incomingMessagesSize");
    static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConsumerBase.class);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.10.0-rc-202112052205.jar:org/apache/pulsar/client/impl/ConsumerBase$OpBatchReceive.class */
    public static final class OpBatchReceive<T> {
        final CompletableFuture<Messages<T>> future;
        final long createdAt = System.nanoTime();

        private OpBatchReceive(CompletableFuture<Messages<T>> completableFuture) {
            this.future = completableFuture;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static <T> OpBatchReceive<T> of(CompletableFuture<Messages<T>> completableFuture) {
            return new OpBatchReceive<>(completableFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ConsumerBase(PulsarClientImpl pulsarClientImpl, String str, ConsumerConfigurationData<T> consumerConfigurationData, int i, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> completableFuture, Schema<T> schema, ConsumerInterceptors consumerInterceptors) {
        super(pulsarClientImpl, str);
        this.incomingMessagesSize = 0L;
        this.batchReceiveTimeout = null;
        this.reentrantLock = new ReentrantLock();
        this.executorQueueSize = new AtomicInteger(0);
        this.maxReceiverQueueSize = i;
        this.subscription = consumerConfigurationData.getSubscriptionName();
        this.conf = consumerConfigurationData;
        this.consumerName = consumerConfigurationData.getConsumerName() == null ? ConsumerName.generateRandomName() : consumerConfigurationData.getConsumerName();
        this.subscribeFuture = completableFuture;
        this.listener = consumerConfigurationData.getMessageListener();
        this.consumerEventListener = consumerConfigurationData.getConsumerEventListener();
        this.incomingMessages = new GrowableArrayBlockingQueue();
        this.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
        this.executorProvider = executorProvider;
        this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
        this.internalPinnedExecutor = (ScheduledExecutorService) pulsarClientImpl.getInternalExecutorService();
        this.pendingReceives = Queues.newConcurrentLinkedQueue();
        this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
        this.schema = schema;
        this.interceptors = consumerInterceptors;
        if (consumerConfigurationData.getBatchReceivePolicy() != null) {
            BatchReceivePolicy batchReceivePolicy = consumerConfigurationData.getBatchReceivePolicy();
            if (batchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) {
                this.batchReceivePolicy = BatchReceivePolicy.builder().maxNumMessages(this.maxReceiverQueueSize).maxNumBytes(batchReceivePolicy.getMaxNumBytes()).timeout((int) batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS).build();
                log.warn("BatchReceivePolicy maxNumMessages: {} is greater than maxReceiverQueueSize: {}, reset to maxReceiverQueueSize. batchReceivePolicy: {}", Integer.valueOf(batchReceivePolicy.getMaxNumMessages()), Integer.valueOf(this.maxReceiverQueueSize), this.batchReceivePolicy.toString());
            } else if (batchReceivePolicy.getMaxNumMessages() > 0 || batchReceivePolicy.getMaxNumBytes() > 0) {
                this.batchReceivePolicy = consumerConfigurationData.getBatchReceivePolicy();
            } else {
                this.batchReceivePolicy = BatchReceivePolicy.builder().maxNumMessages(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumMessages()).maxNumBytes(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumBytes()).timeout((int) batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS).build();
                log.warn("BatchReceivePolicy maxNumMessages: {} or maxNumBytes: {} is less than 0. Reset to DEFAULT_POLICY. batchReceivePolicy: {}", Integer.valueOf(batchReceivePolicy.getMaxNumMessages()), Integer.valueOf(batchReceivePolicy.getMaxNumBytes()), this.batchReceivePolicy.toString());
            }
        } else {
            this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
        }
        if (this.batchReceivePolicy.getTimeoutMs() > 0) {
            this.batchReceiveTimeout = pulsarClientImpl.timer().newTimeout(this::pendingBatchReceiveTask, this.batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public Message<T> receive() throws PulsarClientException {
        if (this.listener != null) {
            throw new PulsarClientException.InvalidConfigurationException("Cannot use receive() when a listener has been set");
        }
        verifyConsumerState();
        return internalReceive();
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Message<T>> receiveAsync() {
        if (this.listener != null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Cannot use receive() when a listener has been set"));
        }
        try {
            verifyConsumerState();
            return internalReceiveAsync();
        } catch (PulsarClientException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    protected abstract Message<T> internalReceive() throws PulsarClientException;

    protected abstract CompletableFuture<Message<T>> internalReceiveAsync();

    @Override // org.apache.pulsar.client.api.Consumer
    public Message<T> receive(int i, TimeUnit timeUnit) throws PulsarClientException {
        if (this.conf.getReceiverQueueSize() == 0) {
            throw new PulsarClientException.InvalidConfigurationException("Can't use receive with timeout, if the queue size is 0");
        }
        if (this.listener != null) {
            throw new PulsarClientException.InvalidConfigurationException("Cannot use receive() when a listener has been set");
        }
        verifyConsumerState();
        return internalReceive(i, timeUnit);
    }

    protected abstract Message<T> internalReceive(int i, TimeUnit timeUnit) throws PulsarClientException;

    @Override // org.apache.pulsar.client.api.Consumer
    public Messages<T> batchReceive() throws PulsarClientException {
        verifyBatchReceive();
        verifyConsumerState();
        return internalBatchReceive();
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Messages<T>> batchReceiveAsync() {
        try {
            verifyBatchReceive();
            verifyConsumerState();
            return internalBatchReceiveAsync();
        } catch (PulsarClientException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean hasNextPendingReceive() {
        return !this.pendingReceives.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Message<T>> nextPendingReceive() {
        CompletableFuture<Message<T>> poll;
        do {
            poll = this.pendingReceives.poll();
            if (poll == null) {
                break;
            }
        } while (poll.isDone());
        return poll;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void completePendingReceive(CompletableFuture<Message<T>> completableFuture, Message<T> message) {
        getInternalExecutor(message).execute(() -> {
            if (completableFuture.complete(message)) {
                return;
            }
            log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was dropped. message={}", Boolean.valueOf(completableFuture.isCancelled()), message);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> failPendingReceive() {
        if (this.internalPinnedExecutor.isShutdown()) {
            failPendingReceives();
            failPendingBatchReceives();
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            try {
                failPendingReceives();
                failPendingBatchReceives();
            } finally {
                completableFuture.complete(null);
            }
        });
        return completableFuture;
    }

    private void failPendingReceives() {
        CompletableFuture<Message<T>> poll;
        while (!this.pendingReceives.isEmpty() && (poll = this.pendingReceives.poll()) != null) {
            if (!poll.isDone()) {
                poll.completeExceptionally(new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s was already closed when cleaning and closing the consumers", this.topic, this.subscription)));
            }
        }
    }

    private void failPendingBatchReceives() {
        OpBatchReceive<T> nextBatchReceive;
        while (hasNextBatchReceive() && (nextBatchReceive = nextBatchReceive()) != null && nextBatchReceive.future != null) {
            if (!nextBatchReceive.future.isDone()) {
                nextBatchReceive.future.completeExceptionally(new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s was already closed when cleaning and closing the consumers", this.topic, this.subscription)));
            }
        }
    }

    protected abstract Messages<T> internalBatchReceive() throws PulsarClientException;

    protected abstract CompletableFuture<Messages<T>> internalBatchReceiveAsync();

    private static void validateMessageId(Message<?> message) throws PulsarClientException {
        if (message == null) {
            throw new PulsarClientException.InvalidMessageException("Non-null message is required");
        }
        if (message.getMessageId() == null) {
            throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
        }
    }

    private static void validateMessageId(MessageId messageId) throws PulsarClientException {
        if (messageId == null) {
            throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void acknowledge(Message<?> message) throws PulsarClientException {
        validateMessageId(message);
        acknowledge(message.getMessageId());
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void acknowledge(MessageId messageId) throws PulsarClientException {
        validateMessageId(messageId);
        try {
            acknowledgeAsync(messageId).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void acknowledge(List<MessageId> list) throws PulsarClientException {
        try {
            acknowledgeAsync(list).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void acknowledge(Messages<?> messages) throws PulsarClientException {
        try {
            acknowledgeAsync(messages).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void reconsumeLater(Message<?> message, long j, TimeUnit timeUnit) throws PulsarClientException {
        if (!this.conf.isRetryEnable()) {
            throw new PulsarClientException("reconsumeLater method not support!");
        }
        try {
            reconsumeLaterAsync(message, j, timeUnit).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void reconsumeLater(Messages<?> messages, long j, TimeUnit timeUnit) throws PulsarClientException {
        try {
            reconsumeLaterAsync(messages, j, timeUnit).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
        validateMessageId(message);
        acknowledgeCumulative(message.getMessageId());
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
        validateMessageId(messageId);
        try {
            acknowledgeCumulativeAsync(messageId).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void reconsumeLaterCumulative(Message<?> message, long j, TimeUnit timeUnit) throws PulsarClientException {
        try {
            reconsumeLaterCumulativeAsync(message, j, timeUnit).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
        try {
            validateMessageId(message);
            return acknowledgeAsync(message.getMessageId());
        } catch (PulsarClientException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
        ArrayList arrayList = new ArrayList(messages.size());
        Iterator<?> it = messages.iterator();
        while (it.hasNext()) {
            Message message = (Message) it.next();
            try {
                validateMessageId((Message<?>) message);
                arrayList.add(message.getMessageId());
            } catch (PulsarClientException e) {
                return FutureUtil.failedFuture(e);
            }
        }
        return acknowledgeAsync(arrayList);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> acknowledgeAsync(List<MessageId> list) {
        return doAcknowledgeWithTxn(list, CommandAck.AckType.Individual, Collections.emptyMap(), (TransactionImpl) null);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long j, TimeUnit timeUnit) {
        if (!this.conf.isRetryEnable()) {
            return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
        }
        try {
            validateMessageId(message);
            return doReconsumeLater(message, CommandAck.AckType.Individual, Collections.emptyMap(), j, timeUnit);
        } catch (PulsarClientException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long j, TimeUnit timeUnit) {
        Iterator<?> it = messages.iterator();
        while (it.hasNext()) {
            try {
                validateMessageId((Message<?>) it.next());
            } catch (PulsarClientException e) {
                return FutureUtil.failedFuture(e);
            }
        }
        messages.forEach(message -> {
            reconsumeLaterAsync((Message<?>) message, j, timeUnit);
        });
        return CompletableFuture.completedFuture(null);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
        try {
            validateMessageId(message);
            return acknowledgeCumulativeAsync(message.getMessageId());
        } catch (PulsarClientException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long j, TimeUnit timeUnit) {
        return !this.conf.isRetryEnable() ? FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!")) : !isCumulativeAcknowledgementAllowed(this.conf.getSubscriptionType()) ? FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Cannot use cumulative acks on a non-exclusive subscription")) : doReconsumeLater(message, CommandAck.AckType.Cumulative, Collections.emptyMap(), j, timeUnit);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
        return acknowledgeAsync(messageId, null);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction transaction) {
        TransactionImpl transactionImpl = null;
        if (null != transaction) {
            Preconditions.checkArgument(transaction instanceof TransactionImpl);
            transactionImpl = (TransactionImpl) transaction;
        }
        return doAcknowledgeWithTxn(messageId, CommandAck.AckType.Individual, Collections.emptyMap(), transactionImpl);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
        return acknowledgeCumulativeAsync(messageId, null);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction transaction) {
        if (!isCumulativeAcknowledgementAllowed(this.conf.getSubscriptionType())) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Cannot use cumulative acks on a non-exclusive/non-failover subscription"));
        }
        TransactionImpl transactionImpl = null;
        if (null != transaction) {
            Preconditions.checkArgument(transaction instanceof TransactionImpl);
            transactionImpl = (TransactionImpl) transaction;
        }
        return doAcknowledgeWithTxn(messageId, CommandAck.AckType.Cumulative, Collections.emptyMap(), transactionImpl);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void negativeAcknowledge(Message<?> message) {
        negativeAcknowledge(message.getMessageId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> list, CommandAck.AckType ackType, Map<String, Long> map, TransactionImpl transactionImpl) {
        CompletableFuture<Void> doAcknowledge;
        if (transactionImpl != null) {
            doAcknowledge = transactionImpl.registerAckedTopic(getTopic(), this.subscription).thenCompose(r11 -> {
                return doAcknowledge((List<MessageId>) list, ackType, (Map<String, Long>) map, transactionImpl);
            });
            transactionImpl.registerAckOp(doAcknowledge);
        } else {
            doAcknowledge = doAcknowledge(list, ackType, map, (TransactionImpl) null);
        }
        return doAcknowledge;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public CompletableFuture<Void> doAcknowledgeWithTxn(MessageId messageId, CommandAck.AckType ackType, Map<String, Long> map, TransactionImpl transactionImpl) {
        if (transactionImpl == 0 || !(this instanceof ConsumerImpl)) {
            return doAcknowledge(messageId, ackType, map, transactionImpl);
        }
        if (ackType == CommandAck.AckType.Cumulative) {
            transactionImpl.registerCumulativeAckConsumer((ConsumerImpl) this);
        }
        CompletableFuture thenCompose = transactionImpl.registerAckedTopic(getTopic(), this.subscription).thenCompose(r11 -> {
            return doAcknowledge(messageId, ackType, (Map<String, Long>) map, transactionImpl);
        });
        transactionImpl.registerAckOp(thenCompose);
        return thenCompose;
    }

    protected abstract CompletableFuture<Void> doAcknowledge(MessageId messageId, CommandAck.AckType ackType, Map<String, Long> map, TransactionImpl transactionImpl);

    protected abstract CompletableFuture<Void> doAcknowledge(List<MessageId> list, CommandAck.AckType ackType, Map<String, Long> map, TransactionImpl transactionImpl);

    protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> message, CommandAck.AckType ackType, Map<String, Long> map, long j, TimeUnit timeUnit);

    @Override // org.apache.pulsar.client.api.Consumer
    public void negativeAcknowledge(Messages<?> messages) {
        messages.forEach(this::negativeAcknowledge);
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public void unsubscribe() throws PulsarClientException {
        try {
            unsubscribeAsync().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public abstract CompletableFuture<Void> unsubscribeAsync();

    @Override // org.apache.pulsar.client.api.Consumer, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public abstract CompletableFuture<Void> closeAsync();

    @Override // org.apache.pulsar.client.api.Consumer
    public MessageId getLastMessageId() throws PulsarClientException {
        try {
            return getLastMessageIdAsync().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (ExecutionException e2) {
            throw PulsarClientException.unwrap(e2);
        }
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public abstract CompletableFuture<MessageId> getLastMessageIdAsync();

    private boolean isCumulativeAcknowledgementAllowed(SubscriptionType subscriptionType) {
        return (SubscriptionType.Shared == subscriptionType || SubscriptionType.Key_Shared == subscriptionType) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CommandSubscribe.SubType getSubType() {
        switch (this.conf.getSubscriptionType()) {
            case Exclusive:
                return CommandSubscribe.SubType.Exclusive;
            case Shared:
                return CommandSubscribe.SubType.Shared;
            case Failover:
                return CommandSubscribe.SubType.Failover;
            case Key_Shared:
                return CommandSubscribe.SubType.Key_Shared;
            default:
                return null;
        }
    }

    public abstract int getAvailablePermits();

    public abstract int numMessagesInQueue();

    public CompletableFuture<Consumer<T>> subscribeFuture() {
        return this.subscribeFuture;
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public String getTopic() {
        return this.topic;
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public String getSubscription() {
        return this.subscription;
    }

    @Override // org.apache.pulsar.client.api.Consumer
    public String getConsumerName() {
        return this.consumerName;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void redeliverUnacknowledgedMessages(Set<MessageId> set);

    public String toString() {
        return "ConsumerBase{subscription='" + this.subscription + "', consumerName='" + this.consumerName + "', topic='" + this.topic + "'}";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setMaxReceiverQueueSize(int i) {
        this.maxReceiverQueueSize = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Message<T> beforeConsume(Message<T> message) {
        return this.interceptors != null ? this.interceptors.beforeConsume(this, message) : message;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onAcknowledge(MessageId messageId, Throwable th) {
        if (this.interceptors != null) {
            this.interceptors.onAcknowledge(this, messageId, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onAcknowledgeCumulative(MessageId messageId, Throwable th) {
        if (this.interceptors != null) {
            this.interceptors.onAcknowledgeCumulative(this, messageId, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onNegativeAcksSend(Set<MessageId> set) {
        if (this.interceptors != null) {
            this.interceptors.onNegativeAcksSend(this, set);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onAckTimeoutSend(Set<MessageId> set) {
        if (this.interceptors != null) {
            this.interceptors.onAckTimeoutSend(this, set);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onPartitionsChange(String str, int i) {
        if (this.interceptors != null) {
            this.interceptors.onPartitionsChange(str, i);
        }
    }

    protected boolean canEnqueueMessage(Message<T> message) {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
        int size = message.size();
        if (canEnqueueMessage(message) && this.incomingMessages.offer(message)) {
            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, size);
        }
        return hasEnoughMessagesForBatchReceive();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean hasEnoughMessagesForBatchReceive() {
        if (this.batchReceivePolicy.getMaxNumMessages() > 0 || this.batchReceivePolicy.getMaxNumBytes() > 0) {
            return (this.batchReceivePolicy.getMaxNumMessages() > 0 && this.incomingMessages.size() >= this.batchReceivePolicy.getMaxNumMessages()) || (this.batchReceivePolicy.getMaxNumBytes() > 0 && getIncomingMessageSize() >= ((long) this.batchReceivePolicy.getMaxNumBytes()));
        }
        return false;
    }

    private void verifyConsumerState() throws PulsarClientException {
        switch (getState()) {
            case Ready:
            case Connecting:
            default:
                return;
            case Closing:
            case Closed:
                throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
            case Terminated:
                throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
            case Failed:
            case Uninitialized:
                throw new PulsarClientException.NotConnectedException();
        }
    }

    private void verifyBatchReceive() throws PulsarClientException {
        if (this.listener != null) {
            throw new PulsarClientException.InvalidConfigurationException("Cannot use receive() when a listener has been set");
        }
        if (this.conf.getReceiverQueueSize() == 0) {
            throw new PulsarClientException.InvalidConfigurationException("Can't use batch receive, if the queue size is 0");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyPendingBatchReceivedCallBack() {
        OpBatchReceive<T> nextBatchReceive = nextBatchReceive();
        if (nextBatchReceive == null) {
            return;
        }
        this.reentrantLock.lock();
        try {
            notifyPendingBatchReceivedCallBack(nextBatchReceive);
        } finally {
            this.reentrantLock.unlock();
        }
    }

    private boolean hasNextBatchReceive() {
        return !this.pendingBatchReceives.isEmpty();
    }

    private OpBatchReceive<T> nextBatchReceive() {
        OpBatchReceive<T> opBatchReceive = null;
        while (opBatchReceive == null) {
            opBatchReceive = this.pendingBatchReceives.poll();
            if (opBatchReceive == null) {
                return null;
            }
            if (opBatchReceive.future == null || opBatchReceive.future.isDone()) {
                opBatchReceive = null;
            }
        }
        return opBatchReceive;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
        MessagesImpl<T> newMessagesImpl = getNewMessagesImpl();
        Message<T> peek = this.incomingMessages.peek();
        while (true) {
            Message<T> message = peek;
            if (message == null || !newMessagesImpl.canAdd(message)) {
                break;
            }
            Message<T> poll = this.incomingMessages.poll();
            if (poll != null) {
                messageProcessed(poll);
                newMessagesImpl.add(beforeConsume(poll));
            }
            peek = this.incomingMessages.peek();
        }
        completePendingBatchReceive(opBatchReceive.future, newMessagesImpl);
    }

    protected void completePendingBatchReceive(CompletableFuture<Messages<T>> completableFuture, Messages<T> messages) {
        if (completableFuture.complete(messages)) {
            return;
        }
        log.warn("Race condition detected. batch receive future was already completed (cancelled={}) and messages were dropped. messages={}", Boolean.valueOf(completableFuture.isCancelled()), messages);
    }

    protected abstract void messageProcessed(Message<?> message);

    private void pendingBatchReceiveTask(Timeout timeout) {
        this.internalPinnedExecutor.execute(() -> {
            doPendingBatchReceiveTask(timeout);
        });
    }

    private void doPendingBatchReceiveTask(Timeout timeout) {
        if (timeout.isCancelled()) {
            return;
        }
        synchronized (this) {
            if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
                return;
            }
            long timeoutMs = this.batchReceivePolicy.getTimeoutMs();
            OpBatchReceive<T> peek = this.pendingBatchReceives.peek();
            while (true) {
                if (peek == null) {
                    break;
                }
                long timeoutMs2 = this.batchReceivePolicy.getTimeoutMs() - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - peek.createdAt);
                if (timeoutMs2 > 0) {
                    timeoutMs = timeoutMs2;
                    break;
                }
                completeOpBatchReceive(peek);
                OpBatchReceive<T> poll = this.pendingBatchReceives.poll();
                if (poll != peek) {
                    log.error("Race condition in consumer {} (should not cause data loss).  Concurrent operations on pendingBatchReceives is not safe", this.consumerName);
                    if (poll != null && !poll.future.isDone()) {
                        completeOpBatchReceive(poll);
                    }
                }
                peek = this.pendingBatchReceives.peek();
            }
            this.batchReceiveTimeout = this.client.timer().newTimeout(this::pendingBatchReceiveTask, timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void triggerListener() {
        Message<T> internalReceive;
        try {
            if (this.executorQueueSize.get() < 1 && (internalReceive = internalReceive(0, TimeUnit.MILLISECONDS)) != null) {
                this.executorQueueSize.incrementAndGet();
                if (SubscriptionType.Key_Shared == this.conf.getSubscriptionType()) {
                    this.executorProvider.getExecutor(peekMessageKey(internalReceive)).execute(() -> {
                        callMessageListener(internalReceive);
                    });
                } else {
                    getExternalExecutor(internalReceive).execute(() -> {
                        callMessageListener(internalReceive);
                    });
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Message has been cleared from the queue", this.topic, this.subscription);
            }
        } catch (PulsarClientException e) {
            log.warn("[{}] [{}] Failed to dequeue the message for listener", this.topic, this.subscription, e);
        }
    }

    protected void callMessageListener(Message<T> message) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Calling message listener for message {}", this.topic, this.subscription, message.getMessageId());
            }
            this.listener.received(this, message);
        } catch (Throwable th) {
            log.error("[{}][{}] Message listener error in processing message: {}", this.topic, this.subscription, message.getMessageId(), th);
        } finally {
            this.executorQueueSize.decrementAndGet();
            triggerListener();
        }
    }

    protected byte[] peekMessageKey(Message<T> message) {
        byte[] bArr = NONE_KEY;
        if (message.hasKey()) {
            bArr = message.getKeyBytes();
        }
        if (message.hasOrderingKey()) {
            bArr = message.getOrderingKey();
        }
        return bArr;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessagesImpl<T> getNewMessagesImpl() {
        return new MessagesImpl<>(this.batchReceivePolicy.getMaxNumMessages(), this.batchReceivePolicy.getMaxNumBytes());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean hasPendingBatchReceive() {
        return this.pendingBatchReceives != null && hasNextBatchReceive();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resetIncomingMessageSize() {
        INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void decreaseIncomingMessageSize(Message<?> message) {
        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
    }

    public long getIncomingMessageSize() {
        return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
    }

    public int getTotalIncomingMessages() {
        return this.incomingMessages.size();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void clearIncomingMessages() {
        this.incomingMessages.forEach((v0) -> {
            v0.release();
        });
        this.incomingMessages.clear();
        resetIncomingMessageSize();
    }

    protected abstract void completeOpBatchReceive(OpBatchReceive<T> opBatchReceive);

    private ExecutorService getExternalExecutor(Message<T> message) {
        ConsumerImpl consumerImpl = message instanceof TopicMessageImpl ? ((TopicMessageImpl) message).receivedByconsumer : null;
        return (consumerImpl == null || consumerImpl.externalPinnedExecutor == null) ? this.externalPinnedExecutor : consumerImpl.externalPinnedExecutor;
    }

    private ExecutorService getInternalExecutor(Message<T> message) {
        ConsumerImpl consumerImpl = message instanceof TopicMessageImpl ? ((TopicMessageImpl) message).receivedByconsumer : null;
        return (consumerImpl == null || consumerImpl.internalPinnedExecutor == null) ? this.internalPinnedExecutor : consumerImpl.internalPinnedExecutor;
    }

    @Override // org.apache.pulsar.client.impl.HandlerState
    public /* bridge */ /* synthetic */ PulsarClientImpl getClient() {
        return super.getClient();
    }
}
