package org.reaktivity.nukleus.kafka.internal.cache;

import java.util.Iterator;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.jmock.Expectations;
import org.jmock.Sequence;
import org.jmock.integration.junit4.JUnitRuleMockery;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.reaktivity.nukleus.kafka.internal.cache.ImmutableTopicCache;
import org.reaktivity.nukleus.kafka.internal.cache.PartitionIndex;
import org.reaktivity.nukleus.kafka.internal.stream.HeadersFW;
import org.reaktivity.nukleus.kafka.internal.test.TestUtil;
import org.reaktivity.nukleus.kafka.internal.types.ListFW;
import org.reaktivity.nukleus.kafka.internal.types.MessageFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/cache/CompactedTopicCacheTest.class */
public final class CompactedTopicCacheTest {
    private Iterator<PartitionIndex.Entry> iterator0;
    private Iterator<PartitionIndex.Entry> iterator1;
    private Iterator<PartitionIndex.Entry> iterator2;
    private PartitionIndex.Entry entry0;
    private PartitionIndex.Entry entry1;
    private PartitionIndex.Entry entry2;
    private DirectBuffer key = TestUtil.asBuffer("key");
    private DirectBuffer headersBuffer = new UnsafeBuffer(new byte[0]);
    private HeadersFW headers = new HeadersFW().wrap(this.headersBuffer, 0, 0);
    private DirectBuffer value = TestUtil.asBuffer("value");
    private MessageFW messageRO = new MessageFW();

    @Rule
    public JUnitRuleMockery context = new JUnitRuleMockery() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedTopicCacheTest.1
        {
            CompactedTopicCacheTest.this.messageCache = (MessageCache) mock(MessageCache.class);
            CompactedTopicCacheTest.this.index0 = (PartitionIndex) mock(PartitionIndex.class, "index0");
            CompactedTopicCacheTest.this.index1 = (PartitionIndex) mock(PartitionIndex.class, "index1");
            CompactedTopicCacheTest.this.index2 = (PartitionIndex) mock(PartitionIndex.class, "index2");
            CompactedTopicCacheTest.this.iterator0 = (Iterator) mock(Iterator.class, "iterator0");
            CompactedTopicCacheTest.this.iterator1 = (Iterator) mock(Iterator.class, "iterator1");
            CompactedTopicCacheTest.this.iterator2 = (Iterator) mock(Iterator.class, "iterator2");
            CompactedTopicCacheTest.this.entry0 = (PartitionIndex.Entry) mock(PartitionIndex.Entry.class, "entry0");
            CompactedTopicCacheTest.this.entry1 = (PartitionIndex.Entry) mock(PartitionIndex.Entry.class, "entry1");
            CompactedTopicCacheTest.this.entry2 = (PartitionIndex.Entry) mock(PartitionIndex.Entry.class, "entry2");
        }
    };
    private Sequence order = this.context.sequence("order");
    private PartitionIndex index0;
    private PartitionIndex index1;
    private PartitionIndex index2;
    private MessageCache messageCache;
    private CompactedTopicCache cache = new CompactedTopicCache(new PartitionIndex[]{this.index0, this.index1, this.index2}, this.messageCache);

    @Test
    public void shouldAddAndCacheNewMessage() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedTopicCacheTest.2
            {
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index0)).add(0L, 1L, 123L, 456L, CompactedTopicCacheTest.this.key, CompactedTopicCacheTest.this.headers, CompactedTopicCacheTest.this.value, true);
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index2)).add(1L, 10L, 234L, 456L, CompactedTopicCacheTest.this.key, CompactedTopicCacheTest.this.headers, CompactedTopicCacheTest.this.value, true);
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index1)).add(2L, 3L, 345L, 456L, CompactedTopicCacheTest.this.key, CompactedTopicCacheTest.this.headers, CompactedTopicCacheTest.this.value, true);
            }
        });
        this.cache.add(0, 0L, 1L, 123L, 456L, this.key, this.headers, this.value, true);
        this.cache.add(2, 1L, 10L, 234L, 456L, this.key, this.headers, this.value, true);
        this.cache.add(1, 2L, 3L, 345L, 456L, this.key, this.headers, this.value, true);
    }

    @Test
    public void shouldGetMessagesFromAlternatingPartitions() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedTopicCacheTest.3
            {
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index0)).entries(10L, (ListFW) null);
                will(returnValue(CompactedTopicCacheTest.this.iterator0));
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index1)).entries(11L, (ListFW) null);
                will(returnValue(CompactedTopicCacheTest.this.iterator1));
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index2)).entries(12L, (ListFW) null);
                will(returnValue(CompactedTopicCacheTest.this.iterator2));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator0)).hasNext();
                will(returnValue(true));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator0)).next();
                will(returnValue(CompactedTopicCacheTest.this.entry0));
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry0)).offset();
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry0)).messageHandle();
                will(returnValue(111));
                ((MessageCache) oneOf(CompactedTopicCacheTest.this.messageCache)).get(with(111), (MessageFW) with(any(MessageFW.class)));
                will(returnValue(CompactedTopicCacheTest.this.messageRO));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator1)).hasNext();
                will(returnValue(true));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator1)).next();
                will(returnValue(CompactedTopicCacheTest.this.entry1));
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry1)).offset();
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry1)).messageHandle();
                will(returnValue(-1));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator2)).hasNext();
                will(returnValue(true));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator2)).next();
                will(returnValue(CompactedTopicCacheTest.this.entry2));
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry2)).offset();
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry2)).messageHandle();
                will(returnValue(-1));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator0)).hasNext();
                will(returnValue(true));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator0)).next();
                will(returnValue(CompactedTopicCacheTest.this.entry0));
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry0)).offset();
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry0)).messageHandle();
                will(returnValue(-1));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator1)).hasNext();
                will(returnValue(false));
                inSequence(CompactedTopicCacheTest.this.order);
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator2)).hasNext();
                will(returnValue(false));
                inSequence(CompactedTopicCacheTest.this.order);
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator0)).hasNext();
                will(returnValue(false));
                inSequence(CompactedTopicCacheTest.this.order);
            }
        });
        Long2LongHashMap long2LongHashMap = new Long2LongHashMap(-1L);
        long2LongHashMap.put(0L, 10L);
        long2LongHashMap.put(1L, 11L);
        long2LongHashMap.put(2L, 12L);
        Iterator messages = this.cache.getMessages(long2LongHashMap, (OctetsFW) null, (ListFW) null);
        Assert.assertTrue(messages.hasNext());
        Assert.assertSame(this.messageRO, ((ImmutableTopicCache.MessageRef) messages.next()).message());
        for (int i = 0; i < 3; i++) {
            Assert.assertTrue(messages.hasNext());
            Assert.assertNotNull((ImmutableTopicCache.MessageRef) messages.next());
        }
        Assert.assertFalse(messages.hasNext());
    }

    @Test
    public void shouldRotateStartingPartitionForHasNextForFairness() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedTopicCacheTest.4
            {
                ((PartitionIndex) exactly(4).of(CompactedTopicCacheTest.this.index0)).entries(10L, (ListFW) null);
                will(returnValue(CompactedTopicCacheTest.this.iterator0));
                ((PartitionIndex) exactly(4).of(CompactedTopicCacheTest.this.index1)).entries(11L, (ListFW) null);
                will(returnValue(CompactedTopicCacheTest.this.iterator1));
                ((PartitionIndex) exactly(4).of(CompactedTopicCacheTest.this.index2)).entries(12L, (ListFW) null);
                will(returnValue(CompactedTopicCacheTest.this.iterator2));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator0)).hasNext();
                will(returnValue(true));
                inSequence(CompactedTopicCacheTest.this.order);
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator0)).next();
                will(returnValue(CompactedTopicCacheTest.this.entry0));
                inSequence(CompactedTopicCacheTest.this.order);
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry0)).offset();
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry0)).messageHandle();
                will(returnValue(-1));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator1)).hasNext();
                will(returnValue(true));
                inSequence(CompactedTopicCacheTest.this.order);
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator1)).next();
                will(returnValue(CompactedTopicCacheTest.this.entry1));
                inSequence(CompactedTopicCacheTest.this.order);
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry1)).offset();
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry1)).messageHandle();
                will(returnValue(-1));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator2)).hasNext();
                will(returnValue(true));
                inSequence(CompactedTopicCacheTest.this.order);
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator2)).next();
                will(returnValue(CompactedTopicCacheTest.this.entry2));
                inSequence(CompactedTopicCacheTest.this.order);
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry2)).offset();
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry2)).messageHandle();
                will(returnValue(-1));
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator0)).hasNext();
                will(returnValue(true));
                inSequence(CompactedTopicCacheTest.this.order);
                ((Iterator) oneOf(CompactedTopicCacheTest.this.iterator0)).next();
                will(returnValue(CompactedTopicCacheTest.this.entry0));
                inSequence(CompactedTopicCacheTest.this.order);
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry0)).offset();
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry0)).messageHandle();
                will(returnValue(-1));
            }
        });
        Long2LongHashMap long2LongHashMap = new Long2LongHashMap(-1L);
        long2LongHashMap.put(0L, 10L);
        long2LongHashMap.put(1L, 11L);
        long2LongHashMap.put(2L, 12L);
        for (int i = 0; i < 4; i++) {
            Iterator messages = this.cache.getMessages(long2LongHashMap, (OctetsFW) null, (ListFW) null);
            Assert.assertTrue(messages.hasNext());
            messages.next();
        }
    }

    @Test
    public void shouldGetMessagesWithKnownKeyMessageNotCached() {
        final OctetsFW asOctets = TestUtil.asOctets("key");
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedTopicCacheTest.5
            {
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index1)).getEntry(asOctets);
                will(returnValue(CompactedTopicCacheTest.this.entry1));
                ((PartitionIndex.Entry) exactly(2).of(CompactedTopicCacheTest.this.entry1)).offset();
                will(returnValue(21L));
                ((PartitionIndex.Entry) oneOf(CompactedTopicCacheTest.this.entry1)).messageHandle();
                will(returnValue(-1));
                ((MessageCache) oneOf(CompactedTopicCacheTest.this.messageCache)).get(with(-1), (MessageFW) with(any(MessageFW.class)));
                will(returnValue(null));
            }
        });
        Long2LongHashMap long2LongHashMap = new Long2LongHashMap(-1L);
        long2LongHashMap.put(1L, 11L);
        Iterator messages = this.cache.getMessages(long2LongHashMap, asOctets, (ListFW) null);
        Assert.assertTrue(messages.hasNext());
        Assert.assertEquals(21L, ((ImmutableTopicCache.MessageRef) messages.next()).offset());
    }

    @Test
    public void shouldGetMessagesWithUnknownKeyAtLiveOffset() {
        final OctetsFW asOctets = TestUtil.asOctets("key");
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedTopicCacheTest.6
            {
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index1)).getEntry(asOctets);
                will(returnValue(null));
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index1)).nextOffset();
                will(returnValue(4L));
            }
        });
        Long2LongHashMap long2LongHashMap = new Long2LongHashMap(-1L);
        long2LongHashMap.put(1L, 3L);
        Iterator messages = this.cache.getMessages(long2LongHashMap, asOctets, (ListFW) null);
        Assert.assertTrue(messages.hasNext());
        Assert.assertEquals(4L, ((ImmutableTopicCache.MessageRef) messages.next()).offset());
    }

    @Test
    public void shouldGetMessagesWithUnknownKeyAtRequestedOffsetWhenHigherThanLiveOffset() {
        final OctetsFW asOctets = TestUtil.asOctets("key");
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedTopicCacheTest.7
            {
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index1)).getEntry(asOctets);
                will(returnValue(null));
                ((PartitionIndex) oneOf(CompactedTopicCacheTest.this.index1)).nextOffset();
                will(returnValue(4L));
            }
        });
        Long2LongHashMap long2LongHashMap = new Long2LongHashMap(-1L);
        long2LongHashMap.put(1L, 11L);
        Iterator messages = this.cache.getMessages(long2LongHashMap, asOctets, (ListFW) null);
        Assert.assertTrue(messages.hasNext());
        Assert.assertEquals(11L, ((ImmutableTopicCache.MessageRef) messages.next()).offset());
    }
}
