package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.test.ReaktorRule;
import org.reaktivity.reaktor.test.annotation.Configuration;
import org.reaktivity.reaktor.test.annotation.Configure;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/CacheMergedIT.class */
public class CacheMergedIT {
    private final K3poRule k3po = new K3poRule().addScriptRoot("app", "org/reaktivity/specification/nukleus/kafka/streams/application/merged");
    private final TestRule timeout = new DisableOnDebug(new Timeout(10, TimeUnit.SECONDS));
    private final ReaktorRule reaktor = new ReaktorRule().directory("target/nukleus-itests").commandBufferCapacity(1024).responseBufferCapacity(1024).counterValuesBufferCapacity(16384).configure(ReaktorConfiguration.REAKTOR_BUFFER_SLOT_CAPACITY, 8192).configure(KafkaConfiguration.KAFKA_CACHE_SERVER_BOOTSTRAP, false).configure(KafkaConfiguration.KAFKA_CACHE_SEGMENT_BYTES, 1048576).configure(KafkaConfiguration.KAFKA_CACHE_SEGMENT_INDEX_BYTES, 262144).configure(ReaktorConfiguration.REAKTOR_DRAIN_ON_CLOSE, false).configurationRoot("org/reaktivity/specification/nukleus/kafka/config").external("app#1").clean();

    @Rule
    public final TestRule chain = RuleChain.outerRule(this.reaktor).around(this.k3po).around(this.timeout);

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.header/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldFetchMergedMessagesWithHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.fetch.filter.header.with.compaction/client", "${app}/unmerged.fetch.filter.none.with.compaction/server"})
    public void shouldFetchMergedMessagesWithHeaderFilterAfterCompaction() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.not.key/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldFetchMergedMessagesWithNotKeyFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.not.header/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldFetchMergedMessagesWithNotHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.key.and.not.header/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldFetchMergedMessagesWithKeyAndNotHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.header.and.header/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldFetchMergedMessagesWithHeaderAndHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.header.or.header/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldFetchMergedMessagesWithHeaderOrHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.key/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldFetchMergedMessagesWithKeyFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.key.and.header/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldFetchMergedMessagesWithKeyAndHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.key.or.header/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldFetchMergedMessagesWithKeyOrHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.fetch.filter.none/client", "${app}/unmerged.fetch.filter.none/server"})
    public void shouldFetchMergedMessagesWithNoFilter() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("RECEIVED_MESSAGE_B2");
        this.k3po.notifyBarrier("SEND_MESSAGE_A3");
        this.k3po.notifyBarrier("SEND_MESSAGE_B3");
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.fetch.message.values/client", "${app}/unmerged.fetch.message.values/server"})
    public void shouldFetchMergedMessageValues() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CHANGING_PARTITION_COUNT");
        Thread.sleep(200L);
        this.k3po.notifyBarrier("CHANGED_PARTITION_COUNT");
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.fetch.partition.leader.changed/client", "${app}/unmerged.fetch.partition.leader.changed/server"})
    public void shouldFetchMergedPartitionLeaderChanged() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CHANGING_PARTITION_LEADER");
        Thread.sleep(200L);
        this.k3po.notifyBarrier("CHANGED_PARTITION_LEADER");
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.fetch.partition.leader.aborted/client", "${app}/unmerged.fetch.partition.leader.aborted/server"})
    public void shouldFetchMergedPartitionLeaderAborted() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.fetch.partition.offsets.earliest/client", "${app}/unmerged.fetch.partition.offsets.earliest/server"})
    public void shouldFetchMergedPartitionOffsetsEarliest() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.fetch.partition.offsets.earliest.overflow/client", "${app}/unmerged.fetch.partition.offsets.earliest/server"})
    public void shouldFetchMergedPartitionOffsetsEarliestOverflow() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.produce.message.values/client", "${app}/unmerged.produce.message.values/server"})
    public void shouldProduceMergedMessageValues() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.produce.message.values.dynamic/client", "${app}/unmerged.produce.message.values.dynamic/server"})
    public void shouldProduceMergedMessageValuesDynamic() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.produce.message.values.dynamic.hashed/client", "${app}/unmerged.produce.message.values.dynamic.hashed/server"})
    public void shouldProduceMergedMessageValuesDynamicHashed() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Configuration("cache.options.merged.json")
    @Specification({"${app}/merged.produce.message.flags.incomplete/client", "${app}/unmerged.produce.message.flags.incomplete/server"})
    public void shouldProduceMergedMessageFlagsIncomplete() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Configure(name = "nukleus.kafka.cache.server.reconnect", value = "0")
    @Test
    @Specification({"${app}/merged.fetch.server.sent.close/client", "${app}/unmerged.fetch.server.sent.close/server"})
    public void shouldCloseMergedOnUnmergedFetchClose() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Configure(name = "nukleus.kafka.cache.server.reconnect", value = "0")
    @Test
    @Specification({"${app}/merged.fetch.server.sent.close.with.message/client", "${app}/unmerged.fetch.server.sent.close.with.message/server"})
    public void shouldCloseMergedOnUnmergedFetchCloseWithMessage() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("RECEIVED_MESSAGE");
        this.k3po.notifyBarrier("CLOSE_UNMERGED_FETCH_REPLY");
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Configure(name = "nukleus.kafka.cache.server.reconnect", value = "0")
    @Test
    @Specification({"${app}/merged.fetch.server.sent.abort/client", "${app}/unmerged.fetch.server.sent.abort/server"})
    public void shouldCloseMergedOnUnmergedFetchAbort() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Configure(name = "nukleus.kafka.cache.server.reconnect", value = "0")
    @Test
    @Specification({"${app}/merged.fetch.server.sent.abort.with.message/client", "${app}/unmerged.fetch.server.sent.abort.with.message/server"})
    public void shouldCloseMergedOnUnmergedFetchAbortWithMessage() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("RECEIVED_MESSAGE");
        this.k3po.notifyBarrier("ABORT_UNMERGED_FETCH_REPLY");
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Configure(name = "nukleus.kafka.cache.server.reconnect", value = "0")
    @Test
    @Specification({"${app}/merged.fetch.server.sent.abort/client", "${app}/unmerged.fetch.server.sent.reset/server"})
    public void shouldCloseMergedOnUnmergedFetchReset() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Configure(name = "nukleus.kafka.cache.server.reconnect", value = "0")
    @Test
    @Specification({"${app}/merged.fetch.server.sent.abort.with.message/client", "${app}/unmerged.fetch.server.sent.reset.and.abort.with.message/server"})
    public void shouldCloseMergedOnUnmergedFetchResetWithMessage() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("RECEIVED_MESSAGE");
        this.k3po.notifyBarrier("RESET_UNMERGED_FETCH_INITIAL");
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.headers.one/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMessagesWithHeadersOneFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.headers.one.empty/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMessagesWithHeadersOneEmptyFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.headers.many/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMessagesWithHeadersManyFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.headers.many.empty/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMessagesWithHeadersManyEmptyFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.headers.skip.one/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMessagesWithHeadersSkipOneFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.headers.skip.two/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMessagesWithHeadersSkipTwoFilter() throws Exception {
        this.k3po.finish();
    }

    @Configuration("cache.options.merged.json")
    @Test
    @Specification({"${app}/merged.fetch.filter.headers.skip.many/client", "${app}/unmerged.fetch.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMessagesWithHeadersSkipManyFilter() throws Exception {
        this.k3po.finish();
    }
}
