package org.reaktivity.nukleus.kafka.internal.stream;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.jmock.Expectations;
import org.jmock.integration.junit4.JUnitRuleMockery;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.reaktivity.nukleus.kafka.internal.cache.PartitionIndex;
import org.reaktivity.nukleus.kafka.internal.test.TestUtil;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.ListFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/TopicMessageDispatcherTest.class */
public final class TopicMessageDispatcherTest {
    private Iterator<KafkaHeaderFW> emptyHeaders = Collections.emptyIterator();
    private final ListFW.Builder<KafkaHeaderFW.Builder, KafkaHeaderFW> headersRW = new ListFW.Builder<>(new KafkaHeaderFW.Builder(), new KafkaHeaderFW());
    private final MutableDirectBuffer headersBuffer = new UnsafeBuffer(new byte[1000]);

    @Rule
    public JUnitRuleMockery context = new JUnitRuleMockery();
    private PartitionIndex partitionIndex1 = (PartitionIndex) this.context.mock(PartitionIndex.class, "partitionIndex1");
    private PartitionIndex partitionIndex2 = (PartitionIndex) this.context.mock(PartitionIndex.class, "partitionIndex2");
    PartitionIndex[] partitionIndexes = {this.partitionIndex1, this.partitionIndex2};
    private TopicMessageDispatcher dispatcher = new TopicMessageDispatcher(this.partitionIndexes, false, HeaderValueMessageDispatcher::new);

    @Test
    public void shouldAddDispatcherWithEmptyHeadersAndNullKey() {
        this.dispatcher.add((OctetsFW) null, -1, this.emptyHeaders, (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1"));
    }

    @Test
    public void shouldAddDispatcherWithNullHeadersAndNullKey() {
        this.dispatcher.add((OctetsFW) null, -1, this.emptyHeaders, (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1"));
    }

    @Test
    public void shouldAddMultipleDispatchersWithSameKey() {
        MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        this.dispatcher.add(TestUtil.asOctets("key1"), 0, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key1"), 0, this.emptyHeaders, messageDispatcher2);
    }

    @Test
    public void shouldAdjustOffset() {
        final MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        final MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher2);
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.stream.TopicMessageDispatcherTest.1
            {
                ((MessageDispatcher) oneOf(messageDispatcher)).adjustOffset(1, 10L, 5L);
                ((MessageDispatcher) oneOf(messageDispatcher2)).adjustOffset(1, 10L, 5L);
            }
        });
        this.dispatcher.adjustOffset(1, 10L, 5L);
    }

    @Test
    public void shouldDetach() {
        final MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        final MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher2);
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.stream.TopicMessageDispatcherTest.2
            {
                ((MessageDispatcher) oneOf(messageDispatcher)).detach(true);
                ((MessageDispatcher) oneOf(messageDispatcher2)).detach(true);
            }
        });
        this.dispatcher.detach(true);
    }

    @Test
    public void shouldDispatchWithoutKey() {
        final MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        final MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        this.dispatcher.add((OctetsFW) null, -1, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add((OctetsFW) null, -1, this.emptyHeaders, messageDispatcher2);
        final HeadersFW emptyHeaders = emptyHeaders();
        final long currentTimeMillis = System.currentTimeMillis() - 123;
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.stream.TopicMessageDispatcherTest.3
            {
                ((MessageDispatcher) oneOf(messageDispatcher)).dispatch(with(1), with(10L), with(12L), (DirectBuffer) with((DirectBuffer) null), (Function) with(emptyHeaders.headerSupplier()), with(currentTimeMillis), with(0L), (DirectBuffer) with((DirectBuffer) null));
                will(returnValue(7));
                ((MessageDispatcher) oneOf(messageDispatcher2)).dispatch(with(1), with(10L), with(12L), (DirectBuffer) with((DirectBuffer) null), (Function) with(emptyHeaders.headerSupplier()), with(currentTimeMillis), with(0L), (DirectBuffer) with((DirectBuffer) null));
                will(returnValue(7));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).add(with(10L), with(12L), with(currentTimeMillis), with(0L), (DirectBuffer) with((DirectBuffer) null), (HeadersFW) with(emptyHeaders), (DirectBuffer) with((DirectBuffer) null), with(false));
            }
        });
        Assert.assertEquals(7L, this.dispatcher.dispatch(1, 10L, 12L, 999L, (DirectBuffer) null, emptyHeaders, currentTimeMillis, 0L, (DirectBuffer) null));
    }

    @Test
    public void shouldDispatchWithMatchingKey() {
        final MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        final MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        final MessageDispatcher messageDispatcher3 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child3");
        this.dispatcher.add(TestUtil.asOctets("key1"), 0, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key1"), 0, this.emptyHeaders, messageDispatcher2);
        this.dispatcher.add(TestUtil.asOctets("key2"), 1, this.emptyHeaders, messageDispatcher3);
        final HeadersFW emptyHeaders = emptyHeaders();
        final long currentTimeMillis = System.currentTimeMillis() - 123;
        final long currentTimeMillis2 = System.currentTimeMillis() - 123;
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.stream.TopicMessageDispatcherTest.4
            {
                ((MessageDispatcher) oneOf(messageDispatcher)).dispatch(with(0), with(10L), with(12L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key1")), (Function) with(emptyHeaders.headerSupplier()), with(currentTimeMillis), with(0L), (DirectBuffer) with((DirectBuffer) null));
                will(returnValue(7));
                ((MessageDispatcher) oneOf(messageDispatcher2)).dispatch(with(0), with(10L), with(12L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key1")), (Function) with(emptyHeaders.headerSupplier()), with(currentTimeMillis), with(0L), (DirectBuffer) with((DirectBuffer) null));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex1)).getEntry(with(10L), (OctetsFW) with(TestUtil.asOctets("key1")));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex1)).nextOffset();
                will(returnValue(0L));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex1)).add(with(10L), with(12L), with(currentTimeMillis), with(0L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key1")), (HeadersFW) with(emptyHeaders), (DirectBuffer) with((DirectBuffer) null), with(false));
                ((MessageDispatcher) oneOf(messageDispatcher3)).dispatch(with(1), with(10L), with(13L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key2")), (Function) with(emptyHeaders.headerSupplier()), with(currentTimeMillis2), with(0L), (DirectBuffer) with((DirectBuffer) null));
                will(returnValue(7));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).getEntry(with(10L), (OctetsFW) with(TestUtil.asOctets("key2")));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).nextOffset();
                will(returnValue(0L));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).add(with(10L), with(13L), with(currentTimeMillis), with(0L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key2")), (HeadersFW) with(emptyHeaders), (DirectBuffer) with((DirectBuffer) null), with(false));
            }
        });
        Assert.assertEquals(7L, this.dispatcher.dispatch(0, 10L, 12L, 999L, TestUtil.asBuffer("key1"), emptyHeaders, currentTimeMillis, 0L, (DirectBuffer) null));
        Assert.assertEquals(7L, this.dispatcher.dispatch(1, 10L, 13L, 999L, TestUtil.asBuffer("key2"), emptyHeaders, currentTimeMillis2, 0L, (DirectBuffer) null));
    }

    @Test
    public void shouldCacheOffsetButNotDispatchNonMatchingKey() {
        MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        final MessageDispatcher messageDispatcher3 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "routeMatcher");
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher2);
        this.dispatcher.add((OctetsFW) null, 1, this.emptyHeaders, messageDispatcher3);
        final HeadersFW emptyHeaders = emptyHeaders();
        final long currentTimeMillis = System.currentTimeMillis() - 123;
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.stream.TopicMessageDispatcherTest.5
            {
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).getEntry(with(10L), (OctetsFW) with(TestUtil.asOctets("key2")));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).nextOffset();
                will(returnValue(0L));
                ((MessageDispatcher) oneOf(messageDispatcher3)).dispatch(with(1), with(10L), with(12L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key2")), (Function) with(emptyHeaders.headerSupplier()), with(currentTimeMillis), with(0L), (DirectBuffer) with((DirectBuffer) null));
                will(returnValue(1));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).add(with(10L), with(12L), with(currentTimeMillis), with(0L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key2")), (HeadersFW) with(emptyHeaders), (DirectBuffer) with((DirectBuffer) null), with(false));
            }
        });
        Assert.assertEquals(1L, this.dispatcher.dispatch(1, 10L, 12L, 999L, TestUtil.asBuffer("key2"), emptyHeaders, currentTimeMillis, 0L, (DirectBuffer) null));
    }

    @Test
    public void shouldCacheOffsetAndMessageWhenProactiveMessageCachingEnabled() {
        MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        final MessageDispatcher messageDispatcher3 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "routeMatcher");
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher2);
        this.dispatcher.add((OctetsFW) null, 1, this.emptyHeaders, messageDispatcher3);
        final HeadersFW emptyHeaders = emptyHeaders();
        final long currentTimeMillis = System.currentTimeMillis() - 123;
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.stream.TopicMessageDispatcherTest.6
            {
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).getEntry(with(10L), (OctetsFW) with(TestUtil.asOctets("key2")));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).nextOffset();
                will(returnValue(0L));
                ((MessageDispatcher) oneOf(messageDispatcher3)).dispatch(with(1), with(10L), with(12L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key2")), (Function) with(emptyHeaders.headerSupplier()), with(currentTimeMillis), with(0L), (DirectBuffer) with((DirectBuffer) null));
                will(returnValue(1));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).add(with(10L), with(12L), with(currentTimeMillis), with(0L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key2")), (HeadersFW) with(emptyHeaders), (DirectBuffer) with((DirectBuffer) null), with(true));
            }
        });
        this.dispatcher.enableProactiveMessageCaching();
        Assert.assertEquals(1L, this.dispatcher.dispatch(1, 10L, 12L, 999L, TestUtil.asBuffer("key2"), emptyHeaders, currentTimeMillis, 0L, (DirectBuffer) null));
    }

    @Test
    public void shouldCacheOffsetAndMessageWhenLCaughtUpToLiveStream() {
        MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        final MessageDispatcher messageDispatcher3 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "routeMatcher");
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher2);
        this.dispatcher.add((OctetsFW) null, 1, this.emptyHeaders, messageDispatcher3);
        final HeadersFW emptyHeaders = emptyHeaders();
        final long currentTimeMillis = System.currentTimeMillis() - 123;
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.stream.TopicMessageDispatcherTest.7
            {
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).getEntry(with(10L), (OctetsFW) with(TestUtil.asOctets("key2")));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).nextOffset();
                will(returnValue(0L));
                ((MessageDispatcher) oneOf(messageDispatcher3)).dispatch(with(1), with(10L), with(11L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key2")), (Function) with(emptyHeaders.headerSupplier()), with(currentTimeMillis), with(0L), (DirectBuffer) with((DirectBuffer) null));
                will(returnValue(1));
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).add(with(10L), with(11L), with(currentTimeMillis), with(0L), (DirectBuffer) with(TopicMessageDispatcherTest.this.bufferMatching("key2")), (HeadersFW) with(emptyHeaders), (DirectBuffer) with((DirectBuffer) null), with(true));
            }
        });
        Assert.assertEquals(1L, this.dispatcher.dispatch(1, 10L, 11L, 12L, TestUtil.asBuffer("key2"), emptyHeaders, currentTimeMillis, 0L, (DirectBuffer) null));
    }

    @Test
    public void shouldFlush() {
        final MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        final MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher2);
        this.context.checking(new Expectations() { // from class: org.reaktivity.nukleus.kafka.internal.stream.TopicMessageDispatcherTest.8
            {
                ((MessageDispatcher) oneOf(messageDispatcher)).flush(1, 10L, 12L);
                ((MessageDispatcher) oneOf(messageDispatcher2)).flush(1, 10L, 12L);
                ((PartitionIndex) oneOf(TopicMessageDispatcherTest.this.partitionIndex2)).extendNextOffset(10L, 12L);
            }
        });
        this.dispatcher.flush(1, 10L, 12L);
    }

    @Test
    public void shouldRemoveKeyDispatchers() {
        MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key2"), 0, this.emptyHeaders, messageDispatcher2);
        Assert.assertTrue(this.dispatcher.remove(TestUtil.asOctets("key1"), 1, this.emptyHeaders, messageDispatcher));
        Assert.assertTrue(this.dispatcher.remove(TestUtil.asOctets("key2"), 0, this.emptyHeaders, messageDispatcher2));
        Assert.assertTrue(this.dispatcher.isEmpty());
    }

    @Test
    public void shouldRemoveHeadersDispatchers() {
        MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        ArrayList arrayList = new ArrayList();
        this.headersRW.wrap(this.headersBuffer, 0, this.headersBuffer.capacity()).item(builder -> {
            builder.key("header1").value(TestUtil.asOctets("value1"));
        }).build().forEach(kafkaHeaderFW -> {
            arrayList.add(kafkaHeaderFW);
        });
        this.dispatcher.add((OctetsFW) null, -1, arrayList.iterator(), messageDispatcher);
        this.dispatcher.add(TestUtil.asOctets("key1"), 1, arrayList.iterator(), messageDispatcher2);
        Assert.assertTrue(this.dispatcher.remove((OctetsFW) null, -1, arrayList.iterator(), messageDispatcher));
        Assert.assertTrue(this.dispatcher.remove(TestUtil.asOctets("key1"), 1, arrayList.iterator(), messageDispatcher2));
        Assert.assertTrue(this.dispatcher.isEmpty());
    }

    @Test
    public void shouldRemoveUnconditionalDispatchers() {
        MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        this.dispatcher.add((OctetsFW) null, -1, this.emptyHeaders, messageDispatcher);
        this.dispatcher.add((OctetsFW) null, -1, this.emptyHeaders, messageDispatcher2);
        Assert.assertTrue(this.dispatcher.remove((OctetsFW) null, -1, this.emptyHeaders, messageDispatcher));
        Assert.assertTrue(this.dispatcher.remove((OctetsFW) null, -1, this.emptyHeaders, messageDispatcher2));
        Assert.assertTrue(this.dispatcher.isEmpty());
    }

    @Test
    public void shouldNotRemoveDispatcherWhenNotPresent() {
        MessageDispatcher messageDispatcher = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child1");
        MessageDispatcher messageDispatcher2 = (MessageDispatcher) this.context.mock(MessageDispatcher.class, "child2");
        Assert.assertFalse(this.dispatcher.remove(TestUtil.asOctets("key1"), 0, (Iterator) null, messageDispatcher));
        this.dispatcher.add(TestUtil.asOctets("key1"), 0, this.emptyHeaders, messageDispatcher);
        Assert.assertFalse(this.dispatcher.remove(TestUtil.asOctets("key1"), 0, this.emptyHeaders, messageDispatcher2));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Matcher<DirectBuffer> bufferMatching(final String str) {
        return new BaseMatcher<DirectBuffer>() { // from class: org.reaktivity.nukleus.kafka.internal.stream.TopicMessageDispatcherTest.9
            public boolean matches(Object obj) {
                return (obj instanceof UnsafeBuffer) && ((UnsafeBuffer) obj).equals(new UnsafeBuffer(str.getBytes(StandardCharsets.UTF_8)));
            }

            public void describeTo(Description description) {
                description.appendText(str);
            }
        };
    }

    private HeadersFW emptyHeaders() {
        HeadersFW headersFW = new HeadersFW();
        headersFW.wrap(new UnsafeBuffer(new byte[]{0}), 0, 1);
        return headersFW;
    }
}
