/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class KafkaThrottledLatestProcessedCommit
implements KafkaCommitHandler {
    private static final long THROTTLE_TIME_IN_MILLIS = 5000L;
    private static final Map<String, Map<Integer, TopicPartition>> TOPIC_PARTITIONS_CACHE = new ConcurrentHashMap<String, Map<Integer, TopicPartition>>();
    private final Map<TopicPartition, OffsetStore> offsetStores = new HashMap<TopicPartition, OffsetStore>();
    private final KafkaConsumer<?, ?> consumer;
    private final KafkaSource<?, ?> source;
    private final int maxReceivedWithoutAckAllowed;
    private volatile Context context;
    private long nextCommitTime;

    private KafkaThrottledLatestProcessedCommit(KafkaConsumer<?, ?> consumer, KafkaSource<?, ?> source, int maxReceivedWithoutAckAllowed) {
        this.consumer = consumer;
        this.source = source;
        this.maxReceivedWithoutAckAllowed = maxReceivedWithoutAckAllowed;
    }

    private static int getNextPowerOfTwoEqualOrGreater(int v) {
        if (v <= 0) {
            return 1;
        }
        --v;
        v |= v >>> 1;
        v |= v >>> 2;
        v |= v >>> 4;
        v |= v >>> 8;
        v |= v >>> 16;
        return ++v;
    }

    public static void clearCache() {
        TOPIC_PARTITIONS_CACHE.clear();
    }

    public static KafkaThrottledLatestProcessedCommit create(KafkaConsumer<?, ?> consumer, Map<String, String> config, KafkaSource<?, ?> source) {
        int maxPollRecords = Integer.parseInt(config.getOrDefault("max.poll.records", "500"));
        int maxReceivedWithoutAckAllowed = KafkaThrottledLatestProcessedCommit.getNextPowerOfTwoEqualOrGreater(maxPollRecords * 2);
        KafkaLogging.log.settingMaxReceivedWithoutAckAllowed(config.get("group.id"), maxReceivedWithoutAckAllowed);
        return new KafkaThrottledLatestProcessedCommit(consumer, source, maxReceivedWithoutAckAllowed);
    }

    private <K, V> TopicPartition getTopicPartition(IncomingKafkaRecord<K, V> record) {
        return TOPIC_PARTITIONS_CACHE.computeIfAbsent(record.getTopic(), topic -> new ConcurrentHashMap()).computeIfAbsent(record.getPartition(), partition -> new TopicPartition(record.getTopic(), (int)partition));
    }

    private OffsetStore getOffsetStore(TopicPartition topicPartition) {
        return this.offsetStores.computeIfAbsent(topicPartition, t -> new OffsetStore((TopicPartition)t, this.maxReceivedWithoutAckAllowed));
    }

    @Override
    public void partitionsAssigned(Context context, Set<TopicPartition> partitions) {
        this.context = context;
        this.offsetStores.clear();
        this.resetNextCommitTime();
    }

    @Override
    public <K, V> IncomingKafkaRecord<K, V> received(IncomingKafkaRecord<K, V> record) {
        TopicPartition recordsTopicPartition = this.getTopicPartition(record);
        try {
            this.getOffsetStore(recordsTopicPartition).received(record.getOffset());
        }
        catch (TooManyMessagegsWithoutAckingException ex) {
            this.source.reportFailure(ex);
        }
        return record;
    }

    private void resetNextCommitTime() {
        this.nextCommitTime = System.currentTimeMillis() + 5000L;
    }

    private Map<TopicPartition, Long> clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping() {
        HashMap<TopicPartition, Long> offsetsMapping = new HashMap<TopicPartition, Long>();
        this.offsetStores.entrySet().forEach(offsetStoreEntry -> {
            TopicPartition topicPartition = (TopicPartition)offsetStoreEntry.getKey();
            ((OffsetStore)offsetStoreEntry.getValue()).clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset().ifPresent(offset -> offsetsMapping.put(topicPartition, offset));
        });
        return offsetsMapping;
    }

    @Override
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> record) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.context.runOnContext(v -> {
            this.offsetStores.get(this.getTopicPartition(record)).processed(record.getOffset());
            if (System.currentTimeMillis() > this.nextCommitTime) {
                this.resetNextCommitTime();
                Map<TopicPartition, Long> offsetsMapping = this.clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffsetMapping();
                if (!offsetsMapping.isEmpty()) {
                    Map<TopicPartition, OffsetAndMetadata> offsets = offsetsMapping.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata().setOffset((Long)e.getValue() + 1L)));
                    this.consumer.getDelegate().commit(offsets, a -> future.complete(null));
                    return;
                }
            }
            future.complete(null);
        });
        return future;
    }

    public static class TooManyMessagegsWithoutAckingException
    extends Exception {
        public TooManyMessagegsWithoutAckingException() {
            super("Too Many Messages without Acking");
        }
    }

    private static class OffsetStore {
        private final TopicPartition topicPartition;
        private final Queue<Long> receivedOffsets = new LinkedList<Long>();
        private final Set<Long> processedOffsets = new HashSet<Long>();
        private final int maxReceivedWithoutAckAllowed;
        private final int maxReceivedWithoutAckAllowedMinusOne;
        private long unProcessedTotal = 0L;

        OffsetStore(TopicPartition topicPartition, int maxReceivedWithoutAckAllowed) {
            this.topicPartition = topicPartition;
            this.maxReceivedWithoutAckAllowed = maxReceivedWithoutAckAllowed;
            this.maxReceivedWithoutAckAllowedMinusOne = maxReceivedWithoutAckAllowed - 1;
        }

        void received(long offset) throws TooManyMessagegsWithoutAckingException {
            this.receivedOffsets.offer(offset);
            ++this.unProcessedTotal;
            if (this.unProcessedTotal >= (long)this.maxReceivedWithoutAckAllowed && (this.unProcessedTotal & (long)this.maxReceivedWithoutAckAllowedMinusOne) == 0L) {
                KafkaLogging.log.receivedTooManyMessagesWithoutAcking(this.topicPartition.toString(), this.unProcessedTotal);
                throw new TooManyMessagegsWithoutAckingException();
            }
        }

        void processed(long offset) {
            if (!this.receivedOffsets.isEmpty() && this.receivedOffsets.peek() <= offset) {
                this.processedOffsets.add(offset);
            }
        }

        OptionalLong clearLesserSequentiallyProcessedOffsetsAndReturnLargestOffset() {
            if (!this.processedOffsets.isEmpty()) {
                long largestSequentialProcessedOffset = -1L;
                while (!this.receivedOffsets.isEmpty() && this.processedOffsets.remove(this.receivedOffsets.peek())) {
                    --this.unProcessedTotal;
                    largestSequentialProcessedOffset = this.receivedOffsets.poll();
                }
                if (largestSequentialProcessedOffset > -1L) {
                    return OptionalLong.of(largestSequentialProcessedOffset);
                }
            }
            return OptionalLong.empty();
        }
    }
}

