package org.reaktivity.nukleus.kafka.internal.cache;

import java.util.Iterator;
import java.util.NoSuchElementException;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.jmock.Expectations;
import org.jmock.integration.junit4.JUnitRuleMockery;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndex;
import org.reaktivity.nukleus.kafka.internal.cache.PartitionIndex;
import org.reaktivity.nukleus.kafka.internal.stream.HeadersFW;
import org.reaktivity.nukleus.kafka.internal.test.TestUtil;
import org.reaktivity.nukleus.kafka.internal.types.MessageFW;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/cache/CompactedPartitionIndexTest.class */
public final class CompactedPartitionIndexTest {
    private static final int TOMBSTONE_LIFETIME_MILLIS = 5;
    static final /* synthetic */ boolean $assertionsDisabled;
    private DirectBuffer key = TestUtil.asBuffer("key");
    private DirectBuffer headersBuffer = new UnsafeBuffer(new byte[0]);
    private HeadersFW headers = new HeadersFW().wrap(this.headersBuffer, 0, 0);
    private DirectBuffer value = TestUtil.asBuffer("value");
    private final MessageFW messageRO = new MessageFW();

    @Rule
    public JUnitRuleMockery context = new JUnitRuleMockery() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.1
        {
            CompactedPartitionIndexTest.this.messageCache = (MessageCache) mock(MessageCache.class);
        }
    };
    private MessageCache messageCache;
    private PartitionIndex index = new CompactedPartitionIndex(TOMBSTONE_LIFETIME_MILLIS, TOMBSTONE_LIFETIME_MILLIS, this.messageCache);

    @Test
    public void shouldAddAndCacheNewMessage() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.2
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, CompactedPartitionIndexTest.this.key, CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
            }
        });
        this.index.add(0L, 1L, 123L, 456L, this.key, this.headers, this.value, true);
    }

    @Test
    public void shouldAddAndNotCacheNewMessage() {
        this.index.add(0L, 1L, 123L, 456L, this.key, this.headers, this.value, false);
    }

    /* JADX WARN: Type inference failed for: r0v5, types: [org.reaktivity.nukleus.kafka.internal.cache.PartitionIndex, long] */
    @Test
    public void shouldCompactWhenTooManyInvalidEntries() {
        long j = 0;
        for (int i = 0; i <= 10001; i++) {
            long j2 = j;
            j = j2 + 1;
            this.index.add(0L, j2, 123L, 456L, this.key, this.headers, this.value, false);
        }
        Assert.assertEquals(10002L, this.index.numberOfEntries());
        ?? r0 = this.index;
        long j3 = j;
        long j4 = j3 + 1;
        r0.add((long) r0, j3, 123L, 456L, this.key, this.headers, this.value, false);
        Assert.assertEquals(2L, this.index.numberOfEntries());
    }

    @Test
    public void shouldAddTombstoneMessageAndReportUntilTombstoneExpires() throws Exception {
        final long currentTimeMillis = System.currentTimeMillis() + 500;
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.3
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(currentTimeMillis, 456L, CompactedPartitionIndexTest.this.key, CompactedPartitionIndexTest.this.headers, (DirectBuffer) null);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).release(0);
            }
        });
        this.index.add(0L, 1L, currentTimeMillis, 456L, this.key, this.headers, (DirectBuffer) null, true);
        Iterator entries = this.index.entries(0L);
        Assert.assertTrue(entries.hasNext());
        Assert.assertEquals(1L, ((PartitionIndex.Entry) entries.next()).offset());
        Thread.sleep((currentTimeMillis - System.currentTimeMillis()) + 5);
        Assert.assertEquals(2L, ((PartitionIndex.Entry) this.index.entries(0L).next()).offset());
    }

    @Test
    public void shouldAddTombstonesForExistingMessagesAndReportUntilExpired() throws Exception {
        final long currentTimeMillis = System.currentTimeMillis() + 500;
        final long j = currentTimeMillis + 500;
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.4
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(currentTimeMillis, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(currentTimeMillis, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).replace(0, currentTimeMillis, 458L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, (DirectBuffer) null);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).release(0);
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).replace(1, j, 459L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, (DirectBuffer) null);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).release(1);
            }
        });
        this.index.add(0L, 0L, currentTimeMillis, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(0L, 1L, currentTimeMillis, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        this.index.add(0L, 2L, currentTimeMillis, 458L, TestUtil.asBuffer("key1"), this.headers, (DirectBuffer) null, true);
        if (!$assertionsDisabled && System.currentTimeMillis() >= currentTimeMillis) {
            throw new AssertionError("test failed due to unexpected execution delay");
        }
        Iterator entries = this.index.entries(0L);
        Assert.assertEquals(1L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(2L, ((PartitionIndex.Entry) entries.next()).offset());
        Thread.sleep((currentTimeMillis + 5) - System.currentTimeMillis());
        this.index.add(2L, 3L, j, 459L, TestUtil.asBuffer("key2"), this.headers, (DirectBuffer) null, true);
        if (!$assertionsDisabled && System.currentTimeMillis() >= j) {
            throw new AssertionError("test failed due to unexpected execution delay");
        }
        Iterator entries2 = this.index.entries(0L);
        Assert.assertEquals(3L, ((PartitionIndex.Entry) entries2.next()).offset());
        Assert.assertFalse(entries2.hasNext());
        Thread.sleep((j + 5) - System.currentTimeMillis());
        Iterator entries3 = this.index.entries(0L);
        Assert.assertEquals(4L, ((PartitionIndex.Entry) entries3.next()).offset());
        Assert.assertFalse(entries3.hasNext());
    }

    @Test
    public void shouldAddTombstonesForExistingMessagesAndReportUntilExpiredWhenNotCachingMessages() throws Exception {
        long currentTimeMillis = System.currentTimeMillis() + 500;
        long j = currentTimeMillis + 500;
        this.index.add(0L, 0L, currentTimeMillis, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, false);
        this.index.add(0L, 1L, currentTimeMillis, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, false);
        this.index.add(0L, 2L, currentTimeMillis, 458L, TestUtil.asBuffer("key1"), this.headers, (DirectBuffer) null, false);
        if (!$assertionsDisabled && System.currentTimeMillis() >= currentTimeMillis) {
            throw new AssertionError("test failed due to unexpected execution delay");
        }
        Iterator entries = this.index.entries(0L);
        Assert.assertEquals(1L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(2L, ((PartitionIndex.Entry) entries.next()).offset());
        Thread.sleep((currentTimeMillis + 5) - System.currentTimeMillis());
        this.index.add(2L, 3L, j, 459L, TestUtil.asBuffer("key2"), this.headers, (DirectBuffer) null, false);
        if (!$assertionsDisabled && System.currentTimeMillis() >= j) {
            throw new AssertionError("test failed due to unexpected execution delay");
        }
        Iterator entries2 = this.index.entries(0L);
        Assert.assertEquals(3L, ((PartitionIndex.Entry) entries2.next()).offset());
        Assert.assertFalse(entries2.hasNext());
        Thread.sleep((j + 5) - System.currentTimeMillis());
        Iterator entries3 = this.index.entries(0L);
        Assert.assertEquals(4L, ((PartitionIndex.Entry) entries3.next()).offset());
        Assert.assertFalse(entries3.hasNext());
    }

    @Test
    public void shouldReplaceTombstoneWithNewMessageAndRemoveOnlyWhenASubsequentTombstoneExpires() throws Exception {
        final long currentTimeMillis = System.currentTimeMillis() + 500;
        final long j = currentTimeMillis + 500;
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.5
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(currentTimeMillis, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).replace(0, currentTimeMillis, 457L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, (DirectBuffer) null);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).replace(0, currentTimeMillis, 458L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).replace(0, j, 459L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, (DirectBuffer) null);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).release(0);
            }
        });
        this.index.add(0L, 0L, currentTimeMillis, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(1L, 1L, currentTimeMillis, 457L, TestUtil.asBuffer("key1"), this.headers, (DirectBuffer) null, true);
        this.index.add(2L, 2L, currentTimeMillis, 458L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        if (!$assertionsDisabled && System.currentTimeMillis() >= currentTimeMillis) {
            throw new AssertionError("test failed due to unexpected execution delay");
        }
        Iterator entries = this.index.entries(0L);
        Assert.assertEquals(2L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(0L, r0.message());
        Assert.assertFalse(entries.hasNext());
        Thread.sleep((currentTimeMillis + 5) - System.currentTimeMillis());
        Iterator entries2 = this.index.entries(0L);
        PartitionIndex.Entry entry = (PartitionIndex.Entry) entries2.next();
        Assert.assertEquals(0L, entry.message());
        Assert.assertEquals(2L, entry.offset());
        Assert.assertFalse(entries2.hasNext());
        this.index.add(3L, 3L, j, 459L, TestUtil.asBuffer("key1"), this.headers, (DirectBuffer) null, true);
        Thread.sleep((j + 5) - System.currentTimeMillis());
        Iterator entries3 = this.index.entries(0L);
        Assert.assertEquals(4L, ((PartitionIndex.Entry) entries3.next()).offset());
        Assert.assertEquals(-1L, r0.message());
        Assert.assertFalse(entries3.hasNext());
    }

    @Test
    public void shouldNotAddToEntriesIteratorMessagesWithOffsetsOutOfOrder() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.6
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(125L, 458L, TestUtil.asBuffer("key3"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(2));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(126L, 459L, TestUtil.asBuffer("key4"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(3));
            }
        });
        this.index.add(101L, 110L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(0L, 100L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        this.index.add(102L, 110L, 125L, 458L, TestUtil.asBuffer("key3"), this.headers, this.value, true);
        this.index.add(101L, 101L, 126L, 459L, TestUtil.asBuffer("key4"), this.headers, this.value, true);
        Iterator entries = this.index.entries(100L);
        Assert.assertTrue(entries.hasNext());
        Assert.assertEquals(100L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(1L, r0.message());
        Assert.assertEquals(101L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(3L, r0.message());
        Assert.assertFalse(entries.hasNext());
    }

    @Test
    public void shouldGetEntriesForExistingKeys() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.7
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
            }
        });
        this.index.add(0L, 0L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(0L, 1L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        Assert.assertEquals(0L, this.index.getEntry(0L, TestUtil.asOctets("key1")).offset());
        Assert.assertEquals(0L, r0.message());
        Assert.assertEquals(1L, this.index.getEntry(0L, TestUtil.asOctets("key2")).offset());
        Assert.assertEquals(1L, r0.message());
    }

    @Test
    public void shouldHighestVisitedOffsetForNonExistingKey() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.8
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
            }
        });
        this.index.add(0L, 1L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(0L, 2L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        Assert.assertEquals(3L, this.index.getEntry(1L, TestUtil.asOctets("unknownKey")).offset());
        Assert.assertEquals(-1L, r0.message());
    }

    @Test
    public void shouldIterate() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.9
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
            }
        });
        this.index.add(0L, 0L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(0L, 1L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        Iterator entries = this.index.entries(0L);
        Assert.assertTrue(entries.hasNext());
        Assert.assertEquals(0L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(0L, r0.message());
        Assert.assertEquals(1L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(1L, r0.message());
        Assert.assertFalse(entries.hasNext());
    }

    @Test(expected = IndexOutOfBoundsException.class)
    public void shouldThrowExceptionFromIteratorWhenNoMoreElements() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.10
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
            }
        });
        this.index.add(0L, 1L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        Iterator entries = this.index.entries(0L);
        Assert.assertTrue(entries.hasNext());
        Assert.assertNotNull(entries.next());
        Assert.assertFalse(entries.hasNext());
        entries.next();
    }

    @Test
    public void shouldIterateFromNextHighestOffsetWhenRequestedOffsetNotFound() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.11
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(125L, 458L, TestUtil.asBuffer("key3"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).replace(1, 126L, 459L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
            }
        });
        this.index.add(0L, 0L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(0L, 1L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        this.index.add(0L, 2L, 125L, 458L, TestUtil.asBuffer("key3"), this.headers, this.value, true);
        this.index.add(0L, 10L, 126L, 459L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        Iterator entries = this.index.entries(5L);
        Assert.assertTrue(entries.hasNext());
        Assert.assertEquals(10L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(1L, r0.message());
        Assert.assertFalse(entries.hasNext());
    }

    @Test
    public void shouldReturnIteratorWithRequestedOffsetAndNoMessageWhenCacheIsEmpty() {
        Iterator entries = this.index.entries(100L);
        Assert.assertTrue(entries.hasNext());
        Assert.assertEquals(100L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(-1L, r0.message());
        Assert.assertFalse(entries.hasNext());
    }

    @Test(expected = NoSuchElementException.class)
    public void shouldThrowExceptionFromNoMessageIteratorNextWhenNoMoreElements() {
        Iterator entries = this.index.entries(102L);
        Assert.assertTrue(entries.hasNext());
        Assert.assertEquals(102L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(-1L, r0.message());
        Assert.assertFalse(entries.hasNext());
        entries.next();
    }

    @Test
    public void shouldReturnIteratorWithRequestedOffsetAndNoMessageWhenRequestOffsetExceedsHighestOffsetSeenSoFar() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.12
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
            }
        });
        this.index.add(0L, 101L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        Iterator entries = this.index.entries(102L);
        Assert.assertTrue(entries.hasNext());
        Assert.assertEquals(102L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(-1L, r0.message());
        Assert.assertFalse(entries.hasNext());
    }

    @Test
    public void shouldCacheHistoricalMessageWhenItHasBeenEvicted() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.13
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(125L, 458L, TestUtil.asBuffer("key3"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(2));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).get(with(1), (MessageFW) with(any(MessageFW.class)));
                will(returnValue(null));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).replace(1, 124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
            }
        });
        this.index.add(0L, 101L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(100L, 102L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        this.index.add(100L, 103L, 125L, 458L, TestUtil.asBuffer("key3"), this.headers, this.value, true);
        this.index.add(100L, 102L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
    }

    @Test
    public void shouldNotCacheHistoricalMessageWhenItHasNotBeenEvicted() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.14
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(125L, 458L, TestUtil.asBuffer("key3"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(2));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).get(with(1), (MessageFW) with(any(MessageFW.class)));
                will(returnValue(CompactedPartitionIndexTest.this.messageRO));
            }
        });
        this.index.add(0L, 101L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(100L, 102L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        this.index.add(100L, 103L, 125L, 458L, TestUtil.asBuffer("key3"), this.headers, this.value, true);
        this.index.add(100L, 102L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
    }

    @Test
    public void shouldNotUpdateEntryForExistingKeyWhenAddHasRequestOffsetExceedsHighestOffsetSeenSoFar() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.15
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
            }
        });
        this.index.add(0L, 110L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(112L, 113L, 124L, 457L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
    }

    @Test
    public void shouldAddEntryForKeyButNotIncreaseHighwaterMarkWhenRequestOffsetExceedsHighestOffsetSeenSoFar() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.16
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
            }
        });
        this.index.add(0L, 110L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(112L, 113L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
    }

    @Test
    public void shouldReplaceOffsetsForKeys() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.17
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).replace(1, 125L, 458L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).replace(0, 126L, 459L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
            }
        });
        this.index.add(0L, 0L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(0L, 1L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        this.index.add(0L, 2L, 125L, 458L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        this.index.add(0L, 3L, 126L, 459L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        Iterator entries = this.index.entries(0L);
        Assert.assertTrue(entries.hasNext());
        Assert.assertEquals(2L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(1L, r0.message());
        Assert.assertEquals(3L, ((PartitionIndex.Entry) entries.next()).offset());
        Assert.assertEquals(0L, r0.message());
        Assert.assertFalse(entries.hasNext());
    }

    @Test
    public void shouldReportEntryAsString() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.18
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(77));
            }
        });
        this.index.add(0L, 100L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        String obj = ((PartitionIndex.Entry) this.index.entries(0L).next()).toString();
        Assert.assertTrue(obj.contains("100"));
        Assert.assertTrue(obj.contains("77"));
    }

    @Test
    public void shouldMaintainIsTombstoneStateOnEntryImpl() {
        CompactedPartitionIndex.EntryImpl entryImpl = new CompactedPartitionIndex.EntryImpl(123L, -1, 0);
        Assert.assertFalse(entryImpl.isTombstone());
        Assert.assertEquals(0L, entryImpl.position());
        Assert.assertFalse(entryImpl.getAndSetIsTombstone(true));
        Assert.assertTrue(entryImpl.isTombstone());
        Assert.assertEquals(0L, entryImpl.position());
        Assert.assertTrue(entryImpl.getAndSetIsTombstone(false));
        Assert.assertFalse(entryImpl.isTombstone());
        Assert.assertFalse(entryImpl.getAndSetIsTombstone(false));
        Assert.assertFalse(entryImpl.isTombstone());
        Assert.assertEquals(0L, entryImpl.position());
        entryImpl.setPosition(1);
        Assert.assertFalse(entryImpl.isTombstone());
        Assert.assertFalse(entryImpl.getAndSetIsTombstone(true));
        Assert.assertTrue(entryImpl.isTombstone());
        Assert.assertEquals(1L, entryImpl.position());
        entryImpl.setPosition(2);
        Assert.assertTrue(entryImpl.isTombstone());
    }

    @Test
    public void shouldEvictMessagesBeforeStartOffset() {
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndexTest.19
            {
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(123L, 456L, TestUtil.asBuffer("key1"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(0));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).put(124L, 457L, TestUtil.asBuffer("key2"), CompactedPartitionIndexTest.this.headers, CompactedPartitionIndexTest.this.value);
                will(returnValue(1));
                ((MessageCache) oneOf(CompactedPartitionIndexTest.this.messageCache)).release(0);
            }
        });
        this.index.add(0L, 0L, 123L, 456L, TestUtil.asBuffer("key1"), this.headers, this.value, true);
        this.index.add(0L, 1L, 124L, 457L, TestUtil.asBuffer("key2"), this.headers, this.value, true);
        this.index.startOffset(1L);
        PartitionIndex.Entry entry = this.index.getEntry(0L, TestUtil.asOctets("key1"));
        PartitionIndex.Entry entry2 = this.index.getEntry(0L, TestUtil.asOctets("key2"));
        Assert.assertEquals(2L, entry.offset());
        Assert.assertEquals(-1L, entry.message());
        Assert.assertEquals(1L, entry2.offset());
        Assert.assertEquals(1L, entry2.message());
    }

    static {
        $assertionsDisabled = !CompactedPartitionIndexTest.class.desiredAssertionStatus();
    }
}
