package org.apache.pulsar.io.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka sink connector that forwards Pulsar records to Kafka with a
 * {@code String} key and a raw {@code byte[]} value.
 *
 * <p>The class only supplies the two pieces that are specific to this key/value
 * typing: the serializer settings for the producer and the mapping from a
 * Pulsar {@link Record} to a {@link KeyValue}. Everything else is inherited
 * from {@link KafkaAbstractSink}.
 */
@Connector(
        name = "kafka",
        type = IOType.SINK,
        help = "The KafkaBytesSink is used for moving messages from Pulsar to Kafka.",
        configClass = KafkaSinkConfig.class)
public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> {
    private static final Logger log = LoggerFactory.getLogger(KafkaBytesSink.class);

    /**
     * Sets the key and value serializers on the supplied producer configuration.
     *
     * <p>The key serializer is set to {@link StringSerializer} and the value
     * serializer to {@link ByteArraySerializer}; any previous values stored under
     * those two configuration keys are overwritten. The resulting configuration
     * is logged at INFO level.
     *
     * @param properties the producer configuration to modify in place
     * @return the same {@code properties} instance that was passed in
     */
    @Override
    protected Properties beforeCreateProducer(Properties properties) {
        final String keySerializerName = StringSerializer.class.getName();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerName);

        final String valueSerializerName = ByteArraySerializer.class.getName();
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerName);

        // NOTE(review): this prints every entry in the configuration; if the caller
        // places credentials in these properties they end up in the log - confirm.
        log.info("Created kafka producer config : {}", properties);
        return properties;
    }

    /**
     * Maps a Pulsar record onto the key/value pair handed to Kafka.
     *
     * @param record the incoming Pulsar record
     * @return a pair holding the record's key, or {@code null} when the record
     *         carries no key, together with the record's value as returned by
     *         {@link Record#getValue()}
     */
    @Override
    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
        final String key = record.getKey().orElse(null);
        final byte[] value = record.getValue();
        return new KeyValue<>(key, value);
    }
}
