package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.ScriptProperty;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.nukleus.kafka.internal.test.KafkaCountersRule;
import org.reaktivity.reaktor.test.ReaktorRule;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/CachingFetchIT.class */
/**
 * Integration tests for Kafka fetch streams, run with the message cache configured
 * (capacity 1 MiB, proactive caching disabled, topic bootstrap disabled).
 *
 * <p>Each test pairs k3po scripts (named in {@code @Specification}) and drives them either
 * with a single {@code k3po.finish()} or with an explicit barrier choreography between
 * {@code k3po.start()} and {@code k3po.finish()}.
 */
public class CachingFetchIT {
    /** k3po rule with the script roots referenced as {@code ${...}} in the specifications below. */
    private final K3poRule k3po = new K3poRule().addScriptRoot("route", "org/reaktivity/specification/nukleus/kafka/control/route.ext").addScriptRoot("routeAnyTopic", "org/reaktivity/specification/nukleus/kafka/control/route").addScriptRoot("control", "org/reaktivity/specification/nukleus/kafka/control").addScriptRoot("server", "org/reaktivity/specification/kafka/fetch.v5").addScriptRoot("metadata", "org/reaktivity/specification/kafka/metadata.v5").addScriptRoot("client", "org/reaktivity/specification/nukleus/kafka/streams/fetch");

    /** Fails any test running longer than 10 seconds, unless the JVM is being debugged. */
    private final TestRule timeout = new DisableOnDebug(new Timeout(10, TimeUnit.SECONDS));

    private final ReaktorRule reaktor;

    /** Exposes the cache hit / miss counters asserted by some of the tests. */
    private final KafkaCountersRule counters;

    /** Rule chain applied to every test: reaktor (outermost), then k3po, counters and timeout. */
    @Rule
    public final TestRule chain;

    /**
     * Configures the reaktor for the "kafka" nukleus and assembles the rule chain.
     */
    public CachingFetchIT() {
        // Only the nukleus whose name equals "kafka" is accepted by the predicate.
        this.reaktor = new ReaktorRule().nukleus("kafka"::equals).directory("target/nukleus-itests").commandBufferCapacity(1024).responseBufferCapacity(1024).counterValuesBufferCapacity(8192).configure(KafkaConfiguration.KAFKA_TOPIC_BOOTSTRAP_ENABLED, false).configure(KafkaConfiguration.KAFKA_MESSAGE_CACHE_CAPACITY, 1048576L).configure(KafkaConfiguration.KAFKA_MESSAGE_CACHE_PROACTIVE, false).configure(KafkaConfiguration.KAFKA_READ_IDLE_TIMEOUT, 86400).affinityMask("target#0", Long.MIN_VALUE).clean();
        this.counters = new KafkaCountersRule(this.reaktor);
        this.chain = RuleChain.outerRule(this.reaktor).around(this.k3po).around(this.counters).around(this.timeout);
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/begin.ext.missing/client"})
    public void shouldRejectWhenBeginExtMissing() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${routeAnyTopic}/client/controller", "${client}/invalid.topic.name/client", "${metadata}/one.topic.error.invalid.topic/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldRejectWhenTopicNameContainsDisallowedCharacters() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/invalid.fetch.key.and.multiple.offsets/client"})
    public void shouldRejectInvalidBeginExWithFetchKeyAndMultipleOffsets() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/invalid.missing.fetch.key/client"})
    public void shouldRejectInvalidBeginExWithMissingFetchKey() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/invalid.more.than.one.fetch.key.hash/client"})
    public void shouldRejectInvalidBeginExWithMoreThanOneFetchKeyHash() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/topic.name.not.equals.route.ext/client"})
    @ScriptProperty({"routedTopicName \"not_test\""})
    public void shouldRejectTopicNameNotEqualToRoutedTopic() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.historical.empty.message/client", "${server}/compacted.empty.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedEmptyMessageFromCacheWhenSubscribedToKey() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.header.message.multiple.clients/client", "${server}/compacted.header.first.matches.repeated/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveHistoricalMessageMatchingHeaderFirstFromCache() throws Exception {
        this.k3po.finish();
        Assert.assertEquals(1L, this.counters.cacheHits());
        Assert.assertEquals(1L, this.counters.cacheMisses());
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.header.messages.and.tombstone/client", "${server}/compacted.header.matches.removed.in.subsequent.response/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageAndTombstoneWithHeaderRemovedInSubsequentResponse() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.header.messages.and.tombstone/client", "${server}/compacted.header.matches.then.updated/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageAndTombstoneWithHeaderUpdated() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.historical.uses.cached.key.after.unsubscribe/client", "${server}/compacted.historical.uses.cached.key.after.unsubscribe/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedMessageUsingCachedKeyAfterAllClientsUnsubscribe() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_ONE_UNSUBSCRIBED");
        this.k3po.awaitBarrier("SECOND_LIVE_FETCH_REQUEST_RECEIVED");
        this.k3po.notifyBarrier("CONNECT_CLIENT_TWO");
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        this.k3po.notifyBarrier("DELIVER_SECOND_LIVE_FETCH_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.historical.uses.cached.key.then.latest.offset/client", "${server}/compacted.historical.uses.cached.key.then.latest.offset.no.historical/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedMessagesWithUncachedKeyUsingLatestOffset() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_THREE_CONNECTED");
        awaitWindowFromClient();
        this.k3po.notifyBarrier("DELIVER_SECOND_LIVE_RESPONSE");
        this.k3po.finish();
        Assert.assertEquals(1L, this.counters.cacheHits());
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.historical.uses.cached.key.then.live/client", "${server}/compacted.historical.uses.cached.key.then.live.no.historical/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedMessageUsingCachedKeyOffsetThenCatchUpToLiveStream() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_ONE_RECEIVED_FIRST_MESSAGE");
        this.k3po.notifyBarrier("CONNECT_CLIENT_TWO");
        this.k3po.awaitBarrier("CLIENT_TWO_RECEIVED_FIRST_MESSAGE");
        this.k3po.notifyBarrier("DELIVER_FINAL_LIVE_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.historical.uses.cached.key.then.zero.offset/client", "${server}/compacted.historical.uses.zero.offset/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedMessagesWithCachedKeyUsingZeroOffset() throws Exception {
        this.k3po.finish();
        Assert.assertEquals(2L, this.counters.cacheHits());
        Assert.assertEquals(0L, this.counters.cacheMisses());
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.message/client", "${server}/compacted.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedMessage() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("ROUTED_CLIENT");
        this.k3po.notifyBarrier("CONNECT_CLIENT");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.message.fanout/client", "${server}/compacted.message.delayed.describe.response/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedMessageWithMessageKeyAndLastOffset() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("DESCRIBE_REQUEST_RECEIVED");
        this.k3po.notifyBarrier("CONNECT_CLIENT_TWO");
        this.k3po.notifyBarrier("DELIVER_DESCRIBE_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.messages.header/client", "${server}/compacted.messages.header/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedMessagesFilteredByHeaderOnBegin() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.messages.one.per.key/client", "${server}/compacted.messages.live/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedHistoricalMessagesFromCacheWhenOriginallyReceivedAsLiveMessages() throws Exception {
        this.k3po.finish();
        Assert.assertEquals(2L, this.counters.cacheHits());
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/compacted.messages.multiple.nodes/client", "${server}/compacted.messages.multiple.nodes/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveCompactedMessagesFromMultipleNodes() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${control}/route.ext.multiple.topics/client/controller", "${client}/compacted.message.multiple.topics/client", "${server}/compacted.message.multiple.topics/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    @Ignore("May fail due to unpredicable read ordering of window frames from clients and first fetch response")
    public void shouldReceiveMessagesFromMultipleTopics() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        this.k3po.awaitBarrier("CLIENT_THREE_CONNECTED");
        this.k3po.awaitBarrier("ALL_METADATA_RESPONSES_WRITTEN");
        this.k3po.notifyBarrier("WRITE_FIRST_FETCH_RESPONSE");
        this.k3po.notifyBarrier("WRITE_SECOND_FETCH_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/distinct.offset.messagesets.fanout/client", "${server}/distinct.offset.messagesets.fanout/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldFanoutMessageSetsAtDistinctOffsets() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_ONE");
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_TWO");
        this.k3po.notifyBarrier("SERVER_DELIVER_HISTORICAL_RESPONSE");
        this.k3po.awaitBarrier("CLIENT_ONE_RECEIVED_THIRD_MESSAGE");
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_THREE");
        this.k3po.finish();
    }

    // Same scripts as shouldFanoutMessageSetsAtDistinctOffsets, but live response three is
    // released before the historical response.
    @Test
    @Specification({"${route}/client/controller", "${client}/distinct.offset.messagesets.fanout/client", "${server}/distinct.offset.messagesets.fanout/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldFanoutMessageSetsAtDistinctOffsetsWithoutDeliveringLiveMessageOutOfOrder() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_ONE");
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_TWO");
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_THREE");
        this.k3po.awaitBarrier("CLIENT_ONE_RECEIVED_THIRD_MESSAGE");
        this.k3po.notifyBarrier("SERVER_DELIVER_HISTORICAL_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/distinct.offsets.message.fanout/client", "${server}/distinct.offsets.message.fanout/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldHandleParallelSubscribesAtDistinctOffsets() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        this.k3po.awaitBarrier("SERVER_LIVE_REQUEST_RECEIVED");
        this.k3po.notifyBarrier("CONNECT_CLIENT_TWO");
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        awaitWindowFromClient();
        this.k3po.notifyBarrier("SERVER_DELIVER_LIVE_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fanout.with.historical.message/client", "${server}/fanout.with.historical.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldFanoutUsingHistoricalConnection() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fanout.with.historical.messages/client", "${server}/fanout.with.historical.messages/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldFanoutDiscardingHistoricalMessageToJoinLiveStream() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fanout.with.slow.consumer/client", "${server}/fanout.with.slow.consumer/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldFanoutWithSlowConsumer() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_TWO_RECEIVED_SECOND_MESSAGE");
        awaitWindowFromClient();
        this.k3po.notifyBarrier("SERVER_DELIVER_LIVE_RESPONSE_TWO");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.and.hash.code.picks.partition.zero/client", "${server}/fetch.key.and.hash.code.picks.partition.zero/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageUsingFetchKeyAndExplicitHashCode() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.and.no.key.messages/client", "${server}/fetch.key.and.no.key.multiple.partitions/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageOnSubscribesWithAndWithoutKeyFromMultiplePartitions() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.and.no.key.unsubscribe/client", "${server}/fetch.key.and.no.key.multiple.partitions.unsubscribe/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldNotGiveAssertionErrorWhenUnsubscribeLeavingConsumerOnAnotherPartition() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_TWO_HAS_WRITTEN_RESET");
        this.k3po.notifyBarrier("CLIENT_TWO_UNSUBSCRIBED");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.default.partitioner.picks.partition.one/client", "${server}/fetch.key.default.partitioner.picks.partition.one/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageUsingFetchKeyAndDefaultPartitioner() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.historical.does.not.use.cached.key/client", "${server}/fetch.key.historical.does.not.use.cached.key/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageFromNonCompactedTopicWithoutCachingKeyOffset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.messages/client", "${server}/fetch.key.multiple.matches.flow.controlled/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\"", "applicationConnectWindow 15"})
    public void shouldReceiveMultipleMessagesMatchingFetchKeyFlowControlled() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.message/client", "${server}/fetch.key.multiple.record.batches.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageMatchingFetchKeyWithLastOffsetWithMultipleRecordBatches() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.no.messages/client", "${server}/fetch.key.multiple.record.batches.no.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveNoMessagesMatchingFetchKeyWithMultipleRecordBatches() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.no.messages/client", "${server}/fetch.key.no.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveNoMessagesMatchingFetchKey() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.three.messages/client", "${server}/fetch.key.three.matches.flow.controlled/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\"", "applicationConnectWindow 15"})
    public void shouldReceiveMessagesThreeMatchingFetchKeyFlowControlled() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.nonzero.offset.message/client", "${server}/fetch.key.nonzero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageMatchingFetchKeyFirstNonZeroOffset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.nonzero.offset.too.low.message/client", "${server}/fetch.key.nonzero.offset.too.low.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldUseEarliestAvailableOffsetIfGreaterThanRequestedOffset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.message/client", "${server}/fetch.key.zero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageMatchingFetchKeyFirst() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.unspecified.offset.message/client", "${server}/fetch.key.high.water.mark.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveLiveMessageMatchingFetchKey() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.message/client", "${server}/fetch.key.zero.offset.last.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageMatchingFetchKeyLast() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.messages/client", "${server}/fetch.key.zero.offset.multiple.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMultipleMessagesMatchingFetchKey() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.messages.historical/client", "${server}/fetch.key.zero.offset.multiple.matches.historical/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMultipleHistoricalMessagesMatchingFetchKey() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/header.and.fetch.key.zero.offset.message/client", "${server}/header.and.fetch.key.zero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageMatchingFetchKeyAndHeader() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/header.zero.offset.message/client", "${server}/header.zero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageMatchingHeaderFirst() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/header.zero.offset.message/client", "${server}/header.zero.offset.last.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageMatchingHeaderLast() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/header.zero.offset.messages/client", "${server}/header.zero.offset.multiple.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMultipleMessagesMatchingHeader() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/headers.and.fetch.key.zero.offset.message/client", "${server}/headers.and.fetch.key.zero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageMatchingFetchKeyAndHeaders() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/headers.zero.offset.messages.historical/client", "${server}/headers.zero.offset.multiple.matches.historical/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveHistoricalMessagesMatchingHeaders() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/nonzero.offset/client", "${server}/nonzero.offset/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldRequestMessagesAtNonZeroOffset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/nonzero.offset.message/client", "${server}/nonzero.offset.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageAtNonZeroOffset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/nonzero.offset.messages/client", "${server}/nonzero.offset.messages/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessagesAtNonZeroOffset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/offset.too.low.message/client", "${server}/offset.too.low.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldRefetchUsingReportedFirstOffset() throws Exception {
        this.k3po.start();
        this.k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${routeAnyTopic}/client/controller", "${client}/offset.too.low.multiple.nodes/client", "${server}/offset.too.low.multiple.nodes/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldRefetchUsingReportedFirstOffsetOnMultipleNodes() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${routeAnyTopic}/client/controller", "${client}/offset.too.low.multiple.topics/client", "${server}/offset.too.low.multiple.topics/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldRefetchUsingReportedFirstOffsetOnMultipleTopics() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        this.k3po.notifyBarrier("WRITE_FIRST_METADATA_RESPONSE");
        this.k3po.awaitBarrier("FIRST_FETCH_REQUEST_RECEIVED");
        this.k3po.notifyBarrier("WRITE_SECOND_DESCRIBE_CONFIGS_RESPONSE");
        this.k3po.awaitBarrier("CLIENT_TWO_SUBSCRIBED");
        awaitWindowFromClient();
        this.k3po.notifyBarrier("WRITE_FIRST_FETCH_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/record.batch.ends.with.deleted.record/client", "${server}/record.batch.ends.with.deleted.record/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldUseLastOffsetFromRecordBatch() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/record.batch.truncated/client", "${server}/record.batch.ends.with.truncated.record/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageWithTruncatedRecord() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/record.batch.truncated/client", "${server}/record.batch.truncated/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageRecordSetEndsWithTruncatedRecordBatch() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/record.batch.truncated/client", "${server}/record.batch.truncated.at.record.boundary/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageRecordBatchTruncatedOnRecordBoundary() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${server}/zero.offset/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldRequestMessagesAtZeroOffset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${server}/zero.offset.no.messages/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldNotSkipZeroLenthRecordBatchIfAtHighWatermark() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.message/client", "${server}/zero.offset.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageAtZeroOffset() throws Exception {
        this.k3po.start();
        this.k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.and.reset/client", "${server}/zero.offset.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldHandleFetchResponseAfterUnsubscribe() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("FETCH_REQUEST_RECEIVED");
        this.k3po.notifyBarrier("DO_CLIENT_RESET");
        this.k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.and.reset/client", "${server}/zero.offset.messages.multiple.partitions/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldHandleFetchResponseMultiplePartitionsAfterUnsubscribe() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("FETCH_REQUEST_RECEIVED");
        this.k3po.notifyBarrier("DO_CLIENT_RESET");
        this.k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.message/client", "${server}/zero.offset.message.single.partition.multiple.nodes/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldNotFetchFromNodeWithNoPartitions() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages/client", "${server}/zero.offset.messages/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessagesAtZeroOffset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messagesets/client", "${server}/zero.offset.messagesets/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessageSetsAtZeroOffset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.fanout/client", "${server}/zero.offset.messages.fanout/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldFanoutMessagesAtZeroOffset() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.group.budget/client", "${server}/zero.offset.messages.group.budget/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\"", "applicationConnectWindow 24"})
    public void shouldFanoutMessagesAtZeroOffsetUsingGroupBudget() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        awaitWindowFromClient();
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_ONE");
        awaitWindowFromClient();
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_TWO");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.group.budget.reset/client", "${server}/zero.offset.messages.group.budget/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\"", "applicationConnectWindow 24"})
    public void shouldFanoutMessagesAtZeroOffsetUsingGroupBudgetReset() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        awaitWindowFromClient();
        awaitWindowFromClient();
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_ONE");
        awaitWindowFromClient();
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_TWO");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.multiple.partitions/client", "${server}/zero.offset.messages.multiple.nodes/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessagesFromMultipleNodes() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.multiple.partitions/client", "${server}/zero.offset.messages.multiple.partitions/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReceiveMessagesFromMultiplePartitions() throws Exception {
        this.k3po.start();
        this.k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.multiple.partitions.partition.1/client", "${server}/zero.offset.messages.multiple.partitions.partition.1/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldCleanUpStateWhenUnsubscribeAfterReceiveMessageFromSecondPartition() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messagesets.fanout/client", "${server}/zero.offset.messagesets.fanout/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldFanoutMessageSetsAtZeroOffset() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_ONE");
        this.k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_TWO");
        this.k3po.finish();
    }

    /**
     * Pauses the test thread for 200 ms before the next barrier is released.
     *
     * <p>NOTE(review): judging by the name, the pause presumably gives the reaktor time to
     * process a window frame from the client; nothing here observes that frame - confirm.
     *
     * <p>If the thread is interrupted while sleeping, the method returns early with the
     * interrupt status restored so that the test runner can still see the interruption.
     */
    private void awaitWindowFromClient() {
        try {
            Thread.sleep(200L);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: re-assert it for whoever manages this thread.
            Thread.currentThread().interrupt();
        }
    }
}
