package io.streamthoughts.azkarra.commons.streams;

import java.text.DecimalFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/commons/streams/LoggingStateRestoreListener.class */
public class LoggingStateRestoreListener implements StateRestoreListener, StateRestoreService {
    private final Map<TopicPartition, Long> totalOffsetToRestore = new ConcurrentHashMap();
    private final Map<TopicPartition, Long> startTimes = new ConcurrentHashMap();
    private final Map<String, StateRestoreInfo> stateToRestore = new ConcurrentHashMap();
    private static final Logger LOG = LoggerFactory.getLogger(LoggingStateRestoreListener.class);
    private static final Pattern PATTERN = Pattern.compile("(\\d[HMS])(?!$)");

    public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
        LOG.info("Starting restoration process for store '{}' on topicPartition '{}': startOffset={}, endingOffset={}", new Object[]{str, topicPartition, Long.valueOf(j), Long.valueOf(j2)});
        this.totalOffsetToRestore.put(topicPartition, Long.valueOf(j2 - j));
        this.stateToRestore.computeIfAbsent(str, StateRestoreInfo::new).addTopicPartitionRestoreInfo(new StatePartitionRestoreInfo(topicPartition, j, j2));
        this.startTimes.put(topicPartition, Long.valueOf(Time.SYSTEM.milliseconds()));
    }

    public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
        LOG.info("Batch restored for store '{}' on topicPartition '{}': batchEndOffset={}, numRecordRestored={}, totalRestored={}. Percentage remaining:  {}%", new Object[]{str, topicPartition, Long.valueOf(j), Long.valueOf(j2), Long.valueOf(this.stateToRestore.get(str).getTopicPartitionRestoreInfo(topicPartition).incrementTotalRestored(j2)), calculateRemainingFormatted(topicPartition, j)});
    }

    public String calculateRemainingFormatted(TopicPartition topicPartition, long j) {
        return new DecimalFormat("#.##").format(((r0 - j) / this.totalOffsetToRestore.get(topicPartition).longValue()) * 100.0d);
    }

    public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
        Duration between = Duration.between(Instant.ofEpochMilli(this.startTimes.remove(topicPartition).longValue()), Instant.now());
        LOG.info("Restoration completed for store '{}' on topicPartition '{}', totalRestored={}. Duration: {}", new Object[]{str, topicPartition, Long.valueOf(j), humanReadableFormat(between)});
        this.stateToRestore.get(str).getTopicPartitionRestoreInfo(topicPartition).setDuration(between);
        this.totalOffsetToRestore.remove(topicPartition);
    }

    private static String humanReadableFormat(Duration duration) {
        return PATTERN.matcher(duration.toString().substring(2)).replaceAll("$1 ").toLowerCase();
    }

    @Override // io.streamthoughts.azkarra.commons.streams.StateRestoreService
    public StateRestoreInfo getStateRestoreInfo(String str) {
        return this.stateToRestore.get(Objects.requireNonNull(str, "state should not be null"));
    }
}
