/*
 * Decompiled with CFR 0.152.
 */
package dev.vality.kafka.common.loader;

import dev.vality.kafka.common.exception.RetryableException;
import dev.vality.kafka.common.exception.SkippedException;
import dev.vality.kafka.common.loader.PreloadListener;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.ExponentialBackOff;

/**
 * {@link PreloadListener} implementation that replays one topic partition from its beginning
 * up to the end offset observed when the preload starts, passing every record value to a
 * caller-supplied handler.
 *
 * <p>A {@link RetryableException} raised by the handler is retried with the configured
 * back-off, at most {@code maxAttempt} times in a row; a {@link SkippedException} is logged
 * and the record is skipped. Retry bookkeeping is kept in local variables of each call, so
 * no state is carried over between invocations.
 *
 * @param <K> record key type
 * @param <T> record value type
 */
public class PreloadListenerImpl<K, T>
implements PreloadListener<K, T> {
    private static final Logger log = LoggerFactory.getLogger(PreloadListenerImpl.class);
    /** Number of consecutive retries allowed when no explicit limit is supplied. */
    private static final int DEFAULT_MAX_ATTEMPT = 3;
    private final int maxAttempt;
    private final ExponentialBackOff backOff;

    /**
     * Creates a listener with the defaults: three consecutive retries and a default-configured
     * {@link ExponentialBackOff}.
     */
    public PreloadListenerImpl() {
        this(DEFAULT_MAX_ATTEMPT, new ExponentialBackOff());
    }

    /**
     * Creates a listener with an explicit retry policy.
     *
     * @param maxAttempt maximum number of consecutive retries after a {@link RetryableException}
     * @param backOff    back-off used to compute the pause before each retry
     */
    public PreloadListenerImpl(int maxAttempt, ExponentialBackOff backOff) {
        this.maxAttempt = maxAttempt;
        this.backOff = backOff;
    }

    /**
     * Assigns the consumer to the given partition and feeds {@code handleConsumer} with every
     * record value from the beginning of the partition up to the last offset that existed when
     * this method was called. Returns immediately when the end position is {@code 0}.
     *
     * @param consumer       Kafka consumer; it is re-assigned to the single target partition
     * @param topic          topic name
     * @param partition      partition number inside {@code topic}
     * @param handleConsumer callback invoked with each record value
     * @throws RuntimeException when a {@link RetryableException} keeps occurring after
     *                          {@code maxAttempt} retries or the back-off is exhausted, or when
     *                          the handler throws any other unchecked exception (rethrown as is)
     */
    @Override
    public void preloadToLastOffsetInPartition(Consumer<K, T> consumer, String topic, int partition, java.util.function.Consumer<T> handleConsumer) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        List<TopicPartition> topics = Collections.singletonList(topicPartition);
        consumer.assign(topics);
        consumer.seekToEnd(topics);
        long current = consumer.position(topicPartition);
        if (current <= 0L) {
            return;
        }
        // The end position points one past the last existing record.
        long endOffset = current - 1L;
        consumer.seekToBeginning(topics);
        current = consumer.position(topicPartition);
        // Retry state is local so that an aborted call cannot leak a stale counter into the next one.
        int attempts = 0;
        BackOffExecution execution = this.backOff.start();
        while (current <= endOffset) {
            ConsumerRecords<K, T> records = consumer.poll(Duration.ofSeconds(1L));
            try {
                for (ConsumerRecord<K, T> record : records) {
                    this.safeHandle(handleConsumer, record);
                    // Remember progress inside the batch so a retry resumes at the failing record
                    // instead of re-delivering records that were already handled.
                    current = record.offset() + 1L;
                }
                current = consumer.position(topicPartition);
                // A fully handled batch ends the current retry streak.
                attempts = 0;
                execution = this.backOff.start();
            }
            catch (RetryableException e) {
                log.error("PreloadListenerImpl preloadToLastOffsetInPartition RetryableException e: ", (Throwable)e);
                if (attempts >= this.maxAttempt) {
                    throw new RuntimeException("PreloadListenerImpl cant retry!", e);
                }
                consumer.seek(topicPartition, current);
                attempts++;
                this.waitBackoff(execution, e);
            }
            catch (Exception e) {
                log.error("PreloadListenerImpl preloadToLastOffsetInPartition critical exception e: ", (Throwable)e);
                throw e;
            }
        }
    }

    /**
     * Sleeps for the next interval of the given back-off execution. If the thread is interrupted
     * while sleeping, the interruption is logged and the interrupt flag is restored.
     *
     * @param execution back-off execution of the current retry streak
     * @param cause     failure that triggered the retry, used as the cause if the back-off is exhausted
     * @throws RuntimeException when the back-off reports {@link BackOffExecution#STOP}
     */
    private void waitBackoff(BackOffExecution execution, RetryableException cause) {
        long delay = execution.nextBackOff();
        if (delay == BackOffExecution.STOP) {
            // STOP is a negative sentinel; sleeping on it would raise an unrelated IllegalArgumentException.
            throw new RuntimeException("PreloadListenerImpl cant retry!", cause);
        }
        try {
            Thread.sleep(delay);
        }
        catch (InterruptedException ex) {
            log.error("PreloadListenerImpl InterruptedException when wait retry e: ", (Throwable)ex);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Passes the record value to the handler. A {@link RetryableException} is logged and
     * rethrown to the caller; a {@link SkippedException} is logged and swallowed so that the
     * record is skipped.
     *
     * @param handleConsumer callback receiving the record value
     * @param record         record whose value is handled
     */
    private void safeHandle(java.util.function.Consumer<T> handleConsumer, ConsumerRecord<K, T> record) {
        try {
            handleConsumer.accept(record.value());
        }
        catch (RetryableException e) {
            log.error("PreloadListenerImpl preloadToLastOffsetInPartition RetryableException e: ", (Throwable)e);
            throw e;
        }
        catch (SkippedException e) {
            log.error("PreloadListenerImpl preloadToLastOffsetInPartition SkippedException e: ", (Throwable)e);
        }
    }
}

