package org.apache.flink.streaming.connectors.pulsar;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.IOUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.class */
class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageId> implements PulsarSourceBase<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class);
    private final int messageReceiveTimeoutMs = 100;
    private final String serviceUrl;
    private final Set<String> topicNames;
    private final Pattern topicsPattern;
    private final String subscriptionName;
    private final DeserializationSchema<T> deserializer;
    private PulsarClient client;
    private Consumer<byte[]> consumer;
    private boolean isCheckpointingEnabled;
    private final long acknowledgementBatchSize;
    private long batchCount;
    private volatile transient boolean isRunning;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PulsarConsumerSource(PulsarSourceBuilder<T> pulsarSourceBuilder) {
        super(MessageId.class);
        this.messageReceiveTimeoutMs = 100;
        this.serviceUrl = pulsarSourceBuilder.serviceUrl;
        this.topicNames = pulsarSourceBuilder.topicNames;
        this.topicsPattern = pulsarSourceBuilder.topicsPattern;
        this.deserializer = pulsarSourceBuilder.deserializationSchema;
        this.subscriptionName = pulsarSourceBuilder.subscriptionName;
        this.acknowledgementBatchSize = pulsarSourceBuilder.acknowledgementBatchSize;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        StreamingRuntimeContext runtimeContext = getRuntimeContext();
        if (runtimeContext instanceof StreamingRuntimeContext) {
            this.isCheckpointingEnabled = runtimeContext.isCheckpointingEnabled();
        }
        this.client = createClient();
        this.consumer = createConsumer(this.client);
        this.isRunning = true;
    }

    protected void acknowledgeIDs(long j, Set<MessageId> set) {
        if (this.consumer == null) {
            LOG.error("null consumer unable to acknowledge messages");
            throw new RuntimeException("null pulsar consumer unable to acknowledge messages");
        }
        if (set.isEmpty()) {
            LOG.info("no message ids to acknowledge");
            return;
        }
        HashMap hashMap = new HashMap(set.size());
        for (MessageId messageId : set) {
            hashMap.put(messageId.toString(), this.consumer.acknowledgeAsync(messageId));
        }
        hashMap.forEach((str, completableFuture) -> {
            try {
                completableFuture.get();
            } catch (Exception e) {
                LOG.error("failed to acknowledge messageId " + str, e);
                throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
            }
        });
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        while (this.isRunning) {
            Message receive = this.consumer.receive(100, TimeUnit.MILLISECONDS);
            if (receive != null) {
                if (this.isCheckpointingEnabled) {
                    emitCheckpointing(sourceContext, receive);
                } else {
                    emitAutoAcking(sourceContext, receive);
                }
            }
        }
    }

    private void emitCheckpointing(SourceFunction.SourceContext<T> sourceContext, Message message) throws IOException {
        synchronized (sourceContext.getCheckpointLock()) {
            if (addId(message.getMessageId())) {
                sourceContext.collect(deserialize(message));
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("messageId=" + message.getMessageId().toString() + " already processed.");
                }
            }
        }
    }

    private void emitAutoAcking(SourceFunction.SourceContext<T> sourceContext, Message message) throws IOException {
        sourceContext.collect(deserialize(message));
        this.batchCount++;
        if (this.batchCount >= this.acknowledgementBatchSize) {
            LOG.info("processed {} messages acknowledging messageId {}", Long.valueOf(this.batchCount), message.getMessageId());
            this.consumer.acknowledgeCumulative(message.getMessageId());
            this.batchCount = 0L;
        }
    }

    private T deserialize(Message message) throws IOException {
        return (T) this.deserializer.deserialize(message.getData());
    }

    public void cancel() {
        this.isRunning = false;
    }

    public void close() throws Exception {
        super.close();
        IOUtils.cleanup(LOG, new AutoCloseable[]{this.consumer});
        IOUtils.cleanup(LOG, new AutoCloseable[]{this.client});
    }

    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    boolean isCheckpointingEnabled() {
        return this.isCheckpointingEnabled;
    }

    PulsarClient createClient() throws PulsarClientException {
        return PulsarClient.builder().serviceUrl(this.serviceUrl).build();
    }

    Consumer<byte[]> createConsumer(PulsarClient pulsarClient) throws PulsarClientException {
        return this.topicsPattern != null ? pulsarClient.newConsumer().topicsPattern(this.topicsPattern).subscriptionName(this.subscriptionName).subscriptionType(SubscriptionType.Failover).subscribe() : pulsarClient.newConsumer().topics(Lists.newArrayList(this.topicNames)).subscriptionName(this.subscriptionName).subscriptionType(SubscriptionType.Failover).subscribe();
    }
}
