package org.apache.pulsar.io.kafka.connect;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.10.0-rc-202110252206.jar:org/apache/pulsar/io/kafka/connect/KafkaConnectSource.class */
public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte[], byte[]>> {
    private final Cache<Schema, KafkaSchemaWrappedSchema> readerCache = CacheBuilder.newBuilder().maximumSize(JvmPauseMonitor.WARN_THRESHOLD_DEFAULT).expireAfterAccess(30, TimeUnit.MINUTES).build();
    private boolean jsonWithEnvelope = false;
    private static final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaConnectSource.class);
    private static Map<String, String> PROPERTIES = Collections.emptyMap();
    private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
    private static long FLUSH_TIMEOUT_MS = 2000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.10.0-rc-202110252206.jar:org/apache/pulsar/io/kafka/connect/KafkaConnectSource$KafkaSourceRecord.class */
    public class KafkaSourceRecord extends AbstractKafkaConnectSource<KeyValue<byte[], byte[]>>.AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>> implements KVRecord<byte[], byte[]> {
        /* JADX WARN: Type inference failed for: r1v10, types: [org.apache.pulsar.common.schema.KeyValue, T] */
        KafkaSourceRecord(SourceRecord sourceRecord) {
            super(sourceRecord);
            AvroData avroData = new AvroData(1000);
            byte[] fromConnectData = KafkaConnectSource.this.keyConverter.fromConnectData(sourceRecord.topic(), sourceRecord.keySchema(), sourceRecord.key());
            this.key = fromConnectData != null ? Optional.of(Base64.getEncoder().encodeToString(fromConnectData)) : Optional.empty();
            this.value = new KeyValue(fromConnectData, KafkaConnectSource.this.valueConverter.fromConnectData(sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value()));
            this.topicName = Optional.of(sourceRecord.topic());
            if (sourceRecord.keySchema() != null) {
                this.keySchema = (KafkaSchemaWrappedSchema) KafkaConnectSource.this.readerCache.getIfPresent(sourceRecord.keySchema());
            }
            if (sourceRecord.valueSchema() != null) {
                this.valueSchema = (KafkaSchemaWrappedSchema) KafkaConnectSource.this.readerCache.getIfPresent(sourceRecord.valueSchema());
            }
            if (sourceRecord.keySchema() != null && this.keySchema == null) {
                this.keySchema = new KafkaSchemaWrappedSchema(avroData.fromConnectSchema(sourceRecord.keySchema()), KafkaConnectSource.this.keyConverter);
                KafkaConnectSource.this.readerCache.put(sourceRecord.keySchema(), this.keySchema);
            }
            if (sourceRecord.valueSchema() != null && this.valueSchema == null) {
                this.valueSchema = new KafkaSchemaWrappedSchema(avroData.fromConnectSchema(sourceRecord.valueSchema()), KafkaConnectSource.this.valueConverter);
                KafkaConnectSource.this.readerCache.put(sourceRecord.valueSchema(), this.valueSchema);
            }
            this.eventTime = Optional.ofNullable(sourceRecord.timestamp());
            this.partitionId = Optional.of(sourceRecord.sourcePartition().entrySet().stream().map(entry -> {
                return ((String) entry.getKey()) + "=" + entry.getValue();
            }).collect(Collectors.joining(",")));
            this.partitionIndex = Optional.ofNullable(sourceRecord.kafkaPartition());
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource.AbstractKafkaSourceRecord
        public boolean isEmpty() {
            return ((KeyValue) this.value).getValue() == null;
        }

        @Override // org.apache.pulsar.functions.api.KVRecord
        public org.apache.pulsar.client.api.Schema<byte[]> getKeySchema() {
            return (KafkaConnectSource.this.jsonWithEnvelope || this.keySchema == null) ? org.apache.pulsar.client.api.Schema.BYTES : this.keySchema;
        }

        @Override // org.apache.pulsar.functions.api.KVRecord
        public org.apache.pulsar.client.api.Schema<byte[]> getValueSchema() {
            return (KafkaConnectSource.this.jsonWithEnvelope || this.valueSchema == null) ? org.apache.pulsar.client.api.Schema.BYTES : this.valueSchema;
        }

        @Override // org.apache.pulsar.functions.api.KVRecord
        public KeyValueEncodingType getKeyValueEncodingType() {
            return KafkaConnectSource.this.jsonWithEnvelope ? KeyValueEncodingType.INLINE : KeyValueEncodingType.SEPARATED;
        }
    }

    @Override // org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource, org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        if (map.get(JSON_WITH_ENVELOPE_CONFIG) != null) {
            this.jsonWithEnvelope = Boolean.parseBoolean(map.get(JSON_WITH_ENVELOPE_CONFIG).toString());
            map.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, Boolean.valueOf(this.jsonWithEnvelope));
        } else {
            map.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
        }
        log.info("jsonWithEnvelope: {}", Boolean.valueOf(this.jsonWithEnvelope));
        super.open(map, sourceContext);
    }

    @Override // org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource
    /* renamed from: processSourceRecord, reason: merged with bridge method [inline-methods] */
    public synchronized AbstractKafkaConnectSource<KeyValue<byte[], byte[]>>.AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>> processSourceRecord2(SourceRecord sourceRecord) {
        KafkaSourceRecord kafkaSourceRecord = new KafkaSourceRecord(sourceRecord);
        this.offsetWriter.offset(sourceRecord.sourcePartition(), sourceRecord.sourceOffset());
        return kafkaSourceRecord;
    }
}
