package org.reaktivity.nukleus.kafka.internal.control;

import java.util.LinkedHashMap;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.ScriptProperty;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;
import org.reaktivity.nukleus.kafka.internal.KafkaController;
import org.reaktivity.reaktor.test.ReaktorRule;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/control/ControllerIT.class */
/**
 * Integration tests that issue route, unroute and freeze commands through
 * {@link KafkaController} while the k3po scripts named in each test's
 * {@code @Specification} annotation are running.
 *
 * <p>Every test follows the same shape: start k3po, send one or more commands
 * via the controller and block on the returned futures, then wait for k3po to
 * finish. Unroute tests additionally notify the {@code ROUTED_CLIENT} barrier
 * once all routes have been acknowledged.
 */
public class ControllerIT {
    private final K3poRule k3po = new K3poRule()
        .addScriptRoot("control", "org/reaktivity/specification/nukleus/kafka/control")
        .addScriptRoot("route", "org/reaktivity/specification/nukleus/kafka/control/route")
        .addScriptRoot("unroute", "org/reaktivity/specification/nukleus/kafka/control/unroute")
        .addScriptRoot("routeEx", "org/reaktivity/specification/nukleus/kafka/control/route.ext")
        .addScriptRoot("unrouteEx", "org/reaktivity/specification/nukleus/kafka/control/unroute.ext")
        .addScriptRoot("freeze", "org/reaktivity/specification/nukleus/control/freeze");

    // The timeout is wrapped in DisableOnDebug so it does not fire while stepping in a debugger.
    private final TestRule timeout = new DisableOnDebug(new Timeout(5, TimeUnit.SECONDS));

    private final ReaktorRule reaktor;

    @Rule
    public final TestRule chain;

    /**
     * Builds the reaktor rule and chains the rules so that k3po is outermost,
     * followed by the timeout, followed by the reaktor.
     */
    public ControllerIT() {
        // The matcher is the bound method reference "kafka"::equals; presumably it
        // selects the controller by name -- confirm against ReaktorRule.
        this.reaktor = new ReaktorRule()
            .directory("target/nukleus-itests")
            .commandBufferCapacity(1024)
            .responseBufferCapacity(1024)
            .counterValuesBufferCapacity(1024)
            .controller("kafka"::equals);
        this.chain = RuleChain.outerRule(this.k3po).around(this.timeout).around(this.reaktor);
    }

    /**
     * Looks up the Kafka controller from the reaktor rule. Called once per
     * command so each command performs its own lookup.
     *
     * @return the controller returned by the reaktor rule for {@link KafkaController}
     */
    private KafkaController controller() {
        return this.reaktor.controller(KafkaController.class);
    }

    /** Routes a client with a null topic and waits for the command to complete. */
    @Test
    @Specification({"${route}/client/nukleus"})
    public void shouldRouteClient() throws Exception {
        long targetRef = new Random().nextLong();
        this.k3po.start();
        controller().routeClient("source", 0L, "target", targetRef, (String) null).get();
        this.k3po.finish();
    }

    /** Routes a client with a null topic, then unroutes it using the returned source reference. */
    @Test
    @Specification({"${route}/client/nukleus", "${unroute}/client/nukleus"})
    public void shouldUnrouteClient() throws Exception {
        long targetRef = new Random().nextLong();
        this.k3po.start();
        long sourceRef = controller().routeClient("source", 0L, "target", targetRef, (String) null).get();
        this.k3po.notifyBarrier("ROUTED_CLIENT");
        controller().unrouteClient("source", sourceRef, "target", targetRef, (String) null).get();
        this.k3po.finish();
    }

    /** Routes a client for topic {@code "test"}. */
    @Test
    @Specification({"${routeEx}/client/nukleus"})
    public void shouldRouteClientWithExtension() throws Exception {
        long targetRef = new Random().nextLong();
        this.k3po.start();
        controller().routeClient("source", 0L, "target", targetRef, "test").get();
        this.k3po.finish();
    }

    /** Routes a client for topic {@code "test"} with a single header condition. */
    @Test
    @Specification({"${control}/route.ext.header/client/nukleus"})
    public void shouldRouteClientWithTopicAndHeaderCondition() throws Exception {
        long targetRef = new Random().nextLong();
        LinkedHashMap<String, String> headers = new LinkedHashMap<>();
        headers.put("header1", "match1");
        this.k3po.start();
        controller().routeClient("source", 0L, "target", targetRef, "test", headers).get();
        this.k3po.finish();
    }

    /** Routes a client for topic {@code "test"} with two header conditions. */
    @Test
    @Specification({"${control}/route.ext.headers/client/nukleus"})
    public void shouldRouteClientWithTopicAndHeaderConditions() throws Exception {
        long targetRef = new Random().nextLong();
        LinkedHashMap<String, String> headers = new LinkedHashMap<>();
        headers.put("header1", "match1");
        headers.put("header2", "match2");
        this.k3po.start();
        controller().routeClient("source", 0L, "target", targetRef, "test", headers).get();
        this.k3po.finish();
    }

    /**
     * Adds two routes that differ only in the header value and asserts that
     * the second route reports the same source reference as the first.
     */
    @Test
    @Specification({"${control}/route.ext.multiple.headers/client/nukleus"})
    public void shouldRouteClientWithMultipleRoutesDifferingOnlyInHeaders() throws Exception {
        long targetRef = new Random().nextLong();
        LinkedHashMap<String, String> firstHeaders = new LinkedHashMap<>();
        LinkedHashMap<String, String> secondHeaders = new LinkedHashMap<>();
        firstHeaders.put("header1", "match1");
        secondHeaders.put("header1", "match2");
        this.k3po.start();
        Long sourceRef = controller().routeClient("source", 0L, "target", targetRef, "test", firstHeaders).get();
        // The second route is requested on the source reference handed back by the first.
        Long secondSourceRef =
            controller().routeClient("source", sourceRef, "target", targetRef, "test", secondHeaders).get();
        Assert.assertEquals(sourceRef, secondSourceRef);
        this.k3po.finish();
    }

    /**
     * Adds routes to two different targets, both requested with source
     * reference 0, and asserts that the returned source references differ.
     */
    @Test
    @Specification({"${control}/route.ext.multiple.networks/client/nukleus"})
    public void shouldRouteClientWithMultipleRoutesDifferentNetworks() throws Exception {
        long firstTargetRef = new Random().nextLong();
        long secondTargetRef = new Random().nextLong();
        this.k3po.start();
        long firstSourceRef = controller().routeClient("source", 0L, "target1", firstTargetRef, "test").get();
        long secondSourceRef = controller().routeClient("source", 0L, "target2", secondTargetRef, "test").get();
        Assert.assertNotEquals(firstSourceRef, secondSourceRef);
        this.k3po.finish();
    }

    /**
     * Adds three routes that differ only in topic and asserts that the later
     * two report the same source reference as the first.
     */
    @Test
    @Specification({"${control}/route.ext.multiple.topics/client/nukleus"})
    public void shouldRouteClientWithMultipleRoutesDifferentTopics() throws Exception {
        long targetRef = new Random().nextLong();
        this.k3po.start();
        Long sourceRef = controller().routeClient("source", 0L, "target", targetRef, "test1").get();
        // Both follow-up commands are issued before either future is awaited.
        CompletableFuture<Long> secondRoute =
            controller().routeClient("source", sourceRef, "target", targetRef, "test2");
        CompletableFuture<Long> thirdRoute =
            controller().routeClient("source", sourceRef, "target", targetRef, "test3");
        Assert.assertEquals(sourceRef, secondRoute.get());
        Assert.assertEquals(sourceRef, thirdRoute.get());
        this.k3po.finish();
    }

    /** Routes a client for topic {@code "test"}, then unroutes it. */
    @Test
    @Specification({"${routeEx}/client/nukleus", "${unrouteEx}/client/nukleus"})
    public void shouldUnrouteClientWithExtension() throws Exception {
        long targetRef = new Random().nextLong();
        this.k3po.start();
        long sourceRef = controller().routeClient("source", 0L, "target", targetRef, "test").get();
        this.k3po.notifyBarrier("ROUTED_CLIENT");
        controller().unrouteClient("source", sourceRef, "target", targetRef, "test").get();
        this.k3po.finish();
    }

    /** Routes a client with a single header condition, then unroutes it with the same headers. */
    @Test
    @Specification({"${control}/route.ext.header/client/nukleus", "${control}/unroute.ext.header/client/nukleus"})
    public void shouldUnrouteClientWithHeader() throws Exception {
        long targetRef = new Random().nextLong();
        LinkedHashMap<String, String> headers = new LinkedHashMap<>();
        headers.put("header1", "match1");
        this.k3po.start();
        long sourceRef = controller().routeClient("source", 0L, "target", targetRef, "test", headers).get();
        this.k3po.notifyBarrier("ROUTED_CLIENT");
        controller().unrouteClient("source", sourceRef, "target", targetRef, "test", headers).get();
        this.k3po.finish();
    }

    /** Routes a client with two header conditions, then unroutes it with the same headers. */
    @Test
    @Specification({"${control}/route.ext.headers/client/nukleus", "${control}/unroute.ext.headers/client/nukleus"})
    public void shouldUnrouteClientWithHeaders() throws Exception {
        long targetRef = new Random().nextLong();
        LinkedHashMap<String, String> headers = new LinkedHashMap<>();
        headers.put("header1", "match1");
        headers.put("header2", "match2");
        this.k3po.start();
        long sourceRef = controller().routeClient("source", 0L, "target", targetRef, "test", headers).get();
        this.k3po.notifyBarrier("ROUTED_CLIENT");
        controller().unrouteClient("source", sourceRef, "target", targetRef, "test", headers).get();
        this.k3po.finish();
    }

    /**
     * Adds two routes that differ only in the header value, then removes both
     * in the order they were added.
     */
    @Test
    @Specification({"${control}/route.ext.multiple.headers/client/nukleus", "${control}/unroute.ext.multiple.headers/client/nukleus"})
    public void shouldUnrouteClientWithMultipleRoutesDifferingOnlyInHeaders() throws Exception {
        long targetRef = new Random().nextLong();
        LinkedHashMap<String, String> firstHeaders = new LinkedHashMap<>();
        LinkedHashMap<String, String> secondHeaders = new LinkedHashMap<>();
        firstHeaders.put("header1", "match1");
        secondHeaders.put("header1", "match2");
        this.k3po.start();
        Long sourceRef = controller().routeClient("source", 0L, "target", targetRef, "test", firstHeaders).get();
        controller().routeClient("source", sourceRef, "target", targetRef, "test", secondHeaders).get();
        this.k3po.notifyBarrier("ROUTED_CLIENT");
        controller().unrouteClient("source", sourceRef, "target", targetRef, "test", firstHeaders).get();
        controller().unrouteClient("source", sourceRef, "target", targetRef, "test", secondHeaders).get();
        this.k3po.finish();
    }

    /**
     * Adds routes to two different targets, then removes each using the
     * source reference its own route command returned.
     */
    @Test
    @Specification({"${control}/route.ext.multiple.networks/client/nukleus", "${control}/unroute.ext.multiple.networks/client/nukleus"})
    public void shouldUnrouteClientWithMultipleRoutesDifferentNetworks() throws Exception {
        long firstTargetRef = new Random().nextLong();
        long secondTargetRef = new Random().nextLong();
        this.k3po.start();
        long firstSourceRef = controller().routeClient("source", 0L, "target1", firstTargetRef, "test").get();
        long secondSourceRef = controller().routeClient("source", 0L, "target2", secondTargetRef, "test").get();
        this.k3po.notifyBarrier("ROUTED_CLIENT");
        controller().unrouteClient("source", firstSourceRef, "target1", firstTargetRef, "test").get();
        controller().unrouteClient("source", secondSourceRef, "target2", secondTargetRef, "test").get();
        this.k3po.finish();
    }

    /**
     * Adds three routes that differ only in topic, checks they share one
     * source reference, then removes them in the order they were added.
     */
    @Test
    @Specification({"${control}/route.ext.multiple.topics/client/nukleus", "${control}/unroute.ext.multiple.topics/client/nukleus"})
    public void shouldUnrouteClientWithMultipleRoutesDifferentTopics() throws Exception {
        long targetRef = new Random().nextLong();
        this.k3po.start();
        Long sourceRef = controller().routeClient("source", 0L, "target", targetRef, "test1").get();
        // Both follow-up commands are issued before either future is awaited.
        CompletableFuture<Long> secondRoute =
            controller().routeClient("source", sourceRef, "target", targetRef, "test2");
        CompletableFuture<Long> thirdRoute =
            controller().routeClient("source", sourceRef, "target", targetRef, "test3");
        Assert.assertEquals(sourceRef, secondRoute.get());
        Assert.assertEquals(sourceRef, thirdRoute.get());
        this.k3po.notifyBarrier("ROUTED_CLIENT");
        controller().unrouteClient("source", sourceRef, "target", targetRef, "test1").get();
        controller().unrouteClient("source", sourceRef, "target", targetRef, "test2").get();
        controller().unrouteClient("source", sourceRef, "target", targetRef, "test3").get();
        this.k3po.finish();
    }

    /** Sends a freeze command and waits for it to complete. */
    @Test
    @Specification({"${freeze}/nukleus"})
    @ScriptProperty({"nameF00N \"kafka\""})
    public void shouldFreeze() throws Exception {
        this.k3po.start();
        controller().freeze().get();
        this.k3po.finish();
    }
}
