package org.apache.pulsar.io.kafka;

import io.jsonwebtoken.io.Encoders;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.header.Header;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/kafka/KafkaAbstractSource.class */
public abstract class KafkaAbstractSource<V> extends KafkaPushSource<V> {
    public static final String HEADER_KAFKA_TOPIC_KEY = "__kafka_topic";
    public static final String HEADER_KAFKA_PTN_KEY = "__kafka_partition";
    public static final String HEADER_KAFKA_OFFSET_KEY = "__kafka_offset";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaAbstractSource.class);
    private volatile Consumer<Object, Object> consumer;
    private volatile boolean running = false;
    private KafkaSourceConfig kafkaSourceConfig;
    private Thread runnerThread;

    /* loaded from: input_file:org/apache/pulsar/io/kafka/KafkaAbstractSource$KafkaRecord.class */
    /**
     * Adapts one Kafka {@link ConsumerRecord} to the Pulsar {@link Record} contract.
     *
     * <p>Each instance owns a {@link CompletableFuture} that is settled by {@link #ack()} or
     * {@link #fail()}. The enclosing source waits on these futures before committing Kafka
     * offsets when auto-commit is disabled, so every record must eventually settle its future.
     *
     * @param <V> type of the value handed to Pulsar
     */
    protected static class KafkaRecord<V> implements Record<V> {
        private static final Logger log = LoggerFactory.getLogger(KafkaRecord.class);
        private final ConsumerRecord<String, ?> record;
        private final V value;
        private final Schema<V> schema;
        private final Map<String, String> properties;
        private final CompletableFuture<Void> completableFuture = new CompletableFuture<>();

        /**
         * @param consumerRecord the Kafka record this instance wraps
         * @param v              the value exposed through {@link #getValue()}
         * @param schema         the schema exposed through {@link #getSchema()}; may be {@code null}
         * @param map            the properties exposed through {@link #getProperties()}
         */
        public KafkaRecord(ConsumerRecord<String, ?> consumerRecord, V v, Schema<V> schema, Map<String, String> map) {
            this.record = consumerRecord;
            this.value = v;
            this.schema = schema;
            this.properties = map;
        }

        /** @return the Kafka partition number rendered as a decimal string */
        public Optional<String> getPartitionId() {
            return Optional.of(Integer.toString(this.record.partition()));
        }

        /** @return the Kafka partition number */
        public Optional<Integer> getPartitionIndex() {
            return Optional.of(Integer.valueOf(this.record.partition()));
        }

        /** @return the Kafka offset of the wrapped record */
        public Optional<Long> getRecordSequence() {
            return Optional.of(Long.valueOf(this.record.offset()));
        }

        /** @return the Kafka record key, or an empty {@link Optional} when the key is {@code null} */
        public Optional<String> getKey() {
            return Optional.ofNullable(this.record.key());
        }

        /** @return the value supplied at construction time */
        public V getValue() {
            return this.value;
        }

        /** Marks the record as processed by completing its future normally. */
        public void ack() {
            this.completableFuture.complete(null);
        }

        /**
         * Marks the record as failed by completing its future exceptionally.
         *
         * <p>Without this, a record that is never acknowledged would leave the future pending
         * and anything waiting on it (such as the offset-commit step of the polling loop) would
         * block indefinitely. Has no effect if the future was already completed.
         *
         * <p>NOTE(review): intended to be the implementation of {@code Record#fail()} in the
         * Pulsar functions API; confirm the method exists in the API version in use.
         */
        public void fail() {
            this.completableFuture.completeExceptionally(new RuntimeException(String.format(
                    "Failed to process record with kafka topic: %s partition: %d offset: %d key: %s",
                    this.record.topic(),
                    this.record.partition(),
                    this.record.offset(),
                    getKey())));
        }

        /** @return the schema supplied at construction time; may be {@code null} */
        public Schema<V> getSchema() {
            return this.schema;
        }

        /** @return the property map supplied at construction time */
        public Map<String, String> getProperties() {
            return this.properties;
        }

        /** @return the future that is settled when this record is acknowledged or failed */
        public CompletableFuture<Void> getCompletableFuture() {
            return this.completableFuture;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/io/kafka/KafkaAbstractSource$KeyValueKafkaRecord.class */
    /**
     * A {@link KafkaRecord} whose value is a {@link KeyValue} pair and which additionally
     * exposes the key and value schemas through {@link KVRecord}.
     *
     * <p>The superclass is deliberately used as a raw type and is given a {@code null} schema;
     * the key and value schemas are carried by this class instead.
     */
    protected static class KeyValueKafkaRecord<V> extends KafkaRecord implements KVRecord<Object, Object> {
        private final Schema<Object> keySchema;
        private final Schema<Object> valueSchema;

        /**
         * @param consumerRecord the Kafka record being wrapped
         * @param keyValue       the key/value pair exposed as the record value
         * @param schema         schema of the key part
         * @param schema2        schema of the value part
         * @param map            properties attached to the record
         */
        public KeyValueKafkaRecord(ConsumerRecord consumerRecord, KeyValue keyValue, Schema<Object> schema, Schema<Object> schema2, Map<String, String> map) {
            super(consumerRecord, keyValue, null, map);
            keySchema = schema;
            valueSchema = schema2;
        }

        /** @return the schema of the key part */
        public Schema<Object> getKeySchema() {
            return keySchema;
        }

        /** @return the schema of the value part */
        public Schema<Object> getValueSchema() {
            return valueSchema;
        }

        /** @return always {@link KeyValueEncodingType#SEPARATED} */
        public KeyValueEncodingType getKeyValueEncodingType() {
            return KeyValueEncodingType.SEPARATED;
        }
    }

    /**
     * Loads and validates the source configuration, creates the Kafka consumer and starts
     * consuming via {@link #start()}.
     *
     * <p>If the consumer was created but start-up then fails, the consumer is closed before the
     * failure is reported so that its resources are not leaked.
     *
     * @param map           raw connector configuration passed to {@code KafkaSourceConfig.load}
     * @param sourceContext context passed to {@code KafkaSourceConfig.load}
     * @throws NullPointerException     if topic, bootstrap servers or group id is not set
     * @throws IllegalArgumentException if a numeric setting is not positive, or if the consumer
     *                                  cannot be created or started (the cause is attached)
     * @throws Exception                if loading the configuration fails
     */
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        this.kafkaSourceConfig = KafkaSourceConfig.load(map, sourceContext);
        validateSourceConfig(this.kafkaSourceConfig);
        Properties properties = buildConsumerProperties(this.kafkaSourceConfig);

        Consumer<Object, Object> created = null;
        try {
            created = new KafkaConsumer<>(beforeCreateConsumer(properties));
            this.consumer = created;
            start();
        } catch (Exception e) {
            if (created != null) {
                // start() failed after the consumer was built: release it instead of leaking it.
                this.consumer = null;
                try {
                    created.close();
                } catch (Exception closeFailure) {
                    // Keep the original failure as the primary cause.
                    e.addSuppressed(closeFailure);
                }
            }
            throw new IllegalArgumentException("Unable to instantiate Kafka consumer", e);
        }
    }

    /** Rejects configurations with missing mandatory settings or non-positive numeric settings. */
    private static void validateSourceConfig(KafkaSourceConfig config) {
        Objects.requireNonNull(config.getTopic(), "Kafka topic is not set");
        Objects.requireNonNull(config.getBootstrapServers(), "Kafka bootstrapServers is not set");
        Objects.requireNonNull(config.getGroupId(), "Kafka consumer group id is not set");
        if (config.getFetchMinBytes() <= 0) {
            throw new IllegalArgumentException("Invalid Kafka Consumer fetchMinBytes : " + config.getFetchMinBytes());
        }
        // The commit interval only matters when auto-commit is actually enabled.
        if (config.isAutoCommitEnabled() && config.getAutoCommitIntervalMs() <= 0) {
            throw new IllegalArgumentException("Invalid Kafka Consumer autoCommitIntervalMs : " + config.getAutoCommitIntervalMs());
        }
        if (config.getSessionTimeoutMs() <= 0) {
            throw new IllegalArgumentException("Invalid Kafka Consumer sessionTimeoutMs : " + config.getSessionTimeoutMs());
        }
        if (config.getHeartbeatIntervalMs() <= 0) {
            throw new IllegalArgumentException("Invalid Kafka Consumer heartbeatIntervalMs : " + config.getHeartbeatIntervalMs());
        }
    }

    /** Translates the source configuration into the properties handed to the Kafka consumer. */
    private static Properties buildConsumerProperties(KafkaSourceConfig config) {
        Properties properties = new Properties();
        // User-supplied pass-through properties go in first so the explicit settings below win.
        if (config.getConsumerConfigProperties() != null) {
            properties.putAll(config.getConsumerConfigProperties());
        }
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());

        // Optional security settings are only forwarded when non-empty.
        if (StringUtils.isNotEmpty(config.getSecurityProtocol())) {
            properties.put("security.protocol", config.getSecurityProtocol());
        }
        if (StringUtils.isNotEmpty(config.getSaslMechanism())) {
            properties.put(SaslConfigs.SASL_MECHANISM, config.getSaslMechanism());
        }
        if (StringUtils.isNotEmpty(config.getSaslJaasConfig())) {
            properties.put(SaslConfigs.SASL_JAAS_CONFIG, config.getSaslJaasConfig());
        }
        if (StringUtils.isNotEmpty(config.getSslEnabledProtocols())) {
            properties.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, config.getSslEnabledProtocols());
        }
        if (StringUtils.isNotEmpty(config.getSslEndpointIdentificationAlgorithm())) {
            properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, config.getSslEndpointIdentificationAlgorithm());
        }
        if (StringUtils.isNotEmpty(config.getSslTruststoreLocation())) {
            properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, config.getSslTruststoreLocation());
        }
        if (StringUtils.isNotEmpty(config.getSslTruststorePassword())) {
            properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getSslTruststorePassword());
        }

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(config.getFetchMinBytes()));
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(config.isAutoCommitEnabled()));
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(config.getAutoCommitIntervalMs()));
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(config.getSessionTimeoutMs()));
        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(config.getHeartbeatIntervalMs()));
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetReset());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializationClass());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializationClass());
        return properties;
    }

    /**
     * Hook invoked by {@code open} with the assembled consumer properties just before the
     * Kafka consumer is constructed. The returned properties are the ones passed to the
     * consumer; this base implementation returns its argument unchanged.
     *
     * @param properties the consumer properties assembled from the source configuration
     * @return the properties to construct the consumer with
     */
    protected Properties beforeCreateConsumer(Properties properties) {
        return properties;
    }

    /**
     * Stops the source: clears the running flag, interrupts the polling thread and waits for it
     * to finish, then closes the Kafka consumer. Steps whose target is {@code null} are skipped.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *                              polling thread to terminate
     */
    public void close() throws InterruptedException {
        LOG.info("Stopping kafka source");
        this.running = false;

        final Thread poller = this.runnerThread;
        if (poller != null) {
            // The interrupt unblocks the thread if it is waiting; join() ensures it has
            // finished using the consumer before the consumer is closed below.
            poller.interrupt();
            poller.join();
            this.runnerThread = null;
        }

        final Consumer<Object, Object> kafkaConsumer = this.consumer;
        if (kafkaConsumer != null) {
            kafkaConsumer.close();
            this.consumer = null;
        }
        LOG.info("Kafka source stopped.");
    }

    /**
     * Subscribes the consumer to the configured topic and launches the polling thread.
     *
     * <p>The thread repeatedly polls Kafka (one-second timeout), converts each record with
     * {@link #buildRecord(ConsumerRecord)} and hands it to {@code consume}. When auto-commit is
     * disabled it waits until every record of the batch has settled its future and then commits
     * offsets synchronously.
     *
     * <p>Any exception ends the loop. It is reported through {@code notifyError} unless the
     * running flag has already been cleared, in which case the exception is treated as part of
     * an orderly shutdown (for example the interrupt sent by {@code close()}) and is only logged
     * at info level.
     */
    public void start() {
        LOG.info("Starting subscribe kafka source on {}", this.kafkaSourceConfig.getTopic());
        this.consumer.subscribe(Collections.singletonList(this.kafkaSourceConfig.getTopic()));
        this.runnerThread = new Thread(() -> {
            LOG.info("Kafka source started.");
            while (this.running) {
                try {
                    ConsumerRecords<Object, Object> polled = this.consumer.poll(Duration.ofSeconds(1L));
                    CompletableFuture<?>[] pendingAcks = new CompletableFuture<?>[polled.count()];
                    int next = 0;
                    for (ConsumerRecord<Object, Object> consumerRecord : polled) {
                        KafkaRecord record = buildRecord(consumerRecord);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema());
                        }
                        consume(record);
                        pendingAcks[next] = record.getCompletableFuture();
                        next++;
                    }
                    if (!this.kafkaSourceConfig.isAutoCommitEnabled()) {
                        // Commit only after the whole batch has been settled downstream.
                        CompletableFuture.allOf(pendingAcks).get();
                        this.consumer.commitSync();
                    }
                } catch (Exception e) {
                    if (!this.running) {
                        // Shutdown was requested; this is not a processing failure.
                        LOG.info("Kafka source polling loop stopped during shutdown: {}", e.toString());
                        return;
                    }
                    LOG.error("Error while processing records", e);
                    notifyError(e);
                    return;
                }
            }
        });
        // Set the flag before starting the thread so the loop is entered.
        this.running = true;
        this.runnerThread.setName("Kafka Source Thread");
        this.runnerThread.start();
    }

    /**
     * Converts one polled Kafka record into the {@link KafkaRecord} that is handed to
     * {@code consume} by the polling loop.
     *
     * @param consumerRecord the record returned by the Kafka consumer
     * @return the record to emit
     */
    public abstract KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the property map for a record from its Kafka coordinates and headers.
     *
     * <p>When header copying is disabled in the source configuration an empty immutable map is
     * returned. Otherwise the map contains the topic, partition and offset under the
     * {@code __kafka_*} keys, followed by every Kafka header with its value Base64-encoded.
     * A header whose value is {@code null} is stored as an empty string (the same encoding as
     * an empty value) instead of being passed to the encoder.
     *
     * @param consumerRecord the Kafka record to read coordinates and headers from
     * @return the properties to attach to the emitted record
     */
    public Map<String, String> copyKafkaHeaders(ConsumerRecord<Object, Object> consumerRecord) {
        if (!this.kafkaSourceConfig.isCopyHeadersEnabled()) {
            return Collections.emptyMap();
        }
        Map<String, String> properties = new HashMap<>();
        properties.put(HEADER_KAFKA_TOPIC_KEY, consumerRecord.topic());
        properties.put(HEADER_KAFKA_PTN_KEY, Integer.toString(consumerRecord.partition()));
        properties.put(HEADER_KAFKA_OFFSET_KEY, Long.toString(consumerRecord.offset()));
        for (Header header : consumerRecord.headers()) {
            byte[] rawValue = header.value();
            // Guard against null header values so one such header cannot abort the record.
            properties.put(header.key(), rawValue == null ? "" : Encoders.BASE64.encode(rawValue));
        }
        return properties;
    }
}
