/*
 * Decompiled with CFR 0.152.
 */
package io.americanexpress.synapse.subscriber.kafka.errorhandler;

import io.americanexpress.synapse.subscriber.kafka.annotation.KafkaErrorHandler;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

@KafkaErrorHandler
public class BaseKafkaSubscriberErrorHandler
implements CommonErrorHandler {
    protected final XLogger log = XLoggerFactory.getXLogger(this.getClass());

    public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
        this.handleException(thrownException, Collections.singletonList(record), consumer);
    }

    public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        this.handleException(thrownException, records, consumer);
    }

    public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
        this.handleBatchException(thrownException, data, consumer);
    }

    public void handleBatchException(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {
        if (Optional.ofNullable(data).isPresent()) {
            data.iterator().forEachRemaining(kafkaRecord -> this.handleException(thrownException, consumer, (ConsumerRecord<?, ?>)kafkaRecord));
        }
    }

    public void handleException(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer) {
        if (!CollectionUtils.isEmpty(records)) {
            records.forEach(kafkaRecord -> this.handleException(thrownException, consumer, (ConsumerRecord<?, ?>)kafkaRecord));
        }
    }

    private void handleException(Exception thrownException, Consumer<?, ?> consumer, ConsumerRecord<?, ?> kafkaRecord) {
        int partition = kafkaRecord.partition();
        long offset = kafkaRecord.offset();
        String topic = kafkaRecord.topic();
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.seek(topicPartition, offset++);
        consumer.commitAsync();
        this.log.info("MESSAGE_SKIPPED, TOPIC: {}, OFFSET: {}, CAUSE: {}, EXCEPTION: {}", new Object[]{topic, offset, thrownException.getCause(), thrownException.getLocalizedMessage()});
    }
}

