package org.springframework.kafka.retrytopic;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.core.NestedRuntimeException;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekUtils;
import org.springframework.kafka.listener.TimestampedException;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-2.8.7.jar:org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.class */
public class DeadLetterPublishingRecovererFactory {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog((Class<?>) DeadLetterPublishingRecovererFactory.class));
    private final DestinationTopicResolver destinationTopicResolver;
    private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction;
    private final Set<Class<? extends Exception>> fatalExceptions = new LinkedHashSet();
    private final Set<Class<? extends Exception>> nonFatalExceptions = new HashSet();
    private Consumer<DeadLetterPublishingRecoverer> recovererCustomizer = deadLetterPublishingRecoverer -> {
    };
    private ListenerExceptionLoggingStrategy loggingStrategy = ListenerExceptionLoggingStrategy.AFTER_RETRIES_EXHAUSTED;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-kafka-2.8.7.jar:org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory$ListenerExceptionLoggingStrategy.class */
    public enum ListenerExceptionLoggingStrategy {
        NEVER,
        EACH_ATTEMPT,
        AFTER_RETRIES_EXHAUSTED
    }

    public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
        this.destinationTopicResolver = destinationTopicResolver;
    }

    public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Headers> biFunction) {
        this.headersFunction = biFunction;
    }

    public final void addNotRetryableException(Class<? extends Exception> cls) {
        Assert.notNull(cls, "'exceptionType' cannot be null");
        this.fatalExceptions.add(cls);
    }

    public boolean removeNotRetryableException(Class<? extends Exception> cls) {
        return this.nonFatalExceptions.add(cls);
    }

    public void neverLogListenerException() {
        this.loggingStrategy = ListenerExceptionLoggingStrategy.NEVER;
    }

    public void alwaysLogListenerException() {
        this.loggingStrategy = ListenerExceptionLoggingStrategy.EACH_ATTEMPT;
    }

    public DeadLetterPublishingRecoverer create() {
        DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(this::resolveTemplate, false, this::resolveDestination) { // from class: org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory.1
            @Override // org.springframework.kafka.listener.DeadLetterPublishingRecoverer
            protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
                return DeadLetterPublishingRecoverer.HeaderNames.Builder.original().offsetHeader(KafkaHeaders.ORIGINAL_OFFSET).timestampHeader(KafkaHeaders.ORIGINAL_TIMESTAMP).timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE).topicHeader(KafkaHeaders.ORIGINAL_TOPIC).partitionHeader(KafkaHeaders.ORIGINAL_PARTITION).consumerGroupHeader("kafka_dlt-original-consumer-group").exception().keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN).exceptionFqcn("kafka_exception-fqcn").exceptionCauseFqcn("kafka_exception-fqcn").keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE).exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE).keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE).exceptionStacktrace(KafkaHeaders.EXCEPTION_STACKTRACE).build();
            }
        };
        deadLetterPublishingRecoverer.setHeadersFunction((consumerRecord, exc) -> {
            return addHeaders(consumerRecord, exc, getAttempts(consumerRecord));
        });
        if (this.headersFunction != null) {
            deadLetterPublishingRecoverer.addHeadersFunction(this.headersFunction);
        }
        deadLetterPublishingRecoverer.setFailIfSendResultIsError(true);
        deadLetterPublishingRecoverer.setAppendOriginalHeaders(false);
        deadLetterPublishingRecoverer.setThrowIfNoDestinationReturned(false);
        deadLetterPublishingRecoverer.setSkipSameTopicFatalExceptions(false);
        this.recovererCustomizer.accept(deadLetterPublishingRecoverer);
        Set<Class<? extends Exception>> set = this.fatalExceptions;
        Objects.requireNonNull(deadLetterPublishingRecoverer);
        set.forEach(cls -> {
            deadLetterPublishingRecoverer.addNotRetryableExceptions(cls);
        });
        Set<Class<? extends Exception>> set2 = this.nonFatalExceptions;
        Objects.requireNonNull(deadLetterPublishingRecoverer);
        set2.forEach(deadLetterPublishingRecoverer::removeClassification);
        return deadLetterPublishingRecoverer;
    }

    private KafkaOperations<?, ?> resolveTemplate(ProducerRecord<?, ?> producerRecord) {
        return this.destinationTopicResolver.getDestinationTopicByName(producerRecord.topic()).getKafkaOperations();
    }

    public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> consumer) {
        this.recovererCustomizer = consumer;
    }

    private TopicPartition resolveDestination(ConsumerRecord<?, ?> consumerRecord, Exception exc) {
        if (SeekUtils.isBackoffException(exc)) {
            throw ((NestedRuntimeException) exc);
        }
        DestinationTopic resolveDestinationTopic = this.destinationTopicResolver.resolveDestinationTopic(consumerRecord.topic(), Integer.valueOf(getAttempts(consumerRecord)), exc, getOriginalTimestampHeaderLong(consumerRecord));
        LOGGER.debug(() -> {
            return "Resolved topic: " + (resolveDestinationTopic.isNoOpsTopic() ? "none" : resolveDestinationTopic.getDestinationName());
        });
        maybeLogListenerException(exc, consumerRecord, resolveDestinationTopic);
        if (resolveDestinationTopic.isNoOpsTopic()) {
            return null;
        }
        return resolveTopicPartition(consumerRecord, resolveDestinationTopic);
    }

    private void maybeLogListenerException(Exception exc, ConsumerRecord<?, ?> consumerRecord, DestinationTopic destinationTopic) {
        if (destinationTopic.isDltTopic() && !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
            LOGGER.error(exc, () -> {
                return getErrorMessage(consumerRecord) + " and won't be retried. Sending to DLT with name " + destinationTopic.getDestinationName() + ".";
            });
            return;
        }
        if (destinationTopic.isNoOpsTopic() && !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
            LOGGER.error(exc, () -> {
                return getErrorMessage(consumerRecord) + " and won't be retried. No further action will be taken with this record.";
            });
        } else if (ListenerExceptionLoggingStrategy.EACH_ATTEMPT.equals(this.loggingStrategy)) {
            LOGGER.error(exc, () -> {
                return getErrorMessage(consumerRecord) + ". Sending to retry topic " + destinationTopic.getDestinationName() + ".";
            });
        } else {
            LOGGER.debug(exc, () -> {
                return getErrorMessage(consumerRecord) + ". Sending to retry topic " + destinationTopic.getDestinationName() + ".";
            });
        }
    }

    private static String getErrorMessage(ConsumerRecord<?, ?> consumerRecord) {
        return "Record: " + getRecordInfo(consumerRecord) + " threw an error at topic " + consumerRecord.topic();
    }

    private static String getRecordInfo(ConsumerRecord<?, ?> consumerRecord) {
        Header lastHeader = consumerRecord.headers().lastHeader(KafkaHeaders.ORIGINAL_TOPIC);
        Object[] objArr = new Object[4];
        objArr[0] = consumerRecord.topic();
        objArr[1] = Integer.valueOf(consumerRecord.partition());
        objArr[2] = Long.valueOf(consumerRecord.offset());
        objArr[3] = lastHeader != null ? new String(lastHeader.value()) : consumerRecord.topic();
        return String.format("topic = %s, partition = %s, offset = %s, main topic = %s", objArr);
    }

    protected TopicPartition resolveTopicPartition(ConsumerRecord<?, ?> consumerRecord, DestinationTopic destinationTopic) {
        return new TopicPartition(destinationTopic.getDestinationName(), consumerRecord.partition());
    }

    private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {
        Header lastHeader = consumerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
        if (lastHeader == null) {
            return 1;
        }
        byte[] value = lastHeader.value();
        if (value.length == 1) {
            return value[0];
        }
        if (value.length == 4) {
            return ByteBuffer.wrap(value).getInt();
        }
        LOGGER.debug(() -> {
            return "Unexected size for retry_topic-attempts header: " + value.length;
        });
        return 1;
    }

    private Headers addHeaders(ConsumerRecord<?, ?> consumerRecord, Exception exc, int i) {
        RecordHeaders recordHeaders = new RecordHeaders();
        byte[] originalTimestampHeaderBytes = getOriginalTimestampHeaderBytes(consumerRecord);
        recordHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampHeaderBytes);
        recordHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, ByteBuffer.wrap(new byte[4]).putInt(i + 1).array());
        recordHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP, BigInteger.valueOf(getNextExecutionTimestamp(consumerRecord, exc, originalTimestampHeaderBytes)).toByteArray());
        return recordHeaders;
    }

    private long getNextExecutionTimestamp(ConsumerRecord<?, ?> consumerRecord, Exception exc, byte[] bArr) {
        long longValue = new BigInteger(bArr).longValue();
        long failureTimestamp = getFailureTimestamp(exc);
        long longValue2 = failureTimestamp + this.destinationTopicResolver.resolveDestinationTopic(consumerRecord.topic(), Integer.valueOf(getAttempts(consumerRecord)), exc, longValue).getDestinationDelay().longValue();
        LOGGER.debug(() -> {
            return String.format("FailureTimestamp: %s, Original timestamp: %s, nextExecutionTimestamp: %s", Long.valueOf(failureTimestamp), Long.valueOf(longValue), Long.valueOf(longValue2));
        });
        return longValue2;
    }

    private long getFailureTimestamp(Exception exc) {
        return ((exc instanceof NestedRuntimeException) && ((NestedRuntimeException) exc).contains(TimestampedException.class)) ? getTimestampedException(exc).getTimestamp() : Instant.now().toEpochMilli();
    }

    private TimestampedException getTimestampedException(Throwable th) {
        if (th == null) {
            throw new IllegalArgumentException("Provided exception does not contain a " + TimestampedException.class.getSimpleName() + " cause.");
        }
        return th.getClass().isAssignableFrom(TimestampedException.class) ? (TimestampedException) th : getTimestampedException(th.getCause());
    }

    private byte[] getOriginalTimestampHeaderBytes(ConsumerRecord<?, ?> consumerRecord) {
        Header originaTimeStampHeader = getOriginaTimeStampHeader(consumerRecord);
        return originaTimeStampHeader != null ? originaTimeStampHeader.value() : BigInteger.valueOf(consumerRecord.timestamp()).toByteArray();
    }

    private long getOriginalTimestampHeaderLong(ConsumerRecord<?, ?> consumerRecord) {
        Header originaTimeStampHeader = getOriginaTimeStampHeader(consumerRecord);
        return originaTimeStampHeader != null ? new BigInteger(originaTimeStampHeader.value()).longValue() : consumerRecord.timestamp();
    }

    private Header getOriginaTimeStampHeader(ConsumerRecord<?, ?> consumerRecord) {
        return consumerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);
    }
}
