package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.ScriptProperty;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.nukleus.kafka.internal.KafkaConfigurationTest;
import org.reaktivity.reaktor.test.ReaktorRule;
import org.reaktivity.reaktor.test.annotation.Configure;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/MetadataIT.class */
public class MetadataIT {
    private final K3poRule k3po = new K3poRule().addScriptRoot("route", "org/reaktivity/specification/nukleus/kafka/control/route.ext").addScriptRoot("routeAnyTopic", "org/reaktivity/specification/nukleus/kafka/control/route").addScriptRoot("metadata", "org/reaktivity/specification/kafka/metadata.v5").addScriptRoot("server", "org/reaktivity/specification/kafka/fetch.v5").addScriptRoot("client", "org/reaktivity/specification/nukleus/kafka/streams/fetch");
    private final TestRule timeout = new DisableOnDebug(new Timeout(15, TimeUnit.SECONDS));
    private final ReaktorRule reaktor;

    @Rule
    public final TestRule chain;

    public MetadataIT() {
        String str = "kafka";
        this.reaktor = new ReaktorRule().nukleus((v1) -> {
            return r2.equals(v1);
        }).directory("target/nukleus-itests").commandBufferCapacity(1024).responseBufferCapacity(1024).counterValuesBufferCapacity(4096).configure(KafkaConfiguration.KAFKA_TOPIC_BOOTSTRAP_ENABLED, false).affinityMask("target#0", Long.MIN_VALUE).clean();
        this.chain = RuleChain.outerRule(this.reaktor).around(this.k3po).around(this.timeout);
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/one.topic.leader.not.available.and.retry/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldRetryWhenLeaderNotAvailable() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/one.topic.unknown.error.abort.receive.abort.and.retry/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortReceiveAbortThenReconnectAndRetryWhenMetadataQueryGivesUnknownError() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/one.topic.unknown.error.abort.receive.end.and.retry/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortReceiveEndThenReconnectAndRetryWhenMetadataQueryGivesUnknownError() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/configs.response.unknown.error.abort.receive.abort.and.retry/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortReceiveAbortThenReconnectAndRetryWhenConfigsResponseGivesUnknownError() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/metadata.connection.aborted.and.reconnect/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReconnectAndContinueMetadataQueriesWhenBrokerConnectionIsAborted() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${routeAnyTopic}/client/controller", "${client}/zero.offset/client", "${metadata}/metadata.connection.closed.and.reconnect/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReconnectAndContinueMetadataQueriesWhenBrokerIsEnded() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/metadata.connection.reset.and.reconnect/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldReconnectAndContinueMetadataQueriesWhenBrokerConnectionIsReset() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/one.topic.multiple.nodes/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldHandleMetadataResponseOneTopicOnMultipleNodes() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/one.topic.multiple.nodes.and.replicas/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldHandleMetadataResponseOneTopicMultipleNodesAndReplicas() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/one.topic.multiple.partitions/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldHandleMetadataResponseOneTopicMultiplePartitionsSingleNode() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/one.topic.single.partition/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldHandleMetadataResponseOneTopicSinglePartition() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/metadata.incomplete.response.aborts/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortWhenMetadataResponseIsIncomplete() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/metadata.incomplete.response.broker.aborts/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortWhenMetadataResponseBrokerIsIncomplete() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/metadata.incomplete.response.after.brokers.aborts/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortWhenMetadataResponseAfterBrokersIsIncomplete() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/metadata.incomplete.response.topic.aborts/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortWhenMetadataResponseTopicIsIncomplete() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/metadata.incomplete.response.topic.partition.aborts/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortWhenMetadataResponseTopicPartitionIsIncomplete() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/describe.configs.incomplete.response.aborts/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortWhenDescribeConfigsResponseIsIncomplete() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/describe.configs.incomplete.response.resource.aborts/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortWhenDescribeConfigsResponseResourceIsIncomplete() throws Exception {
        this.k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${metadata}/describe.configs.incomplete.response.resource.config.aborts/server"})
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    public void shouldAbortWhenDescribeConfigsResponseResourceConfigIsIncomplete() throws Exception {
        this.k3po.finish();
    }

    @Configure(name = KafkaConfigurationTest.KAFKA_READ_IDLE_TIMEOUT_NAME, value = "2000000")
    @ScriptProperty({"networkAccept \"nukleus://streams/target#0\""})
    @Test
    @Specification({"${routeAnyTopic}/client/controller", "${client}/zero.offset.multiple.topics/client", "${metadata}/unknown.and.known.topics/server"})
    public void shouldRepeatMetadataRequestsWhileTopicIsUnknownAndClientsAreAttachedWithoutBlockingOtherTopicUsage() throws Exception {
        this.k3po.start();
        this.k3po.awaitBarrier("SECOND_UNKNOWN_TOPIC_METADATA_REQUEST_RECEIVED");
        this.k3po.notifyBarrier("CONNECT_CLIENT_TWO");
        this.k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        this.k3po.notifyBarrier("WRITE_SECOND_UNKNOWN_TOPIC_METADATA_RESPONSE");
        this.k3po.awaitBarrier("THIRD_UNKNOWN_TOPIC_METADATA_REQUEST_RECEIVED");
        this.k3po.notifyBarrier("DISCONNECT_CLIENT_ONE");
        this.k3po.notifyBarrier("WRITE_THIRD_UNKNOWN_TOPIC_METADATA_RESPONSE");
        this.k3po.notifyBarrier("CONNECT_CLIENT_THREE");
        this.k3po.finish();
    }
}
