package org.apache.pulsar.kafka.shade.io.confluent.kafka.formatter;

import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.SchemaProvider;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe;

/* loaded from: input_file:META-INF/bundled-dependencies/kafka-connect-avro-converter-shaded-2.11.0.0-rc5.jar:org/apache/pulsar/kafka/shade/io/confluent/kafka/formatter/SchemaMessageFormatter.class */
public abstract class SchemaMessageFormatter<T> implements MessageFormatter {
    private static final byte[] NULL_BYTES = "null".getBytes(StandardCharsets.UTF_8);
    private boolean printKey = false;
    private boolean printTimestamp = false;
    private boolean printIds = false;
    private boolean printKeyId = false;
    private boolean printValueId = false;
    private byte[] keySeparator = "\t".getBytes(StandardCharsets.UTF_8);
    private byte[] lineSeparator = "\n".getBytes(StandardCharsets.UTF_8);
    private byte[] idSeparator = "\t".getBytes(StandardCharsets.UTF_8);
    protected SchemaMessageDeserializer<T> deserializer;
    private static final int MAGIC_BYTE = 0;

    public SchemaMessageFormatter() {
    }

    public SchemaMessageFormatter(SchemaRegistryClient schemaRegistryClient, Deserializer deserializer) {
        this.deserializer = createDeserializer(schemaRegistryClient, deserializer);
    }

    protected abstract SchemaMessageDeserializer<T> createDeserializer(SchemaRegistryClient schemaRegistryClient, Deserializer deserializer);

    @Override // org.apache.kafka.common.MessageFormatter, org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        Properties properties = new Properties();
        properties.putAll(map);
        init(properties);
    }

    @Override // org.apache.kafka.common.MessageFormatter
    public void init(Properties properties) {
        if (properties == null) {
            throw new ConfigException("Missing schema registry url!");
        }
        String property = properties.getProperty("schema.registry.url");
        if (property == null) {
            throw new ConfigException("Missing schema registry url!");
        }
        if (properties.containsKey("print.timestamp")) {
            this.printTimestamp = properties.getProperty("print.timestamp").trim().toLowerCase().equals("true");
        }
        if (properties.containsKey("print.key")) {
            this.printKey = properties.getProperty("print.key").trim().toLowerCase().equals("true");
        }
        if (properties.containsKey("key.separator")) {
            this.keySeparator = properties.getProperty("key.separator").getBytes(StandardCharsets.UTF_8);
        }
        if (properties.containsKey("line.separator")) {
            this.lineSeparator = properties.getProperty("line.separator").getBytes(StandardCharsets.UTF_8);
        }
        Deserializer deserializer = null;
        if (properties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
            try {
                deserializer = (Deserializer) Class.forName((String) properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)).newInstance();
            } catch (Exception e) {
                throw new ConfigException("Error initializing Key deserializer: " + e.getMessage());
            }
        }
        if (properties.containsKey("print.schema.ids")) {
            this.printIds = properties.getProperty("print.schema.ids").trim().toLowerCase().equals("true");
            if (this.printIds) {
                this.printValueId = true;
                if (deserializer == null || (deserializer instanceof AbstractKafkaSchemaSerDe)) {
                    this.printKeyId = true;
                }
            }
        }
        if (properties.containsKey("schema.id.separator")) {
            this.idSeparator = properties.getProperty("schema.id.separator").getBytes(StandardCharsets.UTF_8);
        }
        if (this.deserializer == null) {
            this.deserializer = createDeserializer(createSchemaRegistry(property, getPropertiesMap(properties)), deserializer);
        }
    }

    private Map<String, Object> getPropertiesMap(Properties properties) {
        HashMap hashMap = new HashMap();
        for (String str : properties.stringPropertyNames()) {
            hashMap.put(str, properties.getProperty(str));
        }
        return hashMap;
    }

    @Override // org.apache.kafka.common.MessageFormatter
    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream printStream) {
        if (this.printTimestamp) {
            try {
                TimestampType timestampType = consumerRecord.timestampType();
                if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
                    printStream.write(String.format("%s:%d", timestampType, Long.valueOf(consumerRecord.timestamp())).getBytes(StandardCharsets.UTF_8));
                } else {
                    printStream.write("NO_TIMESTAMP".getBytes(StandardCharsets.UTF_8));
                }
                printStream.write(this.keySeparator);
            } catch (IOException e) {
                throw new SerializationException("Error while formatting the timestamp", e);
            }
        }
        if (this.printKey) {
            try {
                if (this.deserializer.getKeyDeserializer() != null) {
                    Object deserializeKey = consumerRecord.key() == null ? null : this.deserializer.deserializeKey(null, consumerRecord.key());
                    printStream.write(deserializeKey != null ? deserializeKey.toString().getBytes(StandardCharsets.UTF_8) : NULL_BYTES);
                } else {
                    writeTo(consumerRecord.key(), printStream);
                }
                if (this.printKeyId) {
                    printStream.write(this.idSeparator);
                    printStream.print(schemaIdFor(consumerRecord.key()));
                }
                printStream.write(this.keySeparator);
            } catch (IOException e2) {
                throw new SerializationException("Error while formatting the key", e2);
            }
        }
        try {
            writeTo(consumerRecord.value(), printStream);
            if (this.printValueId) {
                printStream.write(this.idSeparator);
                printStream.print(schemaIdFor(consumerRecord.value()));
            }
            printStream.write(this.lineSeparator);
        } catch (IOException e3) {
            throw new SerializationException("Error while formatting the value", e3);
        }
    }

    protected abstract void writeTo(byte[] bArr, PrintStream printStream) throws IOException;

    @Override // org.apache.kafka.common.MessageFormatter, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }

    private int schemaIdFor(byte[] bArr) {
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        if (wrap.get() != 0) {
            throw new SerializationException("Unknown magic byte!");
        }
        return wrap.getInt();
    }

    private SchemaRegistryClient createSchemaRegistry(String str, Map<String, Object> map) {
        String validateAndMaybeGetMockScope = MockSchemaRegistry.validateAndMaybeGetMockScope(Collections.singletonList(str));
        List singletonList = Collections.singletonList(getProvider());
        return validateAndMaybeGetMockScope == null ? new CachedSchemaRegistryClient(str, 1000, (List<SchemaProvider>) singletonList, (Map<String, ?>) map) : MockSchemaRegistry.getClientForScope(validateAndMaybeGetMockScope, singletonList);
    }

    protected abstract SchemaProvider getProvider();
}
