package io.streamthoughts.azkarra.commons.error;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/commons/error/DeadLetterTopicExceptionHandler.class */
public class DeadLetterTopicExceptionHandler implements DeserializationExceptionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(DeadLetterTopicExceptionHandler.class);
    private static final String OUTPUT_TOPIC_DEFAULT_SUFFIX = "-rejected";
    private Serializer<byte[]> serializer = new ByteArraySerializer();
    private StringSerializer stringSerializer;
    private DeadLetterTopicExceptionHandlerConfig config;
    private List<Header> customHeaders;
    private String applicationId;
    private Producer<byte[], byte[]> internalProducer;

    public void configure(Map<String, ?> map) {
        this.config = new DeadLetterTopicExceptionHandlerConfig(map);
        this.applicationId = (String) map.get("application.id");
        this.customHeaders = (List) this.config.customHeaders().entrySet().stream().map(entry -> {
            return new RecordHeader((String) entry.getKey(), toByteArray(entry.getValue().toString()));
        }).collect(Collectors.toList());
        this.stringSerializer = new StringSerializer();
        this.stringSerializer.configure(map, false);
        Map<String, Object> producerConfigs = this.config.producerConfigs();
        if (producerConfigs.isEmpty()) {
            return;
        }
        LOG.info("Initializing internal KafkaProducer for exception handler: {}", getClass().getSimpleName());
        this.internalProducer = new KafkaProducer(producerConfigs, this.serializer, this.serializer);
    }

    public DeserializationExceptionHandler.DeserializationHandlerResponse handle(ProcessorContext processorContext, ConsumerRecord<byte[], byte[]> consumerRecord, Exception exc) {
        RecordCollector recordCollector = ((ProcessorContextImpl) processorContext).recordCollector();
        Headers headers = consumerRecord.headers();
        headers.add(ExceptionHeader.ERROR_EXCEPTION_STACKTRACE, toByteArray(getStacktrace(exc)));
        headers.add(ExceptionHeader.ERROR_EXCEPTION_MESSAGE, toByteArray(exc.getMessage()));
        headers.add(ExceptionHeader.ERROR_EXCEPTION_CLASS_NAME, toByteArray(exc.getClass().getName()));
        headers.add(ExceptionHeader.ERROR_TIMESTAMP, toByteArray(Long.valueOf(Time.SYSTEM.milliseconds())));
        headers.add(ExceptionHeader.ERROR_APPLICATION_ID, toByteArray(this.applicationId));
        headers.add(ExceptionHeader.ERROR_RECORD_TOPIC, toByteArray(consumerRecord.topic()));
        headers.add(ExceptionHeader.ERROR_RECORD_PARTITION, toByteArray(Integer.valueOf(consumerRecord.partition())));
        headers.add(ExceptionHeader.ERROR_RECORD_OFFSET, toByteArray(Long.valueOf(consumerRecord.offset())));
        List<Header> list = this.customHeaders;
        Objects.requireNonNull(headers);
        list.forEach(headers::add);
        final String outputTopic = this.config.outputTopic() != null ? this.config.outputTopic() : consumerRecord.topic() + "-rejected";
        LOG.debug("Sending rejected record from topic={}, partition={}, offset={} into topic {}", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), outputTopic});
        if (this.internalProducer != null) {
            this.internalProducer.send(new ProducerRecord(outputTopic, (Integer) null, Long.valueOf(consumerRecord.timestamp()), (byte[]) consumerRecord.key(), (byte[]) consumerRecord.value(), headers), new Callback() { // from class: io.streamthoughts.azkarra.commons.error.DeadLetterTopicExceptionHandler.1
                public void onCompletion(RecordMetadata recordMetadata, Exception exc2) {
                    DeadLetterTopicExceptionHandler.LOG.error("Fail to send corrupted record into topic {}. Ignored record.", outputTopic, exc2);
                }
            });
        } else {
            recordCollector.send(outputTopic, (byte[]) consumerRecord.key(), (byte[]) consumerRecord.value(), headers, Long.valueOf(consumerRecord.timestamp()), this.serializer, this.serializer, (StreamPartitioner) null);
        }
        return getDeserializationHandlerResponse(exc);
    }

    private DeserializationExceptionHandler.DeserializationHandlerResponse getDeserializationHandlerResponse(Exception exc) {
        List<Class<?>> fatalExceptions = this.config.getFatalExceptions();
        DeserializationExceptionHandler.DeserializationHandlerResponse deserializationHandlerResponse = DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE;
        Iterator<Class<?>> it = fatalExceptions.iterator();
        while (it.hasNext() && deserializationHandlerResponse.equals(DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE)) {
            if (it.next().isAssignableFrom(exc.getClass())) {
                deserializationHandlerResponse = DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL;
            }
        }
        return deserializationHandlerResponse;
    }

    private byte[] toByteArray(Integer num) {
        return this.stringSerializer.serialize((String) null, Integer.toString(num.intValue()));
    }

    private byte[] toByteArray(Long l) {
        return this.stringSerializer.serialize((String) null, Long.toString(l.longValue()));
    }

    private byte[] toByteArray(String str) {
        return this.stringSerializer.serialize((String) null, str);
    }

    private String getStacktrace(Throwable th) {
        StringWriter stringWriter = new StringWriter();
        th.printStackTrace(new PrintWriter((Writer) stringWriter, true));
        return stringWriter.getBuffer().toString();
    }
}
