/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.kafkastreamsprocessor.impl.errors;

import io.quarkiverse.kafkastreamsprocessor.impl.errors.DlqMetadataHandler;
import io.quarkiverse.kafkastreamsprocessor.impl.errors.LogCallbackExceptionProducerDecorator;
import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
import io.quarkus.arc.Unremovable;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import lombok.Generated;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Production exception handler that forwards records which failed to be produced to the
 * global dead letter queue (DLQ) topic, when such a topic is configured.
 *
 * <p>The DLQ producer is created in {@link #configure(Map)} and closed in {@link #close()}.
 * When no global DLQ topic is configured (or the producer has not been created yet), the
 * failure is only logged. In every case {@link #handle(ProducerRecord, Exception)} answers
 * {@code CONTINUE}.
 */
@ApplicationScoped
@Unremovable
class GlobalDLQProductionExceptionHandlerDelegate implements ProductionExceptionHandler {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GlobalDLQProductionExceptionHandlerDelegate.class);

    /** Maximum time given to the DLQ producer to close in {@link #close()}. */
    protected static final Duration GRACEFUL_PERIOD = Duration.ofSeconds(29L);

    private final KafkaClientSupplier clientSupplier;
    private final DlqMetadataHandler dlqMetadataHandler;
    private final KafkaStreamsProcessorMetrics metrics;
    private final KStreamsProcessorConfig kStreamsProcessorConfig;

    /** Producer dedicated to the global DLQ; stays {@code null} until {@link #configure(Map)} creates it. */
    private Producer<byte[], byte[]> kafkaProducer;

    /**
     * Injection constructor.
     *
     * @param kafkaClientSupplier     supplier used to create the DLQ producer
     * @param dlqMetadataHandler      builds the headers attached to records sent to the DLQ
     * @param metrics                 metrics holder exposing the global DLQ counter
     * @param kStreamsProcessorConfig configuration giving access to the global DLQ settings
     */
    @Inject
    public GlobalDLQProductionExceptionHandlerDelegate(KafkaClientSupplier kafkaClientSupplier, DlqMetadataHandler dlqMetadataHandler, KafkaStreamsProcessorMetrics metrics, KStreamsProcessorConfig kStreamsProcessorConfig) {
        this.clientSupplier = kafkaClientSupplier;
        this.dlqMetadataHandler = dlqMetadataHandler;
        this.metrics = metrics;
        this.kStreamsProcessorConfig = kStreamsProcessorConfig;
    }

    /**
     * Handles a record that could not be produced: sends it to the global DLQ when a DLQ
     * producer exists and a DLQ topic is configured, otherwise logs a warning.
     *
     * @param record    the record that failed to be produced
     * @param exception the exception raised during production
     * @return always {@code CONTINUE}
     */
    public ProductionExceptionHandler.ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        if (this.kafkaProducer != null && this.kStreamsProcessorConfig.globalDlq().topic().isPresent()) {
            this.sendToGlobalDlq(record, exception);
        } else {
            log.warn("Exception caught during production but no GlobalDLQ is configured, incoming message will be consumed", exception);
        }
        return ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE;
    }

    /**
     * Increments the global DLQ counter, logs the failure and sends a copy of the record
     * (same key and value, headers produced by the metadata handler) to the global DLQ topic.
     *
     * <p>Only called from {@link #handle(ProducerRecord, Exception)} after it has checked that
     * the producer exists and the DLQ topic is present.
     */
    private void sendToGlobalDlq(ProducerRecord<byte[], byte[]> record, Exception exception) {
        this.metrics.globalDlqSentCounter().increment();
        // The trailing exception has no matching placeholder, so SLF4J logs it as the throwable.
        log.error("Exception caught during production, sending to the dead letter queue topic; topic: {}, partition: {}", record.topic(), record.partition(), exception);
        // Presence of the topic is guaranteed by the caller's isPresent() check.
        String dlqTopic = this.kStreamsProcessorConfig.globalDlq().topic().get();
        // A null partition leaves the partition choice to the producer.
        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
                dlqTopic,
                null,
                record.key(),
                record.value(),
                this.dlqMetadataHandler.withMetadata(record.headers(), record.topic(), record.partition(), exception));
        this.kafkaProducer.send(dlqRecord);
    }

    /**
     * Creates the DLQ producer from the given configuration when a global DLQ topic is
     * configured; does nothing otherwise.
     *
     * <p>NOTE(review): each call replaces {@code kafkaProducer} without closing a previously
     * created instance. If this method can be invoked more than once on the same bean, the
     * earlier producers are never closed - confirm against the caller.
     *
     * @param config base configuration; it is copied, never modified
     */
    public void configure(Map<String, ?> config) {
        if (this.kStreamsProcessorConfig.globalDlq().topic().isPresent()) {
            Map<String, Object> dlqProducerConfig = new HashMap<>(config);
            dlqProducerConfig.put("max.request.size", this.kStreamsProcessorConfig.globalDlq().maxMessageSize());
            // Presumably a marker letting the client supplier recognise the DLQ producer - TODO confirm.
            dlqProducerConfig.put("dlq.producer", true);
            this.kafkaProducer = new LogCallbackExceptionProducerDecorator(this.clientSupplier.getProducer(dlqProducerConfig));
        }
    }

    /** Closes the DLQ producer, if one was created, waiting at most {@link #GRACEFUL_PERIOD}. */
    @PreDestroy
    public void close() {
        if (this.kafkaProducer != null) {
            this.kafkaProducer.close(GRACEFUL_PERIOD);
        }
    }
}

