package org.apache.pulsar.kafka.shade.io.confluent.kafka.formatter;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.common.KafkaException;
import kafka.common.MessageReader;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.pulsar.kafka.shade.avro.AvroRuntimeException;
import org.apache.pulsar.kafka.shade.avro.Schema;
import org.apache.pulsar.kafka.shade.avro.generic.GenericDatumReader;
import org.apache.pulsar.kafka.shade.avro.io.DecoderFactory;
import org.apache.pulsar.kafka.shade.avro.util.Utf8;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;

/* loaded from: input_file:META-INF/bundled-dependencies/kafka-connect-avro-converter-shaded-2.10.0-rc-202112312205.jar:org/apache/pulsar/kafka/shade/io/confluent/kafka/formatter/AvroMessageReader.class */
/**
 * {@link MessageReader} that reads one JSON document per input line, converts it to an Avro
 * datum using the configured schema(s) and serializes it through
 * {@link AbstractKafkaAvroSerializer#serializeImpl} into a {@link ProducerRecord}.
 *
 * <p>Settings read by {@link #init(InputStream, Properties)}:
 * <ul>
 *   <li>{@code topic} - destination topic (required); the subjects used for serialization are
 *       {@code <topic>-key} and {@code <topic>-value}</li>
 *   <li>the schema registry URL ({@link AbstractKafkaAvroSerDeConfig#SCHEMA_REGISTRY_URL_CONFIG},
 *       required)</li>
 *   <li>{@code value.schema} - Avro schema string for the value (required)</li>
 *   <li>{@code parse.key} - when {@code true}, every line is split into key and value</li>
 *   <li>{@code key.schema} - Avro schema string for the key (required when keys are parsed)</li>
 *   <li>{@code key.separator} - text between key and value, a tab by default</li>
 *   <li>{@code ignore.error} - when {@code true}, a line without separator is sent as a
 *       value-only record instead of raising an error</li>
 *   <li>{@code auto.register} - stored in the inherited {@code autoRegisterSchema} flag,
 *       {@code true} when absent</li>
 * </ul>
 */
public class AvroMessageReader extends AbstractKafkaAvroSerializer implements MessageReader {
    private static final String TOPIC_PROP = "topic";
    private static final String PARSE_KEY_PROP = "parse.key";
    private static final String KEY_SEPARATOR_PROP = "key.separator";
    private static final String IGNORE_ERROR_PROP = "ignore.error";
    private static final String VALUE_SCHEMA_PROP = "value.schema";
    private static final String KEY_SCHEMA_PROP = "key.schema";
    private static final String AUTO_REGISTER_PROP = "auto.register";
    private static final String KEY_SUBJECT_SUFFIX = "-key";
    private static final String VALUE_SUBJECT_SUFFIX = "-value";
    /** Capacity handed to {@link CachedSchemaRegistryClient} when it is created in {@code init}. */
    private static final int SCHEMA_REGISTRY_CLIENT_CAPACITY = 1000;

    private final DecoderFactory decoderFactory = DecoderFactory.get();
    private String topic;
    private BufferedReader reader;
    private boolean parseKey;
    private String keySeparator = "\t";
    private boolean ignoreError;
    private Schema keySchema;
    private Schema valueSchema;
    private String keySubject;
    private String valueSubject;

    /** Creates an unconfigured reader; {@link #init(InputStream, Properties)} must be called before use. */
    public AvroMessageReader() {
    }

    /**
     * Creates a fully configured reader without going through {@link #init}.
     *
     * @param schemaRegistryClient registry client used by the inherited serializer
     * @param keySchema            schema for keys (only consulted when {@code parseKey} is true)
     * @param valueSchema          schema for values
     * @param topic                destination topic; also the prefix of both subjects
     * @param parseKey             whether each line carries a key before the separator
     * @param reader               line source
     * @param autoRegister         value for the inherited {@code autoRegisterSchema} flag
     */
    AvroMessageReader(SchemaRegistryClient schemaRegistryClient, Schema keySchema, Schema valueSchema, String topic, boolean parseKey, BufferedReader reader, boolean autoRegister) {
        this.schemaRegistry = schemaRegistryClient;
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
        this.topic = topic;
        this.keySubject = topic + KEY_SUBJECT_SUFFIX;
        this.valueSubject = topic + VALUE_SUBJECT_SUFFIX;
        this.parseKey = parseKey;
        this.reader = reader;
        this.autoRegisterSchema = autoRegister;
    }

    /**
     * Configures the reader from {@code properties} and wraps {@code inputStream} in a UTF-8
     * line reader.
     *
     * <p>All settings are looked up with {@link Properties#getProperty(String)}, so values
     * supplied through a defaults table are honoured uniformly and a non-String entry is
     * treated as absent instead of causing a {@link NullPointerException}.
     *
     * @param inputStream source of the JSON lines
     * @param properties  reader configuration (see the class description for the keys)
     * @throws ConfigException if the topic, the schema registry URL, {@code value.schema} or,
     *                         when keys are parsed, {@code key.schema} is missing
     */
    @Override // kafka.common.MessageReader
    public void init(InputStream inputStream, Properties properties) {
        this.topic = properties.getProperty(TOPIC_PROP);
        if (this.topic == null) {
            // Fail fast: otherwise the subjects would silently become "null-key"/"null-value".
            throw new ConfigException("Must provide the target topic in topic");
        }
        this.keySubject = this.topic + KEY_SUBJECT_SUFFIX;
        this.valueSubject = this.topic + VALUE_SUBJECT_SUFFIX;

        String parseKeyValue = properties.getProperty(PARSE_KEY_PROP);
        if (parseKeyValue != null) {
            this.parseKey = Boolean.parseBoolean(parseKeyValue.trim());
        }
        String separatorValue = properties.getProperty(KEY_SEPARATOR_PROP);
        if (separatorValue != null) {
            this.keySeparator = separatorValue;
        }
        String ignoreErrorValue = properties.getProperty(IGNORE_ERROR_PROP);
        if (ignoreErrorValue != null) {
            this.ignoreError = Boolean.parseBoolean(ignoreErrorValue.trim());
        }

        this.reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));

        String registryUrl = properties.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
        if (registryUrl == null) {
            throw new ConfigException("Missing schema registry url!");
        }
        Map<String, ?> clientConfig = getPropertiesMap(properties);
        this.schemaRegistry = new CachedSchemaRegistryClient(registryUrl, SCHEMA_REGISTRY_CLIENT_CAPACITY, clientConfig);

        String valueSchemaText = properties.getProperty(VALUE_SCHEMA_PROP);
        if (valueSchemaText == null) {
            throw new ConfigException("Must provide the Avro schema string in value.schema");
        }
        // One parser instance handles both schema strings, value first and then key.
        Schema.Parser parser = new Schema.Parser();
        this.valueSchema = parser.parse(valueSchemaText);
        if (this.parseKey) {
            String keySchemaText = properties.getProperty(KEY_SCHEMA_PROP);
            if (keySchemaText == null) {
                throw new ConfigException("Must provide the Avro schema string in key.schema");
            }
            this.keySchema = parser.parse(keySchemaText);
        }

        String autoRegisterValue = properties.getProperty(AUTO_REGISTER_PROP);
        this.autoRegisterSchema = autoRegisterValue == null || Boolean.parseBoolean(autoRegisterValue.trim());
    }

    /**
     * Copies every String-valued entry of {@code properties} (including its defaults) into a
     * new map.
     */
    private Map<String, Object> getPropertiesMap(Properties properties) {
        Map<String, Object> copy = new HashMap<>();
        for (String name : properties.stringPropertyNames()) {
            copy.put(name, properties.getProperty(name));
        }
        return copy;
    }

    /**
     * Reads the next line and turns it into a record for the configured topic.
     *
     * @return the next record, or {@code null} once the input is exhausted
     * @throws KafkaException if reading fails, or if keys are parsed, the line has no key
     *                        separator and {@code ignore.error} is not enabled
     * @throws SerializationException if the JSON does not match the schema
     */
    @Override // kafka.common.MessageReader
    public ProducerRecord<byte[], byte[]> readMessage() {
        try {
            String line = this.reader.readLine();
            if (line == null) {
                return null;
            }
            // -1 doubles as "keys are not parsed" and "separator not found".
            int separatorAt = this.parseKey ? line.indexOf(this.keySeparator) : -1;
            if (separatorAt < 0) {
                if (this.parseKey && !this.ignoreError) {
                    throw new KafkaException("No key found in line " + line);
                }
                return new ProducerRecord<>(this.topic, serializeImpl(this.valueSubject, jsonToAvro(line, this.valueSchema)));
            }
            String keyJson = line.substring(0, separatorAt);
            // The separator was found inside the line, so this offset never exceeds
            // line.length(); substring yields "" when the separator ends the line.
            String valueJson = line.substring(separatorAt + this.keySeparator.length());
            byte[] keyBytes = serializeImpl(this.keySubject, jsonToAvro(keyJson, this.keySchema));
            byte[] valueBytes = serializeImpl(this.valueSubject, jsonToAvro(valueJson, this.valueSchema));
            return new ProducerRecord<>(this.topic, keyBytes, valueBytes);
        } catch (IOException e) {
            throw new KafkaException("Error reading from input", e);
        }
    }

    /**
     * Decodes {@code str} as JSON according to {@code schema}.
     *
     * @return the decoded datum; for a schema of type STRING the {@link Utf8} result is
     *         converted to a {@link String}
     * @throws SerializationException if decoding fails
     */
    private Object jsonToAvro(String str, Schema schema) {
        try {
            Object datum = new GenericDatumReader<Object>(schema).read(null, this.decoderFactory.jsonDecoder(schema, str));
            if (schema.getType().equals(Schema.Type.STRING)) {
                datum = ((Utf8) datum).toString();
            }
            return datum;
        } catch (IOException | AvroRuntimeException e) {
            throw new SerializationException(String.format("Error deserializing json %s to Avro of schema %s", str, schema), e);
        }
    }

    /**
     * Does nothing. The reader only wraps a stream handed in by whoever called
     * {@link #init(InputStream, Properties)} or the test constructor, and it is left open here.
     * NOTE(review): presumably the caller owns and closes that stream - confirm.
     */
    @Override // kafka.common.MessageReader
    public void close() {
    }
}
