/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.kafka.source;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.netty.util.internal.ConcurrentSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks, per {@link TopicPartition}, the offset of the last message read and the set of
 * offsets that have been read but not yet acknowledged.
 *
 * <p>The tracked state is used by {@link #createCheckpoint(Collection)} to build the offsets
 * to commit once every read message of a partition has been acknowledged.
 *
 * <p>State is held in concurrent collections and an {@link AtomicLong}; individual updates are
 * atomic, but a read followed by an ack/reset is not performed as one atomic unit.
 */
public class TopicPartitionStateManager {
    /** Value of the last-read offset for a partition that has not recorded a read (or was reset). */
    public static final long DEFAULT_LAST_READ_OFFSET = 0L;

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicPartitionStateManager.class);

    /** Delay, in milliseconds, between successive "are all messages acked yet?" checks. */
    private final int checkpointReadyCheckDelayMs;
    /** Incremented once for every wait iteration spent in {@link #createCheckpoint(Collection)}. */
    private final Counter waitingForAckCount;
    private final ConcurrentMap<TopicPartition, State> partitionState = new ConcurrentHashMap<>();

    /**
     * @param registry                    registry used to create the {@code waitingOnAck} counter
     * @param kafkaClientId               value of the {@code client-id} tag on that counter
     * @param checkpointReadyCheckDelayMs sleep time in milliseconds between ack checks while
     *                                    creating a checkpoint
     */
    public TopicPartitionStateManager(Registry registry, String kafkaClientId, int checkpointReadyCheckDelayMs) {
        this.checkpointReadyCheckDelayMs = checkpointReadyCheckDelayMs;
        this.waitingForAckCount = registry.counter("waitingOnAck", "client-id", kafkaClientId);
    }

    /**
     * Records that the message at {@code offset} was read from {@code tp}: the offset is added to
     * the partition's un-acked set and becomes its last-read offset. State for the partition is
     * created on first use.
     *
     * @param tp     partition the message was read from
     * @param offset offset of the message
     */
    public void recordMessageRead(TopicPartition tp, long offset) {
        State state = this.stateFor(tp);
        // Register as un-acked before publishing the offset as "last read", same order as before.
        state.unAckedOffsets.add(offset);
        state.lastReadOffset.set(offset);
    }

    /**
     * Records that the message at {@code offset} of {@code tp} was acknowledged by removing it
     * from the partition's un-acked set. Does nothing if the partition has no tracked state.
     *
     * @param tp     partition the message belongs to
     * @param offset offset of the acknowledged message
     */
    public void recordMessageAck(TopicPartition tp, long offset) {
        State state = this.partitionState.get(tp);
        if (state != null) {
            state.unAckedOffsets.remove(offset);
        }
    }

    /**
     * Returns the last-read offset recorded for {@code tp}.
     *
     * @param tp partition to look up
     * @return the last-read offset, or an empty {@link Optional} if the partition has no tracked
     *         state; a tracked partition that was reset yields {@link #DEFAULT_LAST_READ_OFFSET}
     */
    public Optional<Long> getLastOffset(TopicPartition tp) {
        return Optional.ofNullable(this.partitionState.get(tp))
                .map(state -> state.lastReadOffset.get());
    }

    /**
     * Builds the offsets to commit for the given partitions.
     *
     * <p>For each partition this method blocks the calling thread, sleeping
     * {@code checkpointReadyCheckDelayMs} between checks, until the partition has no un-acked
     * offsets. The entry stored for a partition is its last-read offset plus one, with the current
     * wall-clock time in milliseconds as metadata. Partitions without tracked state, or whose
     * last-read offset equals {@link #DEFAULT_LAST_READ_OFFSET}, are left out of the result.
     *
     * @param partitions partitions to checkpoint
     * @return map of partition to offset-and-metadata; an immutable empty map if no partition
     *         state is tracked at all
     * @throws RuntimeException if the thread is interrupted while waiting for acks; the thread's
     *                          interrupt flag is restored before throwing
     */
    public Map<TopicPartition, OffsetAndMetadata> createCheckpoint(Collection<TopicPartition> partitions) {
        if (this.partitionState.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<TopicPartition, OffsetAndMetadata> checkpoint = new HashMap<>(partitions.size());
        for (TopicPartition tp : partitions) {
            this.awaitAllMessagesAcked(tp);
            State state = this.partitionState.get(tp);
            if (state == null) {
                continue;
            }
            long lastOffset = state.lastReadOffset.get();
            // The default value is indistinguishable from "nothing read", so it is never committed.
            if (lastOffset == DEFAULT_LAST_READ_OFFSET) {
                continue;
            }
            checkpoint.put(tp, new OffsetAndMetadata(lastOffset + 1L, String.valueOf(System.currentTimeMillis())));
        }
        return checkpoint;
    }

    /**
     * Clears the un-acked offsets of {@code tp} and resets its last-read offset to
     * {@link #DEFAULT_LAST_READ_OFFSET}. Does nothing if the partition has no tracked state.
     *
     * @param tp partition to reset
     */
    public void resetCounters(TopicPartition tp) {
        State state = this.partitionState.get(tp);
        if (state != null) {
            state.reset();
        }
    }

    /** Resets the tracked state of every known partition; see {@link #resetCounters(TopicPartition)}. */
    public void resetCounters() {
        LOGGER.info("resetting all counters");
        this.partitionState.values().forEach(State::reset);
    }

    /** Returns the state for {@code tp}, atomically creating it the first time the partition is seen. */
    private State stateFor(TopicPartition tp) {
        // Plain lookup first; only fall back to atomic creation when the partition is new.
        State existing = this.partitionState.get(tp);
        return existing != null ? existing : this.partitionState.computeIfAbsent(tp, key -> new State());
    }

    /** Returns {@code true} if {@code tp} has no tracked state or no un-acked offsets. */
    private boolean allMessagesAcked(TopicPartition tp) {
        State state = this.partitionState.get(tp);
        return state == null || state.unAckedOffsets.isEmpty();
    }

    /** Sleeps in {@code checkpointReadyCheckDelayMs} steps until {@code tp} has no un-acked offsets. */
    private void awaitAllMessagesAcked(TopicPartition tp) {
        while (!this.allMessagesAcked(tp)) {
            try {
                this.waitingForAckCount.increment();
                Thread.sleep(this.checkpointReadyCheckDelayMs);
            } catch (InterruptedException e) {
                LOGGER.info("thread interrupted when creating checkpoint for {}", tp);
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException("thread interrupted when creating checkpoint", e);
            }
        }
    }

    /** Mutable per-partition bookkeeping; static because it needs nothing from the enclosing manager. */
    private static final class State {
        private final AtomicLong lastReadOffset = new AtomicLong(DEFAULT_LAST_READ_OFFSET);
        private final ConcurrentSet<Long> unAckedOffsets = new ConcurrentSet<>();

        private State() {
        }

        /** Drops all un-acked offsets and restores the default last-read offset. */
        private void reset() {
            this.unAckedOffsets.clear();
            this.lastReadOffset.set(DEFAULT_LAST_READ_OFFSET);
        }
    }
}

