package org.reaktivity.nukleus.kafka.internal.cache;

import java.util.Iterator;
import java.util.function.LongSupplier;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2LongHashMap;
import org.reaktivity.nukleus.kafka.internal.cache.ImmutableTopicCache;
import org.reaktivity.nukleus.kafka.internal.cache.PartitionIndex;
import org.reaktivity.nukleus.kafka.internal.stream.HeadersFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.ListFW;
import org.reaktivity.nukleus.kafka.internal.types.MessageFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/cache/CompactedTopicCache.class */
public class CompactedTopicCache implements TopicCache {
    private final MessageCache messageCache;
    private final PartitionIndex[] indexes;
    private final MessageIterator messageIterator;
    private final MessageFW messageRO = new MessageFW();
    private final HeadersFW headersRO = new HeadersFW();
    private final KeyedMessageIterator keyedMessageIterator = new KeyedMessageIterator();
    private final MessageImpl message = new MessageImpl();

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/cache/CompactedTopicCache$KeyedMessageIterator.class */
    final class KeyedMessageIterator implements Iterator<ImmutableTopicCache.MessageRef> {
        private final MessageImpl message;
        private final MessageImpl lastMessage;
        private int remainingEntries;

        KeyedMessageIterator() {
            this.message = new MessageImpl();
            this.lastMessage = new MessageImpl();
        }

        Iterator<ImmutableTopicCache.MessageRef> reset(Long2LongHashMap long2LongHashMap, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW) {
            int intValue = long2LongHashMap.keySet().iterator().next().intValue();
            long j = long2LongHashMap.get(intValue);
            this.remainingEntries = 1;
            PartitionIndex.Entry entry = CompactedTopicCache.this.indexes[intValue].getEntry(octetsFW);
            if (entry == null || entry.offset() < j) {
                this.lastMessage.wrap(intValue, Math.max(j, CompactedTopicCache.this.indexes[intValue].nextOffset()), -1);
            } else {
                this.message.wrap(intValue, entry);
                MessageFW message = this.message.message();
                if (message == null || !CompactedTopicCache.this.headersRO.wrap(message.headers()).matches(listFW)) {
                    this.lastMessage.wrap(this.message);
                } else {
                    this.remainingEntries = 2;
                    this.lastMessage.wrap(intValue, Math.max(this.message.offset() + 1, CompactedTopicCache.this.indexes[intValue].nextOffset()), -1);
                }
            }
            return this;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.remainingEntries > 0;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public ImmutableTopicCache.MessageRef next() {
            MessageImpl messageImpl;
            switch (this.remainingEntries) {
                case 1:
                    messageImpl = this.lastMessage;
                    break;
                case 2:
                    messageImpl = this.message;
                    break;
                default:
                    messageImpl = null;
                    break;
            }
            this.remainingEntries--;
            return messageImpl;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/cache/CompactedTopicCache$MessageImpl.class */
    public class MessageImpl implements ImmutableTopicCache.MessageRef {
        private int partition;
        private long offset;
        private int messageHandle;

        private MessageImpl() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public MessageImpl wrap(int i, long j, int i2) {
            this.partition = i;
            this.offset = j;
            this.messageHandle = i2;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public MessageImpl wrap(int i, PartitionIndex.Entry entry) {
            wrap(i, entry.offset(), entry.messageHandle());
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void wrap(MessageImpl messageImpl) {
            wrap(messageImpl.partition, messageImpl.offset, messageImpl.messageHandle);
        }

        @Override // org.reaktivity.nukleus.kafka.internal.cache.ImmutableTopicCache.MessageRef
        public long offset() {
            return this.offset;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.cache.ImmutableTopicCache.MessageRef
        public int partition() {
            return this.partition;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.cache.ImmutableTopicCache.MessageRef
        public MessageFW message() {
            return CompactedTopicCache.this.messageCache.get(this.messageHandle, CompactedTopicCache.this.messageRO);
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/cache/CompactedTopicCache$MessageIterator.class */
    final class MessageIterator implements Iterator<ImmutableTopicCache.MessageRef> {
        private final Iterator<PartitionIndex.Entry>[] iterators;
        private int partition = -1;
        private final MessageImpl message;
        static final /* synthetic */ boolean $assertionsDisabled;

        MessageIterator(int i) {
            this.message = new MessageImpl();
            this.iterators = new Iterator[i];
        }

        Iterator<ImmutableTopicCache.MessageRef> reset(Long2LongHashMap long2LongHashMap, ListFW<KafkaHeaderFW> listFW) {
            if (!$assertionsDisabled && long2LongHashMap.size() != this.iterators.length) {
                throw new AssertionError();
            }
            for (int i = 0; i < this.iterators.length; i++) {
                this.iterators[i] = CompactedTopicCache.this.indexes[i].entries(long2LongHashMap.get(i), listFW);
            }
            return this;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            boolean z = false;
            this.partition = nextPartition(this.partition);
            for (int i = 0; i < this.iterators.length; i++) {
                z = this.iterators[this.partition].hasNext();
                if (z) {
                    break;
                }
                this.partition = nextPartition(this.partition);
            }
            return z;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public ImmutableTopicCache.MessageRef next() {
            return this.message.wrap(this.partition, this.iterators[this.partition].next());
        }

        private int nextPartition(int i) {
            int i2 = i + 1;
            return i2 == this.iterators.length ? 0 : i2;
        }

        static {
            $assertionsDisabled = !CompactedTopicCache.class.desiredAssertionStatus();
        }
    }

    public CompactedTopicCache(int i, int i2, MessageCache messageCache, LongSupplier longSupplier, LongSupplier longSupplier2) {
        this.messageCache = messageCache;
        this.indexes = new CompactedPartitionIndex[i];
        for (int i3 = 0; i3 < i; i3++) {
            this.indexes[i3] = new CompactedPartitionIndex(1000, i2, messageCache, longSupplier, longSupplier2);
        }
        this.messageIterator = new MessageIterator(i);
    }

    CompactedTopicCache(PartitionIndex[] partitionIndexArr, MessageCache messageCache) {
        this.indexes = partitionIndexArr;
        this.messageCache = messageCache;
        this.messageIterator = new MessageIterator(partitionIndexArr.length);
    }

    @Override // org.reaktivity.nukleus.kafka.internal.cache.TopicCache
    public void add(int i, long j, long j2, long j3, long j4, DirectBuffer directBuffer, HeadersFW headersFW, DirectBuffer directBuffer2, boolean z) {
        this.indexes[i].add(j, j2, j3, j4, directBuffer, headersFW, directBuffer2, z);
    }

    @Override // org.reaktivity.nukleus.kafka.internal.cache.TopicCache
    public boolean compacted() {
        return true;
    }

    @Override // org.reaktivity.nukleus.kafka.internal.cache.ImmutableTopicCache
    public Iterator<ImmutableTopicCache.MessageRef> getMessages(Long2LongHashMap long2LongHashMap, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW) {
        return octetsFW != null ? this.keyedMessageIterator.reset(long2LongHashMap, octetsFW, listFW) : this.messageIterator.reset(long2LongHashMap, listFW);
    }

    @Override // org.reaktivity.nukleus.kafka.internal.cache.ImmutableTopicCache
    public ImmutableTopicCache.MessageRef getMessage(int i, long j) {
        PartitionIndex.Entry next = this.indexes[i].entries(j, null).next();
        return next.offset() == j ? this.message.wrap(i, next) : this.message.wrap(i, j, -1);
    }

    @Override // org.reaktivity.nukleus.kafka.internal.cache.TopicCache
    public void extendNextOffset(int i, long j, long j2) {
        this.indexes[i].extendNextOffset(j, j2);
    }

    @Override // org.reaktivity.nukleus.kafka.internal.cache.TopicCache
    public long getOffset(int i, OctetsFW octetsFW) {
        return this.indexes[i].getOffset(octetsFW);
    }

    @Override // org.reaktivity.nukleus.kafka.internal.cache.TopicCache
    public long nextOffset(int i) {
        return this.indexes[i].nextOffset();
    }

    @Override // org.reaktivity.nukleus.kafka.internal.cache.TopicCache
    public void startOffset(int i, long j) {
        this.indexes[i].startOffset(j);
    }
}
