/*
 * Decompiled with CFR 0.152.
 */
package dev.vality.kafka.common.exception.handler;

import dev.vality.kafka.common.util.LogUtil;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;

public class SeekToCurrentWithSleepBatchErrorHandler
extends SeekToCurrentBatchErrorHandler {
    private static final Logger log = LoggerFactory.getLogger(SeekToCurrentWithSleepBatchErrorHandler.class);
    private boolean ackAfterHandle = false;
    private final Integer sleepTimeSeconds;

    public SeekToCurrentWithSleepBatchErrorHandler(Integer sleepTimeSeconds) {
        this.sleepTimeSeconds = sleepTimeSeconds;
    }

    public SeekToCurrentWithSleepBatchErrorHandler() {
        this.sleepTimeSeconds = 5;
    }

    public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container) {
        log.error(String.format("Records commit failed, size=%d, %s", data.count(), LogUtil.toSummaryString(data)), (Throwable)thrownException);
        this.sleepBeforeRetry();
        super.handle(thrownException, data, consumer, container);
    }

    public boolean isAckAfterHandle() {
        return this.ackAfterHandle;
    }

    private void sleepBeforeRetry() {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(this.sleepTimeSeconds.intValue()));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void setAckAfterHandle(boolean ackAfterHandle) {
        this.ackAfterHandle = ackAfterHandle;
    }
}

