package org.apache.inlong.tubemq.client.consumer;

import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.client.common.PeerInfo;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.InnerSessionFactory;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/client/consumer/SimplePushMessageConsumer.class */
public class SimplePushMessageConsumer implements PushMessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SimplePushMessageConsumer.class);
    private static final int MAX_FAILURE_LOG_TIMES = 10;
    private final MessageFetchManager fetchManager;
    private final BaseMessageConsumer baseConsumer;
    private AtomicLong lastLogPrintTime = new AtomicLong(0);
    private AtomicLong lastFailureCount = new AtomicLong(0);
    private CountDownLatch consumeSync = new CountDownLatch(0);

    public SimplePushMessageConsumer(InnerSessionFactory innerSessionFactory, ConsumerConfig consumerConfig) throws TubeClientException {
        this.baseConsumer = new BaseMessageConsumer(innerSessionFactory, consumerConfig, false);
        this.fetchManager = new MessageFetchManager(this.baseConsumer.consumerConfig, this);
        this.fetchManager.startFetchWorkers();
    }

    public void shutdown() throws Throwable {
        pauseConsume();
        this.fetchManager.stopFetchWorkers(true);
        ThreadUtils.sleep(200L);
        this.fetchManager.stopFetchWorkers(false);
        this.baseConsumer.shutdown();
    }

    @Override // org.apache.inlong.tubemq.client.consumer.PushMessageConsumer
    public PushMessageConsumer subscribe(String str, TreeSet<String> treeSet, MessageListener messageListener) throws TubeClientException {
        this.baseConsumer.subscribe(str, treeSet, messageListener);
        return this;
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public void completeSubscribe() throws TubeClientException {
        this.baseConsumer.completeSubscribe();
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public void completeSubscribe(String str, int i, boolean z, Map<String, Long> map) throws TubeClientException {
        this.baseConsumer.completeSubscribe(str, i, z, map);
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public String getClientVersion() {
        return this.baseConsumer.getClientVersion();
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public String getConsumerId() {
        return this.baseConsumer.getConsumerId();
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public boolean isShutdown() {
        return this.baseConsumer.isShutdown();
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public ConsumerConfig getConsumerConfig() {
        return this.baseConsumer.getConsumerConfig();
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public boolean isFilterConsume(String str) {
        return this.baseConsumer.isFilterConsume(str);
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public Map<String, ConsumeOffsetInfo> getCurConsumedPartitions() throws TubeClientException {
        return this.baseConsumer.getCurConsumedPartitions();
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public void freezePartitions(List<String> list) throws TubeClientException {
        this.baseConsumer.freezePartitions(list);
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public void unfreezePartitions(List<String> list) throws TubeClientException {
        this.baseConsumer.unfreezePartitions(list);
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public void relAllFrozenPartitions() {
        this.baseConsumer.relAllFrozenPartitions();
    }

    @Override // org.apache.inlong.tubemq.client.consumer.MessageConsumer
    public Map<String, Long> getFrozenPartInfo() {
        return this.baseConsumer.getFrozenPartInfo();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseMessageConsumer getBaseConsumer() {
        return this.baseConsumer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void allowConsumeWait() {
        if (this.consumeSync == null || this.consumeSync.getCount() == 0) {
            return;
        }
        try {
            this.consumeSync.await();
        } catch (InterruptedException e) {
        }
    }

    @Override // org.apache.inlong.tubemq.client.consumer.PushMessageConsumer
    public void resumeConsume() {
        this.consumeSync.countDown();
        logger.info(new StringBuilder(256).append("[ResumeConsume] Consume is resume, consumerId :").append(this.baseConsumer.consumerId).toString());
    }

    @Override // org.apache.inlong.tubemq.client.consumer.PushMessageConsumer
    public void pauseConsume() {
        this.consumeSync = new CountDownLatch(1);
        logger.info(new StringBuilder(256).append("[PauseConsume] Consume is paused, consumerId :").append(this.baseConsumer.consumerId).toString());
    }

    @Override // org.apache.inlong.tubemq.client.consumer.PushMessageConsumer
    public boolean isConsumePaused() {
        return (this.consumeSync == null || this.consumeSync.getCount() == 0) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processRequest(PartitionSelectResult partitionSelectResult, StringBuilder sb) {
        long currentTimeMillis = System.currentTimeMillis();
        FetchContext fetchMessage = this.baseConsumer.fetchMessage(partitionSelectResult, sb);
        if (!fetchMessage.isSuccess()) {
            if (logger.isDebugEnabled()) {
                logger.debug(sb.append("Fetch message error: partition:").append(partitionSelectResult.getPartition().toString()).append(" error is ").append(fetchMessage.getErrMsg()).toString());
                sb.delete(0, sb.length());
                return;
            }
            return;
        }
        boolean z = false;
        if (!isShutdown()) {
            if (fetchMessage.getMessageList() == null || fetchMessage.getMessageList().isEmpty()) {
                z = true;
            } else {
                try {
                    TopicProcessor topicProcessor = this.baseConsumer.consumeSubInfo.getTopicProcessor(fetchMessage.getPartition().getTopic());
                    if (topicProcessor == null || topicProcessor.getMessageListener() == null) {
                        throw new TubeClientException(sb.append("Listener is null for topic ").append(fetchMessage.getPartition().getTopic()).toString());
                    }
                    z = notifyListener(fetchMessage, topicProcessor, sb);
                } catch (Throwable th) {
                    z = !this.baseConsumer.consumerConfig.isPushListenerThrowedRollBack();
                    logMessageProcessFailed(fetchMessage, th);
                }
            }
        }
        this.baseConsumer.rmtDataCache.succRspRelease(fetchMessage.getPartition().getPartitionKey(), fetchMessage.getPartition().getTopic(), fetchMessage.getUsedToken(), z, isFilterConsume(fetchMessage.getPartition().getTopic()), fetchMessage.getCurrOffset(), fetchMessage.getMaxOffset());
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        if (currentTimeMillis2 > 30000) {
            logger.info(sb.append("Consuming Partition; current processing thread ").append(Thread.currentThread().getName()).append("-->Process[").append(partitionSelectResult.getPartition().toString()).append("] cost:").append(currentTimeMillis2).append(" Ms").toString());
            sb.delete(0, sb.length());
        }
    }

    private boolean notifyListener(final FetchContext fetchContext, final TopicProcessor topicProcessor, StringBuilder sb) throws Exception {
        MessageListener messageListener = topicProcessor.getMessageListener();
        if (messageListener.getExecutor() != null) {
            try {
                messageListener.getExecutor().execute(new Runnable() { // from class: org.apache.inlong.tubemq.client.consumer.SimplePushMessageConsumer.1
                    @Override // java.lang.Runnable
                    public void run() {
                        SimplePushMessageConsumer.this.receiveMessages(fetchContext, topicProcessor);
                    }
                });
            } catch (RejectedExecutionException e) {
                logger.error(new StringBuilder(512).append("MessageListener thread poll is busy, topic=").append(fetchContext.getPartition().getTopic()).append(",partition=").append(fetchContext.getPartition()).toString(), e);
                throw e;
            }
        } else {
            receiveMessages(fetchContext, topicProcessor);
        }
        this.baseConsumer.clientStatsInfo.bookReturnDuration(fetchContext.getPartitionKey(), System.currentTimeMillis() - fetchContext.getUsedToken());
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void receiveMessages(FetchContext fetchContext, TopicProcessor topicProcessor) {
        if (fetchContext == null || fetchContext.getMessageList() == null) {
            return;
        }
        try {
            topicProcessor.getMessageListener().receiveMessages(new PeerInfo(fetchContext.getPartition(), fetchContext.getCurrOffset(), fetchContext.getMaxOffset()), fetchContext.getMessageList());
        } catch (InterruptedException e) {
            logger.info("Call listener to process received messages throw Interrupted Exception!");
        }
    }

    private void logMessageProcessFailed(FetchContext fetchContext, Throwable th) {
        StringBuilder sb = new StringBuilder(512);
        sb.append("CallBack process message failed: partition=").append(fetchContext.getPartition());
        sb.append(", group=").append(this.baseConsumer.consumerConfig.getConsumerGroup());
        sb.append(", FetchManager.isConsumePaused=").append(isConsumePaused());
        sb.append(", MessageConsumer.shutdown=").append(isShutdown());
        if (isShutdown()) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        long j = this.lastLogPrintTime.get();
        if (this.lastFailureCount.incrementAndGet() <= 10 || j <= 0 || currentTimeMillis - j > 30000) {
            logger.warn(sb.toString(), th);
            if (currentTimeMillis - j > 30000) {
                this.lastLogPrintTime.set(currentTimeMillis);
                this.lastFailureCount.set(0L);
            }
        }
    }
}
