/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.wow.tck.messaging;

import java.time.Duration;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.SetsKt;
import kotlin.jdk7.AutoCloseableKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import me.ahoo.wow.api.messaging.Message;
import me.ahoo.wow.api.messaging.TopicKindCapable;
import me.ahoo.wow.api.modeling.NamedAggregate;
import me.ahoo.wow.id.GlobalIdGenerator;
import me.ahoo.wow.infra.Decorator;
import me.ahoo.wow.messaging.MessageBus;
import me.ahoo.wow.messaging.ReceiverGroupKt;
import me.ahoo.wow.messaging.handler.MessageExchange;
import me.ahoo.wow.metrics.Metrics;
import me.ahoo.wow.tck.messaging.MessageBusSpec;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.kotlin.test.StepVerifierExtensionsKt;

@Metadata(mv={1, 8, 0}, k=1, xi=48, d1={"\u0000Z\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\b&\u0018\u0000 %*\u0010\b\u0000\u0010\u0001*\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0002*\u0012\b\u0001\u0010\u0003*\f\u0012\u0002\b\u0003\u0012\u0004\u0012\u0002H\u00010\u0004*\u0014\b\u0002\u0010\u0005*\u000e\u0012\u0004\u0012\u0002H\u0001\u0012\u0004\u0012\u0002H\u00030\u00062\u00020\u0007:\u0001%B\u0005\u00a2\u0006\u0002\u0010\bJ\r\u0010\r\u001a\u00028\u0000H$\u00a2\u0006\u0002\u0010\u000eJ\r\u0010\u000f\u001a\u00028\u0002H$\u00a2\u0006\u0002\u0010\u0010J\b\u0010\u0011\u001a\u00020\u0012H\u0007J\b\u0010\u0013\u001a\u00020\u0012H\u0007J\b\u0010\u0014\u001a\u00020\u0012H\u0007J%\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\u00170\u00162\u0006\u0010\u0018\u001a\u00028\u00022\b\b\u0002\u0010\u0019\u001a\u00020\u001aH\u0002\u00a2\u0006\u0002\u0010\u001bJ\b\u0010\u001c\u001a\u00020\u0012H\u0007J!\u0010\u001d\u001a\u00020\u00122\u0017\u0010\u001e\u001a\u0013\u0012\u0004\u0012\u00028\u0002\u0012\u0004\u0012\u00020\u00120\u001f\u00a2\u0006\u0002\b H\u0016J&\u0010!\u001a\b\u0012\u0004\u0012\u00028\u00010\"*\b\u0012\u0004\u0012\u00028\u00010\"2\f\u0010#\u001a\b\u0012\u0004\u0012\u00020\u00170$H\u0014R\u0012\u0010\t\u001a\u00020\nX\u00a6\u0004\u00a2\u0006\u0006\u001a\u0004\b\u000b\u0010\f\u00a8\u0006&"}, d2={"Lme/ahoo/wow/tck/messaging/MessageBusSpec;", "M", "Lme/ahoo/wow/api/messaging/Message;", "E", "Lme/ahoo/wow/messaging/handler/MessageExchange;", "BUS", "Lme/ahoo/wow/messaging/MessageBus;", "Lme/ahoo/wow/api/messaging/TopicKindCapable;", "()V", "namedAggregate", "Lme/ahoo/wow/api/modeling/NamedAggregate;", "getNamedAggregate", "()Lme/ahoo/wow/api/modeling/NamedAggregate;", "createMessage", "()Lme/ahoo/wow/api/messaging/Message;", "createMessageBus", "()Lme/ahoo/wow/messaging/MessageBus;", "receive", "", "receivePerformance", "send", "sendLoop", "Lreactor/core/publisher/Mono;", "Ljava/lang/Void;", "messageBus", "maxCount", "", "(Lme/ahoo/wow/messaging/MessageBus;I)Lreactor/core/publisher/Mono;", "sendPerformance", "verify", "block", "Lkotlin/Function1;", "Lkotlin/ExtensionFunctionType;", "onReceive", "Lreactor/core/publisher/Flux;", "onReady", "Lreactor/core/publisher/Sinks$Empty;", "Companion", "wow-tck"})
public abstract class MessageBusSpec<M extends Message<?, ?>, E extends MessageExchange<?, ? extends M>, BUS extends MessageBus<M, E>>
implements TopicKindCapable {
    @NotNull
    public static final Companion Companion = new Companion(null);
    private static final Logger log = LoggerFactory.getLogger(MessageBusSpec.class);

    @NotNull
    public abstract NamedAggregate getNamedAggregate();

    @NotNull
    protected abstract BUS createMessageBus();

    @NotNull
    protected abstract M createMessage();

    @NotNull
    protected Flux<E> onReceive(@NotNull Flux<E> $this$onReceive, @NotNull Sinks.Empty<Void> onReady) {
        Intrinsics.checkNotNullParameter($this$onReceive, (String)"<this>");
        Intrinsics.checkNotNullParameter(onReady, (String)"onReady");
        onReady.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
        return $this$onReceive;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void verify(@NotNull Function1<? super BUS, Unit> block) {
        Intrinsics.checkNotNullParameter(block, (String)"block");
        AutoCloseable autoCloseable = (AutoCloseable)Metrics.INSTANCE.metrizable(this.createMessageBus());
        Throwable throwable = null;
        try {
            MessageBus bus = (MessageBus)autoCloseable;
            boolean bl = false;
            if (Decorator.Companion.getDelegate((Object)bus) instanceof TopicKindCapable) {
                Object object = Decorator.Companion.getDelegate((Object)bus);
                Intrinsics.checkNotNull((Object)object, (String)"null cannot be cast to non-null type me.ahoo.wow.api.messaging.TopicKindCapable");
                MatcherAssert.assertThat((Object)((TopicKindCapable)object).getTopicKind(), (Matcher)Matchers.equalTo((Object)this.getTopicKind()));
            }
            block.invoke((Object)bus);
            Unit unit = Unit.INSTANCE;
        }
        catch (Throwable throwable2) {
            throwable = throwable2;
            throw throwable2;
        }
        finally {
            AutoCloseableKt.closeFinally((AutoCloseable)autoCloseable, (Throwable)throwable);
        }
    }

    @Test
    public final void send() {
        this.verify((Function1)new Function1<BUS, Unit>(this){
            final /* synthetic */ MessageBusSpec<M, E, BUS> this$0;
            {
                this.this$0 = $receiver;
                super(1);
            }

            public final void invoke(@NotNull BUS $this$verify) {
                Intrinsics.checkNotNullParameter($this$verify, (String)"$this$verify");
                Sinks.Empty empty = Sinks.empty();
                Intrinsics.checkNotNullExpressionValue((Object)empty, (String)"empty<Void>()");
                Sinks.Empty onReady = empty;
                M message = this.this$0.createMessage();
                Flux flux = $this$verify.receive(SetsKt.setOf((Object)this.this$0.getNamedAggregate()));
                String string = GlobalIdGenerator.INSTANCE.generateAsString();
                Intrinsics.checkNotNullExpressionValue((Object)string, (String)"GlobalIdGenerator.generateAsString()");
                Flux flux2 = this.this$0.onReceive(ReceiverGroupKt.writeReceiverGroup((Flux)flux, (String)string), (Sinks.Empty<Void>)onReady).doOnSubscribe(arg_0 -> send.1.invoke$lambda$0((Function1)new Function1<Subscription, Unit>((Sinks.Empty<Void>)onReady, $this$verify, message){
                    final /* synthetic */ Sinks.Empty<Void> $onReady;
                    final /* synthetic */ BUS $this_verify;
                    final /* synthetic */ M $message;
                    {
                        this.$onReady = $onReady;
                        this.$this_verify = $this_verify;
                        this.$message = $message;
                        super(1);
                    }

                    public final void invoke(Subscription it) {
                        this.$onReady.asMono().then(this.$this_verify.send(this.$message)).delaySubscription(Duration.ofMillis(1000L)).subscribe();
                    }
                }, arg_0));
                Intrinsics.checkNotNullExpressionValue((Object)flux2, (String)"{\n            val onRead\u2026      .verify()\n        }");
                StepVerifierExtensionsKt.test((Flux)flux2).consumeNextWith(arg_0 -> send.1.invoke$lambda$1((Function1)new Function1<E, Unit>(message){
                    final /* synthetic */ M $message;
                    {
                        this.$message = $message;
                        super(1);
                    }

                    public final void invoke(E it) {
                        MatcherAssert.assertThat((Object)it.getMessage().getId(), (Matcher)Matchers.equalTo((Object)this.$message.getId()));
                    }
                }, arg_0)).thenCancel().verify();
            }

            private static final void invoke$lambda$0(Function1 $tmp0, Object p0) {
                Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
                $tmp0.invoke(p0);
            }

            private static final void invoke$lambda$1(Function1 $tmp0, Object p0) {
                Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
                $tmp0.invoke(p0);
            }
        });
    }

    @Test
    public final void receive() {
        this.verify((Function1)new Function1<BUS, Unit>(this){
            final /* synthetic */ MessageBusSpec<M, E, BUS> this$0;
            {
                this.this$0 = $receiver;
                super(1);
            }

            public final void invoke(@NotNull BUS $this$verify) {
                Intrinsics.checkNotNullParameter($this$verify, (String)"$this$verify");
                Sinks.Empty empty = Sinks.empty();
                Intrinsics.checkNotNullExpressionValue((Object)empty, (String)"empty<Void>()");
                Sinks.Empty onReady = empty;
                Flux flux = $this$verify.receive(SetsKt.setOf((Object)this.this$0.getNamedAggregate()));
                String string = GlobalIdGenerator.INSTANCE.generateAsString();
                Intrinsics.checkNotNullExpressionValue((Object)string, (String)"GlobalIdGenerator.generateAsString()");
                Flux flux2 = this.this$0.onReceive(ReceiverGroupKt.writeReceiverGroup((Flux)flux, (String)string), (Sinks.Empty<Void>)onReady).doOnSubscribe(arg_0 -> receive.1.invoke$lambda$0((Function1)new Function1<Subscription, Unit>((Sinks.Empty<Void>)onReady, this.this$0, $this$verify){
                    final /* synthetic */ Sinks.Empty<Void> $onReady;
                    final /* synthetic */ MessageBusSpec<M, E, BUS> this$0;
                    final /* synthetic */ BUS $this_verify;
                    {
                        this.$onReady = $onReady;
                        this.this$0 = $receiver;
                        this.$this_verify = $this_verify;
                        super(1);
                    }

                    public final void invoke(Subscription it) {
                        Flux flux = Flux.range((int)0, (int)10).flatMap(arg_0 -> receive.1.invoke$lambda$0((Function1)new Function1<Integer, Publisher<? extends Void>>(this.this$0, this.$this_verify){
                            final /* synthetic */ MessageBusSpec<M, E, BUS> this$0;
                            final /* synthetic */ BUS $this_verify;
                            {
                                this.this$0 = $receiver;
                                this.$this_verify = $this_verify;
                                super(1);
                            }

                            public final Publisher<? extends Void> invoke(Integer it) {
                                M message = this.this$0.createMessage();
                                return (Publisher)this.$this_verify.send(message);
                            }
                        }, arg_0));
                        Intrinsics.checkNotNullExpressionValue((Object)flux, (String)"@Test\n    fun receive() \u2026.verify()\n        }\n    }");
                        Flux sendFlux2 = flux;
                        this.$onReady.asMono().thenMany((Publisher)sendFlux2).delaySubscription(Duration.ofMillis(1000L)).subscribe();
                    }

                    private static final Publisher invoke$lambda$0(Function1 $tmp0, Object p0) {
                        Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
                        return (Publisher)$tmp0.invoke(p0);
                    }
                }, arg_0));
                Intrinsics.checkNotNullExpressionValue((Object)flux2, (String)"@Test\n    fun receive() \u2026.verify()\n        }\n    }");
                StepVerifierExtensionsKt.test((Flux)flux2).expectNextCount(10L).thenCancel().verify();
            }

            private static final void invoke$lambda$0(Function1 $tmp0, Object p0) {
                Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
                $tmp0.invoke(p0);
            }
        });
    }

    @Test
    public final void sendPerformance() {
        this.verify((Function1)new Function1<BUS, Unit>(this){
            final /* synthetic */ MessageBusSpec<M, E, BUS> this$0;
            {
                this.this$0 = $receiver;
                super(1);
            }

            public final void invoke(@NotNull BUS $this$verify) {
                Intrinsics.checkNotNullParameter($this$verify, (String)"$this$verify");
                Duration duration2 = StepVerifierExtensionsKt.test((Mono)MessageBusSpec.sendLoop$default(this.this$0, $this$verify, 0, 2, null)).verifyComplete();
                Intrinsics.checkNotNullExpressionValue((Object)duration2, (String)"sendLoop(messageBus = th\u2026        .verifyComplete()");
                Duration duration3 = duration2;
                MessageBusSpec.access$getLog$cp().info("[" + $this$verify.getClass().getSimpleName() + "] sendPerformance - duration:{}", (Object)duration3);
            }
        });
    }

    private final Mono<Void> sendLoop(BUS messageBus, int maxCount) {
        Mono mono = Flux.range((int)0, (int)maxCount).flatMap(arg_0 -> MessageBusSpec.sendLoop$lambda$1((Function1)new Function1<Integer, Publisher<? extends Void>>(this, messageBus){
            final /* synthetic */ MessageBusSpec<M, E, BUS> this$0;
            final /* synthetic */ BUS $messageBus;
            {
                this.this$0 = $receiver;
                this.$messageBus = $messageBus;
                super(1);
            }

            public final Publisher<? extends Void> invoke(Integer it) {
                M message = this.this$0.createMessage();
                return (Publisher)this.$messageBus.send(message);
            }
        }, arg_0)).then();
        Intrinsics.checkNotNullExpressionValue((Object)mono, (String)"private fun sendLoop(mes\u2026           }.then()\n    }");
        return mono;
    }

    static /* synthetic */ Mono sendLoop$default(MessageBusSpec messageBusSpec, MessageBus messageBus, int n, int n2, Object object) {
        if (object != null) {
            throw new UnsupportedOperationException("Super calls with default arguments not supported in this target, function: sendLoop");
        }
        if ((n2 & 2) != 0) {
            n = 1000;
        }
        return messageBusSpec.sendLoop(messageBus, n);
    }

    @DisabledIfEnvironmentVariable(named="CI", matches=".*")
    @Test
    public final void receivePerformance() {
        this.verify((Function1)new Function1<BUS, Unit>(this){
            final /* synthetic */ MessageBusSpec<M, E, BUS> this$0;
            {
                this.this$0 = $receiver;
                super(1);
            }

            public final void invoke(@NotNull BUS $this$verify) {
                Intrinsics.checkNotNullParameter($this$verify, (String)"$this$verify");
                long maxCount = 1000L;
                Sinks.Empty empty = Sinks.empty();
                Intrinsics.checkNotNullExpressionValue((Object)empty, (String)"empty<Void>()");
                Sinks.Empty onReady = empty;
                Flux flux = $this$verify.receive(SetsKt.setOf((Object)this.this$0.getNamedAggregate()));
                String string = GlobalIdGenerator.INSTANCE.generateAsString();
                Intrinsics.checkNotNullExpressionValue((Object)string, (String)"GlobalIdGenerator.generateAsString()");
                Flux flux2 = this.this$0.onReceive(ReceiverGroupKt.writeReceiverGroup((Flux)flux, (String)string), (Sinks.Empty<Void>)onReady).doOnSubscribe(arg_0 -> receivePerformance.1.invoke$lambda$0((Function1)new Function1<Subscription, Unit>(this.this$0, $this$verify, maxCount, (Sinks.Empty<Void>)onReady){
                    final /* synthetic */ MessageBusSpec<M, E, BUS> this$0;
                    final /* synthetic */ BUS $this_verify;
                    final /* synthetic */ long $maxCount;
                    final /* synthetic */ Sinks.Empty<Void> $onReady;
                    {
                        this.this$0 = $receiver;
                        this.$this_verify = $this_verify;
                        this.$maxCount = $maxCount;
                        this.$onReady = $onReady;
                        super(1);
                    }

                    public final void invoke(Subscription it) {
                        Mono sendFlux2 = MessageBusSpec.access$sendLoop(this.this$0, this.$this_verify, (int)this.$maxCount);
                        this.$onReady.asMono().thenMany((Publisher)sendFlux2).delaySubscription(Duration.ofMillis(1000L)).subscribe();
                    }
                }, arg_0));
                Intrinsics.checkNotNullExpressionValue((Object)flux2, (String)"@DisabledIfEnvironmentVa\u2026duration)\n        }\n    }");
                Duration duration2 = StepVerifierExtensionsKt.test((Flux)flux2).expectNextCount(maxCount).thenCancel().verify();
                Intrinsics.checkNotNullExpressionValue((Object)duration2, (String)"@DisabledIfEnvironmentVa\u2026duration)\n        }\n    }");
                Duration duration3 = duration2;
                MessageBusSpec.access$getLog$cp().info("[" + $this$verify.getClass().getSimpleName() + "] receivePerformance - duration:{}", (Object)duration3);
            }

            private static final void invoke$lambda$0(Function1 $tmp0, Object p0) {
                Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
                $tmp0.invoke(p0);
            }
        });
    }

    private static final Publisher sendLoop$lambda$1(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
        return (Publisher)$tmp0.invoke(p0);
    }

    public static final /* synthetic */ Logger access$getLog$cp() {
        return log;
    }

    public static final /* synthetic */ Mono access$sendLoop(MessageBusSpec $this, MessageBus messageBus, int maxCount) {
        return $this.sendLoop(messageBus, maxCount);
    }

    @Metadata(mv={1, 8, 0}, k=1, xi=48, d1={"\u0000\u0014\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0086\u0003\u0018\u00002\u00020\u0001B\u0007\b\u0002\u00a2\u0006\u0002\u0010\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u0006"}, d2={"Lme/ahoo/wow/tck/messaging/MessageBusSpec$Companion;", "", "()V", "log", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "wow-tck"})
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker $constructor_marker) {
            this();
        }
    }
}

