/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.kafkastreamsprocessor.impl.errors;

import io.quarkiverse.kafkastreamsprocessor.impl.errors.DlqMetadataHandler;
import io.quarkiverse.kafkastreamsprocessor.impl.errors.ErrorHandlingStrategy;
import io.quarkiverse.kafkastreamsprocessor.impl.errors.GlobalDLQProductionExceptionHandlerDelegate;
import io.quarkiverse.kafkastreamsprocessor.impl.errors.LogCallbackExceptionProducerDecorator;
import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
import io.quarkus.arc.Unremovable;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@Unremovable
public class LogAndSendToDlqExceptionHandlerDelegate
implements DeserializationExceptionHandler {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(LogAndSendToDlqExceptionHandlerDelegate.class);
    private final DlqMetadataHandler dlqMetadataHandler;
    private final KafkaStreamsProcessorMetrics metrics;
    private final KafkaClientSupplier clientSupplier;
    private final KStreamsProcessorConfig kStreamsProcessorConfig;
    boolean sendToDlq;
    Producer<byte[], byte[]> dlqProducer;

    @Inject
    public LogAndSendToDlqExceptionHandlerDelegate(KafkaClientSupplier kafkaClientSupplier, KafkaStreamsProcessorMetrics metrics, DlqMetadataHandler dlqMetadataHandler, KStreamsProcessorConfig kStreamsProcessorConfig) {
        this.clientSupplier = kafkaClientSupplier;
        this.metrics = metrics;
        this.dlqMetadataHandler = dlqMetadataHandler;
        this.kStreamsProcessorConfig = kStreamsProcessorConfig;
    }

    public DeserializationExceptionHandler.DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
        this.metrics.processorErrorCounter().increment();
        if (this.sendToDlq) {
            this.sendToDlq(context, record, exception);
        } else {
            log.error("Exception caught during Deserialization, message dropped; taskId: {}, topic: {}, partition: {}, offset: {}", new Object[]{context.taskId(), record.topic(), record.partition(), record.offset(), exception});
        }
        return DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE;
    }

    private void sendToDlq(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
        this.metrics.microserviceDlqSentCounter().increment();
        log.error("Exception caught during Deserialization, sending to the dead letter queue topic; taskId: {}, topic: {}, partition: {}, offset: {}", new Object[]{context.taskId(), record.topic(), record.partition(), record.offset(), exception});
        this.dlqProducer.send(new ProducerRecord((String)this.kStreamsProcessorConfig.dlq().topic().get(), null, Long.valueOf(record.timestamp()), (Object)((byte[])record.key()), (Object)((byte[])record.value()), (Iterable)this.dlqMetadataHandler.withMetadata(record.headers(), record.topic(), record.partition(), exception)));
    }

    public void configure(Map<String, ?> configs) {
        this.sendToDlq = ErrorHandlingStrategy.shouldSendToDlq(this.kStreamsProcessorConfig.errorStrategy(), this.kStreamsProcessorConfig.dlq().topic());
        if (this.sendToDlq) {
            HashMap dlqConfigMap = new HashMap(configs);
            dlqConfigMap.put("dlq.producer", true);
            this.dlqProducer = new LogCallbackExceptionProducerDecorator((Producer<byte[], byte[]>)this.clientSupplier.getProducer(dlqConfigMap));
        }
    }

    @PreDestroy
    public void close() {
        if (this.dlqProducer != null) {
            this.dlqProducer.close(GlobalDLQProductionExceptionHandlerDelegate.GRACEFUL_PERIOD);
        }
    }
}

