package org.apache.pulsar.client.impl;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.9.0-rc-202107112205.jar:org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.class */
/**
 * {@link ConsumerImpl} specialisation used when the receiver queue size is zero (per the log
 * messages emitted by this class).
 *
 * <p>Instead of letting messages accumulate in {@code incomingMessages}, every receive path in
 * this class calls {@code increaseAvailablePermits(...)} itself and hands over exactly one
 * message: either to the thread blocked in {@link #internalReceive()} or to the configured
 * listener. Batch messages are rejected by closing the consumer.
 *
 * @param <T> payload type of the consumed messages
 */
public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
    private static final Logger log = LoggerFactory.getLogger(ZeroQueueConsumerImpl.class);

    /** Serialises callers of {@link #internalReceive()} so only one thread fetches at a time. */
    private final Lock zeroQueueLock = new ReentrantLock();

    /** {@code true} while a thread is inside {@link #fetchSingleMessageFromBroker()} waiting for a message. */
    private volatile boolean waitingOnReceiveForZeroQueueSize = false;

    /** {@code true} while the listener task is processing an unqueued message. */
    private volatile boolean waitingOnListenerForZeroQueueSize = false;

    /**
     * Forwards every argument unchanged to the {@link ConsumerImpl} constructor, inserting
     * {@code 0L} as the argument that follows {@code messageId}.
     *
     * <p>NOTE(review): the parameter names are decompiler-generated; their exact meaning is
     * defined by the superclass constructor, which is not visible here.
     */
    public ZeroQueueConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, ConsumerConfigurationData<T> consumerConfigurationData, ExecutorProvider executorProvider, int i, boolean z, CompletableFuture<Consumer<T>> completableFuture, MessageId messageId, Schema<T> schema, ConsumerInterceptors<T> consumerInterceptors, boolean z2) {
        super(pulsarClientImpl, str, consumerConfigurationData, executorProvider, i, z, completableFuture, messageId, 0L, schema, consumerInterceptors, z2);
    }

    /**
     * Blocks until a single message has been fetched, tracks it, and returns the result of
     * {@code beforeConsume(...)} for it.
     *
     * @return the message as returned by {@code beforeConsume}
     * @throws PulsarClientException if the fetch fails, including when the calling thread is
     *     interrupted while waiting
     */
    @Override // org.apache.pulsar.client.impl.ConsumerImpl, org.apache.pulsar.client.impl.ConsumerBase
    protected Message<T> internalReceive() throws PulsarClientException {
        zeroQueueLock.lock();
        try {
            Message<T> message = fetchSingleMessageFromBroker();
            trackMessage(message);
            return beforeConsume(message);
        } finally {
            zeroQueueLock.unlock();
        }
    }

    /**
     * Delegates to the superclass and, when the returned future is not already complete, calls
     * {@code increaseAvailablePermits(cnx())}.
     *
     * @return the future produced by the superclass implementation
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.client.impl.ConsumerImpl, org.apache.pulsar.client.impl.ConsumerBase
    public CompletableFuture<Message<T>> internalReceiveAsync() {
        CompletableFuture<Message<T>> future = super.internalReceiveAsync();
        if (!future.isDone()) {
            // Nothing was available immediately, so ask for a message to complete the future.
            increaseAvailablePermits(cnx());
        }
        return future;
    }

    /**
     * Requests one message and blocks until a message arrives that was delivered over the
     * current connection.
     *
     * <p>Messages whose connection differs from {@code cnx()} are dropped and the wait
     * continues. On every exit path the waiting flag is cleared and {@code incomingMessages} is
     * emptied.
     *
     * @return the received message
     * @throws PulsarClientException if the calling thread is interrupted while waiting; the
     *     thread's interrupt status is restored before the exception is thrown
     */
    private Message<T> fetchSingleMessageFromBroker() throws PulsarClientException {
        // Defensive: the queue is expected to be empty before a fetch starts.
        if (incomingMessages.size() > 0) {
            log.error("The incoming message queue should never be greater than 0 when Queue size is 0");
            incomingMessages.forEach(Message::release);
            incomingMessages.clear();
        }

        try {
            waitingOnReceiveForZeroQueueSize = true;
            synchronized (this) {
                if (isConnected()) {
                    increaseAvailablePermits(cnx());
                }
            }
            while (true) {
                Message<T> message = incomingMessages.take();
                lastDequeuedMessageId = message.getMessageId();
                ClientCnx messageCnx = ((MessageImpl<?>) message).getCnx();
                // The connection comparison is done under the monitor, as in the original code.
                synchronized (this) {
                    if (messageCnx == cnx()) {
                        waitingOnReceiveForZeroQueueSize = false;
                        stats.updateNumMsgsReceived(message);
                        return message;
                    }
                    // Otherwise the message came over a different connection: drop it and keep waiting.
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            stats.incrementNumReceiveFailed();
            throw PulsarClientException.unwrap(e);
        } finally {
            // Runs for normal return and for any exception: never leave the flag set or
            // stray messages behind.
            waitingOnReceiveForZeroQueueSize = false;
            incomingMessages.clear();
        }
    }

    /**
     * After the superclass handling, calls {@code increaseAvailablePermits(clientCnx)} when a
     * receiver is blocked, when {@code i} is positive, or when a listener is configured and not
     * currently processing a message.
     *
     * @param clientCnx the connection passed on to the superclass and to
     *     {@code increaseAvailablePermits}
     * @param i forwarded to the superclass; presumably the queue size at reconnect time — TODO confirm
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.client.impl.ConsumerImpl
    public void consumerIsReconnectedToBroker(ClientCnx clientCnx, int i) {
        super.consumerIsReconnectedToBroker(clientCnx, i);
        boolean idleListener = listener != null && !waitingOnListenerForZeroQueueSize;
        if (waitingOnReceiveForZeroQueueSize || i > 0 || idleListener) {
            increaseAvailablePermits(clientCnx);
        }
    }

    /**
     * Decides whether an incoming message may be queued.
     *
     * @param message the incoming message
     * @return {@code true} when no listener is configured; otherwise the message is dispatched
     *     to the listener and {@code false} is returned
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected boolean canEnqueueMessage(Message<T> message) {
        if (listener == null) {
            return true;
        }
        triggerZeroQueueSizeListener(message);
        return false;
    }

    /**
     * Schedules delivery of {@code message} to the listener on {@code pinnedExecutor}.
     *
     * <p>The task records the message in the stats, invokes the listener (logging, not
     * propagating, anything it throws), then calls {@code increaseAvailablePermits(cnx())}.
     * The listener-busy flag is cleared even if that last call throws.
     *
     * @param message the message to hand to the listener; must not be {@code null}
     */
    private void triggerZeroQueueSizeListener(Message<T> message) {
        Preconditions.checkNotNull(listener, "listener can't be null");
        Preconditions.checkNotNull(message, "unqueued message can't be null");
        pinnedExecutor.execute(() -> {
            stats.updateNumMsgsReceived(message);
            try {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription, message.getMessageId());
                }
                waitingOnListenerForZeroQueueSize = true;
                trackMessage(message);
                listener.received(this, beforeConsume(message));
            } catch (Throwable t) {
                log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription, message.getMessageId(), t);
            }
            try {
                increaseAvailablePermits(cnx());
            } finally {
                // A stuck 'true' would suppress the permit request made on reconnect.
                waitingOnListenerForZeroQueueSize = false;
            }
        });
    }

    /**
     * Intentionally empty override.
     *
     * <p>NOTE(review): presumably listener delivery is handled entirely by
     * {@code triggerZeroQueueSizeListener} — confirm against {@code ConsumerBase}.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public void triggerListener() {
    }

    /**
     * Rejects batch messages: logs a warning, closes the consumer asynchronously and, once the
     * close completes (successfully or not), calls {@code notifyPendingReceivedCallback} with a
     * {@code null} message and an {@link PulsarClientException.InvalidMessageException}.
     * None of the parameters are read.
     */
    @Override // org.apache.pulsar.client.impl.ConsumerImpl
    void receiveIndividualMessagesFromBatch(MessageMetadata messageMetadata, int i, List<Long> list, ByteBuf byteBuf, MessageIdData messageIdData, ClientCnx clientCnx) {
        log.warn("Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", subscription, consumerName);
        closeAsync().handle((ignoredResult, ignoredError) -> {
            notifyPendingReceivedCallback(null, new PulsarClientException.InvalidMessageException(String.format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ", subscription, consumerName)));
            return null;
        });
    }
}
