package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.ScriptProperty;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.test.ReaktorRule;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/CacheMergedIT.class */
public class CacheMergedIT {
    private final K3poRule k3po = new K3poRule().addScriptRoot("route", "org/reaktivity/specification/nukleus/kafka/control/route.ext").addScriptRoot("server", "org/reaktivity/specification/nukleus/kafka/streams/merged").addScriptRoot("client", "org/reaktivity/specification/nukleus/kafka/streams/merged");
    private final TestRule timeout = new DisableOnDebug(new Timeout(10, TimeUnit.SECONDS));
    private final ReaktorRule reaktor;

    @Rule
    public final TestRule chain;

    public CacheMergedIT() {
        String str = "kafka";
        this.reaktor = new ReaktorRule().nukleus((v1) -> {
            return r2.equals(v1);
        }).directory("target/nukleus-itests").commandBufferCapacity(1024).responseBufferCapacity(1024).counterValuesBufferCapacity(16384).configure(ReaktorConfiguration.REAKTOR_BUFFER_SLOT_CAPACITY, 8192).configure(ReaktorConfiguration.REAKTOR_DRAIN_ON_CLOSE, false).configure(KafkaConfiguration.KAFKA_CACHE_SERVER_BOOTSTRAP, false).configure(KafkaConfiguration.KAFKA_CACHE_SERVER_RECONNECT_DELAY, 0).configure(KafkaConfiguration.KAFKA_CACHE_SEGMENT_BYTES, 1048576).configure(KafkaConfiguration.KAFKA_CACHE_SEGMENT_INDEX_BYTES, 262144).affinityMask("target#0", Long.MIN_VALUE).clean();
        this.chain = RuleChain.outerRule(this.reaktor).around(this.k3po).around(this.timeout);
    }

    @ScriptProperty({"serverAddress \"nukleus://streams/target#0\""})
    @Test
    @Specification({"${route}/cache.merged/controller", "${client}/merged.filter.header/client", "${server}/unmerged.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMergedMessagesWithHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @ScriptProperty({"serverAddress \"nukleus://streams/target#0\""})
    @Test
    @Specification({"${route}/cache.merged/controller", "${client}/merged.filter.header.and.header/client", "${server}/unmerged.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMergedMessagesWithHeaderAndHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @ScriptProperty({"serverAddress \"nukleus://streams/target#0\""})
    @Test
    @Specification({"${route}/cache.merged/controller", "${client}/merged.filter.header.or.header/client", "${server}/unmerged.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMergedMessagesWithHeaderOrHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @ScriptProperty({"serverAddress \"nukleus://streams/target#0\""})
    @Test
    @Specification({"${route}/cache.merged/controller", "${client}/merged.filter.key/client", "${server}/unmerged.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMergedMessagesWithKeyFilter() throws Exception {
        this.k3po.finish();
    }

    @ScriptProperty({"serverAddress \"nukleus://streams/target#0\""})
    @Test
    @Specification({"${route}/cache.merged/controller", "${client}/merged.filter.key.and.header/client", "${server}/unmerged.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMergedMessagesWithKeyAndHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @ScriptProperty({"serverAddress \"nukleus://streams/target#0\""})
    @Test
    @Specification({"${route}/cache.merged/controller", "${client}/merged.filter.key.or.header/client", "${server}/unmerged.filter.none/server"})
    @Ignore("requires k3po parallel reads")
    public void shouldReceiveMergedMessagesWithKeyOrHeaderFilter() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/cache.merged/controller", "${client}/merged.filter.none/client", "${server}/unmerged.filter.none/server"})
    @ScriptProperty({"serverAddress \"nukleus://streams/target#0\""})
    public void shouldReceiveMergedMessagesWithNoFilter() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/cache.merged/controller", "${client}/merged.message.values/client", "${server}/unmerged.message.values/server"})
    @ScriptProperty({"serverAddress \"nukleus://streams/target#0\""})
    public void shouldReceiveMergedMessageValues() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("CHANGING_PARTITION_COUNT");
        Thread.sleep(200L);
        this.k3po.notifyBarrier("CHANGED_PARTITION_COUNT");
        this.k3po.finish();
    }
}
