/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.wow.tck.messaging;

import java.time.Duration;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.SetsKt;
import kotlin.jdk7.AutoCloseableKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import me.ahoo.wow.api.Identifier;
import me.ahoo.wow.api.messaging.Message;
import me.ahoo.wow.api.messaging.TopicKindCapable;
import me.ahoo.wow.api.modeling.NamedAggregate;
import me.ahoo.wow.id.GlobalIdGenerator;
import me.ahoo.wow.infra.Decorator;
import me.ahoo.wow.messaging.MessageBus;
import me.ahoo.wow.messaging.ReceiverGroupKt;
import me.ahoo.wow.messaging.handler.MessageExchange;
import me.ahoo.wow.metrics.Metrics;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.kotlin.test.StepVerifierExtensionsKt;

@Metadata(mv={2, 0, 0}, k=1, xi=48, d1={"\u0000X\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0004\b&\u0018\u0000 &*\u0010\b\u0000\u0010\u0001*\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0002*\u0012\b\u0001\u0010\u0003*\f\u0012\u0002\b\u0003\u0012\u0004\u0012\u0002H\u00010\u0004*\u0014\b\u0002\u0010\u0005*\u000e\u0012\u0004\u0012\u0002H\u0001\u0012\u0004\u0012\u0002H\u00030\u00062\u00020\u0007:\u0001&B\u0007\u00a2\u0006\u0004\b\b\u0010\tJ\r\u0010\u000e\u001a\u00028\u0002H$\u00a2\u0006\u0002\u0010\u000fJ\r\u0010\u0010\u001a\u00028\u0000H$\u00a2\u0006\u0002\u0010\u0011J&\u0010\u0012\u001a\b\u0012\u0004\u0012\u00028\u00010\u0013*\b\u0012\u0004\u0012\u00028\u00010\u00132\f\u0010\u0014\u001a\b\u0012\u0004\u0012\u00020\u00160\u0015H\u0014J!\u0010\u0017\u001a\u00020\u00182\u0017\u0010\u0019\u001a\u0013\u0012\u0004\u0012\u00028\u0002\u0012\u0004\u0012\u00020\u00180\u001a\u00a2\u0006\u0002\b\u001bH\u0016J\b\u0010\u001c\u001a\u00020\u0018H\u0007J\b\u0010\u001d\u001a\u00020\u0018H\u0007J\b\u0010\u001e\u001a\u00020\u0018H\u0007J%\u0010\u001f\u001a\b\u0012\u0004\u0012\u00020\u00160 2\u0006\u0010!\u001a\u00028\u00022\b\b\u0002\u0010\"\u001a\u00020#H\u0002\u00a2\u0006\u0002\u0010$J\b\u0010%\u001a\u00020\u0018H\u0007R\u0012\u0010\n\u001a\u00020\u000bX\u00a6\u0004\u00a2\u0006\u0006\u001a\u0004\b\f\u0010\r\u00a8\u0006'"}, d2={"Lme/ahoo/wow/tck/messaging/MessageBusSpec;", "M", "Lme/ahoo/wow/api/messaging/Message;", "E", "Lme/ahoo/wow/messaging/handler/MessageExchange;", "BUS", "Lme/ahoo/wow/messaging/MessageBus;", "Lme/ahoo/wow/api/messaging/TopicKindCapable;", "<init>", "()V", "namedAggregate", 
"Lme/ahoo/wow/api/modeling/NamedAggregate;", "getNamedAggregate", "()Lme/ahoo/wow/api/modeling/NamedAggregate;", "createMessageBus", "()Lme/ahoo/wow/messaging/MessageBus;", "createMessage", "()Lme/ahoo/wow/api/messaging/Message;", "onReceive", "Lreactor/core/publisher/Flux;", "onReady", "Lreactor/core/publisher/Sinks$Empty;", "Ljava/lang/Void;", "verify", "", "block", "Lkotlin/Function1;", "Lkotlin/ExtensionFunctionType;", "send", "receive", "sendPerformance", "sendLoop", "Lreactor/core/publisher/Mono;", "messageBus", "maxCount", "", "(Lme/ahoo/wow/messaging/MessageBus;I)Lreactor/core/publisher/Mono;", "receivePerformance", "Companion", "wow-tck"})
public abstract class MessageBusSpec<M extends Message<?, ?>, E extends MessageExchange<?, ? extends M>, BUS extends MessageBus<M, E>>
implements TopicKindCapable {
    /** Shared instance of the nested {@link Companion} class, created through its synthetic constructor. */
    @NotNull
    public static final Companion Companion = new Companion(null);
    /** SLF4J logger; the performance tests in this class use it to report the measured durations. */
    private static final Logger log = LoggerFactory.getLogger(MessageBusSpec.class);

    /**
     * Returns the aggregate the tests subscribe to.
     *
     * <p>Each test in this class passes a single-element set containing this value to the bus's
     * {@code receive} method.
     *
     * @return the named aggregate used for receiving; never {@code null}
     */
    @NotNull
    public abstract NamedAggregate getNamedAggregate();

    /**
     * Creates the message bus under test.
     *
     * <p>{@code verify} calls this once per invocation and closes the resulting bus when it finishes.
     *
     * @return a new message bus; never {@code null}
     */
    @NotNull
    protected abstract BUS createMessageBus();

    /**
     * Creates a message to be sent through the bus.
     *
     * <p>The tests call this once for every message they send. The {@code send} test compares the id of the
     * received message with the id of the message created here (both are cast to {@code Identifier}).
     *
     * @return a new message; never {@code null}
     */
    @NotNull
    protected abstract M createMessage();

    /**
     * Hook applied to the receive flux before a test subscribes to it.
     *
     * <p>This default implementation signals readiness immediately by completing {@code onReady}
     * (using {@code FAIL_FAST} as the emit-failure handler) and hands the flux back unchanged. The tests
     * wait on {@code onReady.asMono()} before they start sending.
     *
     * @param $this$onReceive the flux obtained from the bus (Kotlin extension receiver)
     * @param onReady         sink to complete once the receiver is ready for messages
     * @return the flux the test should subscribe to
     */
    @NotNull
    protected Flux<E> onReceive(@NotNull Flux<E> $this$onReceive, @NotNull Sinks.Empty<Void> onReady) {
        Intrinsics.checkNotNullParameter($this$onReceive, "<this>");
        Intrinsics.checkNotNullParameter(onReady, "onReady");
        // Nothing to wait for by default: report "ready" straight away.
        onReady.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
        return $this$onReceive;
    }

    /**
     * Creates a message bus, runs {@code block} against it and closes the bus afterwards.
     *
     * <p>The bus returned by {@link #createMessageBus()} is passed through
     * {@code Metrics.INSTANCE.metrizable(...)}. If the original (undecorated) delegate of that bus
     * implements {@link TopicKindCapable}, its topic kind must equal this spec's {@code getTopicKind()};
     * otherwise the check is skipped. The bus is always handed to {@code AutoCloseableKt.closeFinally},
     * together with the throwable that escaped the body (or {@code null} when the body succeeded), and any
     * such throwable is rethrown.
     *
     * @param block test body, invoked with the bus
     */
    public void verify(@NotNull Function1<? super BUS, Unit> block) {
        Intrinsics.checkNotNullParameter(block, (String)"block");
        AutoCloseable autoCloseable = (AutoCloseable)Metrics.INSTANCE.metrizable(this.createMessageBus());
        Throwable failure = null;
        try {
            MessageBus bus = (MessageBus)autoCloseable;
            // Resolve the undecorated bus once and reuse it for both the type check and the assertion;
            // the instanceof test already guarantees it is non-null inside the branch.
            Object originalDelegate = Decorator.Companion.getOriginalDelegate((Object)bus);
            if (originalDelegate instanceof TopicKindCapable) {
                MatcherAssert.assertThat((Object)((TopicKindCapable)originalDelegate).getTopicKind(), (Matcher)Matchers.equalTo((Object)this.getTopicKind()));
            }
            block.invoke((Object)bus);
        }
        catch (Throwable caught) {
            // Remember the failure so closeFinally receives it, then let it propagate.
            failure = caught;
            throw caught;
        }
        finally {
            AutoCloseableKt.closeFinally((AutoCloseable)autoCloseable, (Throwable)failure);
        }
    }

    /**
     * Checks that a message sent through the bus is received.
     *
     * <p>The test subscribes to the aggregate, sends one message once the receiver reports ready, and
     * asserts that the first received exchange carries a message with the same id.
     */
    @Test
    public final void send() {
        verify(bus -> send$lambda$5(this, bus));
    }

    /**
     * Checks that ten sent messages are all received.
     *
     * <p>The test subscribes to the aggregate, sends ten messages once the receiver reports ready, and
     * expects ten exchanges before cancelling.
     */
    @Test
    public final void receive() {
        verify(bus -> receive$lambda$10(this, bus));
    }

    /**
     * Sends a batch of messages (the {@code sendLoop} default count) and logs how long the send loop took
     * to complete.
     */
    @Test
    public final void sendPerformance() {
        verify(bus -> sendPerformance$lambda$13(this, bus));
    }

    /**
     * Builds a {@code Mono} that sends {@code maxCount} freshly created messages through the bus and
     * completes when all sends have completed.
     *
     * @param messageBus bus to send through
     * @param maxCount   number of messages to send
     * @return completion signal for the whole batch
     */
    private final Mono<Void> sendLoop(BUS messageBus, int maxCount) {
        Mono completion = Flux.range(0, maxCount)
            .flatMap(index -> MessageBusSpec.sendLoop$lambda$15(i -> MessageBusSpec.sendLoop$lambda$14(this, messageBus, i), index))
            .then();
        Intrinsics.checkNotNullExpressionValue(completion, "then(...)");
        return completion;
    }

    /**
     * Synthetic bridge that applies the default argument of {@code sendLoop}.
     *
     * @param messageBusSpec spec instance to call {@code sendLoop} on
     * @param messageBus     bus to send through
     * @param n              explicit message count; ignored when the mask marks it as omitted
     * @param n2             default-argument bit mask; bit value 2 means the count was omitted
     * @param object         must be {@code null}; a non-null value is rejected
     * @return the result of {@code sendLoop}
     * @throws UnsupportedOperationException if {@code object} is not {@code null}
     */
    static /* synthetic */ Mono sendLoop$default(MessageBusSpec messageBusSpec, MessageBus messageBus, int n, int n2, Object object) {
        if (object != null) {
            throw new UnsupportedOperationException("Super calls with default arguments not supported in this target, function: sendLoop");
        }
        // Fall back to 1000 messages when the caller did not supply a count.
        int maxCount = (n2 & 2) != 0 ? 1000 : n;
        return messageBusSpec.sendLoop(messageBus, maxCount);
    }

    /**
     * Sends 1000 messages, waits until 1000 exchanges have been received and logs the elapsed time.
     *
     * <p>Disabled whenever the {@code CI} environment variable is set (any value matches).
     */
    @DisabledIfEnvironmentVariable(named="CI", matches=".*")
    @Test
    public final void receivePerformance() {
        verify(bus -> receivePerformance$lambda$18(this, bus));
    }

    /**
     * {@code doOnSubscribe} callback of the {@code send} test: once {@code $onReady} completes, sends
     * {@code $message} through the bus. The whole chain is subscribed with a 1000 ms delay and is not
     * awaited here.
     *
     * @param it the subscription being established (unused)
     */
    private static final Unit send$lambda$5$lambda$1(Sinks.Empty $onReady, MessageBus $this_verify, Message $message, Subscription it) {
        Intrinsics.checkNotNullParameter($onReady, "$onReady");
        Intrinsics.checkNotNullParameter($this_verify, "$this_verify");
        Intrinsics.checkNotNullParameter($message, "$message");
        Mono sendWhenReady = $onReady.asMono().then($this_verify.send($message));
        // Fire-and-forget: the test observes the outcome through the receive flux.
        sendWhenReady.delaySubscription(Duration.ofMillis(1000L)).subscribe();
        return Unit.INSTANCE;
    }

    /**
     * Adapter that lets a Kotlin {@code Function1} be used where a void Java callback is expected
     * (here: {@code doOnSubscribe} in the {@code send} test). The function's result is discarded.
     */
    private static final void send$lambda$5$lambda$2(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter($tmp0, "$tmp0");
        $tmp0.invoke(p0);
    }

    /**
     * Assertion of the {@code send} test: the id of the message carried by the received exchange must
     * equal the id of the message that was sent.
     *
     * @param $message the message that was sent
     * @param it       the received exchange
     */
    private static final Unit send$lambda$5$lambda$3(Message $message, MessageExchange it) {
        Intrinsics.checkNotNullParameter($message, "$message");
        Object receivedId = ((Identifier)it.getMessage()).getId();
        Object expectedId = ((Identifier)$message).getId();
        MatcherAssert.assertThat(receivedId, (Matcher)Matchers.equalTo(expectedId));
        return Unit.INSTANCE;
    }

    /**
     * Adapter that lets a Kotlin {@code Function1} be used where a void Java callback is expected
     * (here: {@code consumeNextWith} in the {@code send} test). The function's result is discarded.
     */
    private static final void send$lambda$5$lambda$4(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter($tmp0, "$tmp0");
        $tmp0.invoke(p0);
    }

    /**
     * Body of the {@code send} test, run by {@code verify} with the bus under test.
     *
     * <p>Subscribes to the aggregate under a freshly generated receiver group, sends one message from the
     * {@code doOnSubscribe} callback, checks the first received exchange and then cancels.
     */
    private static final Unit send$lambda$5(MessageBusSpec this$0, MessageBus $this$verify) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        Intrinsics.checkNotNullParameter($this$verify, "$this$verify");
        Sinks.Empty onReady = Sinks.empty();
        Intrinsics.checkNotNullExpressionValue(onReady, "empty(...)");
        Object message = this$0.createMessage();
        Flux received = $this$verify.receive(SetsKt.setOf((Object)this$0.getNamedAggregate()));
        String receiverGroup = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(receiverGroup, "generateAsString(...)");
        Flux prepared = this$0.onReceive(ReceiverGroupKt.writeReceiverGroup((Flux)received, (String)receiverGroup), (Sinks.Empty<Void>)onReady);
        // Sending is triggered by the subscription itself.
        Flux withSend = prepared.doOnSubscribe(subscription -> MessageBusSpec.send$lambda$5$lambda$2(s -> MessageBusSpec.send$lambda$5$lambda$1(onReady, $this$verify, message, s), subscription));
        Intrinsics.checkNotNullExpressionValue(withSend, "doOnSubscribe(...)");
        StepVerifierExtensionsKt.test((Flux)withSend)
            .consumeNextWith(exchange -> MessageBusSpec.send$lambda$5$lambda$4(e -> MessageBusSpec.send$lambda$5$lambda$3(message, e), exchange))
            .thenCancel()
            .verify();
        return Unit.INSTANCE;
    }

    /**
     * Per-element step of the {@code receive} test's send flux: creates a new message and sends it.
     *
     * @param it the range index (unused)
     * @return the publisher returned by the bus's {@code send}
     */
    private static final Publisher receive$lambda$10$lambda$8$lambda$6(MessageBusSpec this$0, MessageBus $this_verify, Integer it) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        Intrinsics.checkNotNullParameter($this_verify, "$this_verify");
        return (Publisher)$this_verify.send(this$0.createMessage());
    }

    /**
     * Adapter that invokes a Kotlin {@code Function1} and returns its result as a {@code Publisher}
     * (used as the {@code flatMap} mapper in the {@code receive} test).
     */
    private static final Publisher receive$lambda$10$lambda$8$lambda$7(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter($tmp0, "$tmp0");
        Object mapped = $tmp0.invoke(p0);
        return (Publisher)mapped;
    }

    /**
     * {@code doOnSubscribe} callback of the {@code receive} test: once {@code $onReady} completes, sends
     * ten freshly created messages. The chain is subscribed with a 1000 ms delay and is not awaited here.
     *
     * @param it the subscription being established (unused)
     */
    private static final Unit receive$lambda$10$lambda$8(Sinks.Empty $onReady, MessageBusSpec this$0, MessageBus $this_verify, Subscription it) {
        Intrinsics.checkNotNullParameter($onReady, "$onReady");
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        Intrinsics.checkNotNullParameter($this_verify, "$this_verify");
        Flux sendFlux = Flux.range(0, 10)
            .flatMap(index -> MessageBusSpec.receive$lambda$10$lambda$8$lambda$7(i -> MessageBusSpec.receive$lambda$10$lambda$8$lambda$6(this$0, $this_verify, i), index));
        Intrinsics.checkNotNullExpressionValue(sendFlux, "flatMap(...)");
        // Fire-and-forget: the test observes the outcome through the receive flux.
        $onReady.asMono()
            .thenMany((Publisher)sendFlux)
            .delaySubscription(Duration.ofMillis(1000L))
            .subscribe();
        return Unit.INSTANCE;
    }

    /**
     * Adapter that lets a Kotlin {@code Function1} be used where a void Java callback is expected
     * (here: {@code doOnSubscribe} in the {@code receive} test). The function's result is discarded.
     */
    private static final void receive$lambda$10$lambda$9(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter($tmp0, "$tmp0");
        $tmp0.invoke(p0);
    }

    /**
     * Body of the {@code receive} test, run by {@code verify} with the bus under test.
     *
     * <p>Subscribes to the aggregate under a freshly generated receiver group, triggers ten sends from the
     * {@code doOnSubscribe} callback, expects ten exchanges and then cancels.
     */
    private static final Unit receive$lambda$10(MessageBusSpec this$0, MessageBus $this$verify) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        Intrinsics.checkNotNullParameter($this$verify, "$this$verify");
        Sinks.Empty onReady = Sinks.empty();
        Intrinsics.checkNotNullExpressionValue(onReady, "empty(...)");
        Flux received = $this$verify.receive(SetsKt.setOf((Object)this$0.getNamedAggregate()));
        String receiverGroup = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(receiverGroup, "generateAsString(...)");
        Flux prepared = this$0.onReceive(ReceiverGroupKt.writeReceiverGroup((Flux)received, (String)receiverGroup), (Sinks.Empty<Void>)onReady);
        // Sending is triggered by the subscription itself.
        Flux withSend = prepared.doOnSubscribe(subscription -> MessageBusSpec.receive$lambda$10$lambda$9(s -> MessageBusSpec.receive$lambda$10$lambda$8(onReady, this$0, $this$verify, s), subscription));
        Intrinsics.checkNotNullExpressionValue(withSend, "doOnSubscribe(...)");
        StepVerifierExtensionsKt.test((Flux)withSend)
            .expectNextCount(10L)
            .thenCancel()
            .verify();
        return Unit.INSTANCE;
    }

    /**
     * {@code doOnSubscribe} callback of the {@code sendPerformance} test: runs {@code sendLoop} with its
     * default message count, verifies that it completes, and logs the duration reported by the verifier.
     *
     * @param it the subscription being established (unused)
     */
    private static final Unit sendPerformance$lambda$13$lambda$11(MessageBusSpec this$0, MessageBus $this_verify, Subscription it) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        Intrinsics.checkNotNullParameter((Object)$this_verify, (String)"$this_verify");
        // Mask value 2 tells sendLoop$default to ignore the explicit count (0) and use the default.
        Duration elapsed = StepVerifierExtensionsKt.test((Mono)MessageBusSpec.sendLoop$default(this$0, $this_verify, 0, 2, null)).verifyComplete();
        Intrinsics.checkNotNullExpressionValue((Object)elapsed, (String)"verifyComplete(...)");
        // Pass the class name as a placeholder argument instead of concatenating it into the format
        // string; the rendered message is the same.
        log.info("[{}] sendPerformance - duration:{}", (Object)$this_verify.getClass().getSimpleName(), (Object)elapsed);
        return Unit.INSTANCE;
    }

    /**
     * Adapter that lets a Kotlin {@code Function1} be used where a void Java callback is expected
     * (here: {@code doOnSubscribe} in the {@code sendPerformance} test). The function's result is discarded.
     */
    private static final void sendPerformance$lambda$13$lambda$12(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter($tmp0, "$tmp0");
        $tmp0.invoke(p0);
    }

    /**
     * Body of the {@code sendPerformance} test, run by {@code verify} with the bus under test.
     *
     * <p>Subscribes to the aggregate under a freshly generated receiver group. The send loop is run and
     * measured inside the {@code doOnSubscribe} callback, after which the subscription is cancelled without
     * expecting any received element.
     */
    private static final Unit sendPerformance$lambda$13(MessageBusSpec this$0, MessageBus $this$verify) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        Intrinsics.checkNotNullParameter($this$verify, "$this$verify");
        Sinks.Empty onReady = Sinks.empty();
        Intrinsics.checkNotNullExpressionValue(onReady, "empty(...)");
        Flux received = $this$verify.receive(SetsKt.setOf((Object)this$0.getNamedAggregate()));
        String receiverGroup = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue(receiverGroup, "generateAsString(...)");
        Flux prepared = this$0.onReceive(ReceiverGroupKt.writeReceiverGroup((Flux)received, (String)receiverGroup), (Sinks.Empty<Void>)onReady);
        Flux withSend = prepared.doOnSubscribe(subscription -> MessageBusSpec.sendPerformance$lambda$13$lambda$12(s -> MessageBusSpec.sendPerformance$lambda$13$lambda$11(this$0, $this$verify, s), subscription));
        Intrinsics.checkNotNullExpressionValue(withSend, "doOnSubscribe(...)");
        StepVerifierExtensionsKt.test((Flux)withSend)
            .thenCancel()
            .verify();
        return Unit.INSTANCE;
    }

    /**
     * Per-element step of {@code sendLoop}: creates a new message and sends it through the bus.
     *
     * @param it the range index (unused)
     * @return the publisher returned by the bus's {@code send}
     */
    private static final Publisher sendLoop$lambda$14(MessageBusSpec this$0, MessageBus $messageBus, Integer it) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        Intrinsics.checkNotNullParameter($messageBus, "$messageBus");
        return (Publisher)$messageBus.send(this$0.createMessage());
    }

    /**
     * Adapter that invokes a Kotlin {@code Function1} and returns its result as a {@code Publisher}
     * (used as the {@code flatMap} mapper in {@code sendLoop}).
     */
    private static final Publisher sendLoop$lambda$15(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter($tmp0, "$tmp0");
        Object mapped = $tmp0.invoke(p0);
        return (Publisher)mapped;
    }

    /**
     * {@code doOnSubscribe} callback of the {@code receivePerformance} test: once {@code $onReady}
     * completes, sends {@code $maxCount} messages via {@code sendLoop}. The chain is subscribed with a
     * 1000 ms delay and is not awaited here.
     *
     * @param $maxCount number of messages to send (narrowed to {@code int} for {@code sendLoop})
     * @param it        the subscription being established (unused)
     */
    private static final Unit receivePerformance$lambda$18$lambda$16(MessageBusSpec this$0, MessageBus $this_verify, long $maxCount, Sinks.Empty $onReady, Subscription it) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        Intrinsics.checkNotNullParameter($this_verify, "$this_verify");
        Intrinsics.checkNotNullParameter($onReady, "$onReady");
        Mono<Void> sendAll = this$0.sendLoop($this_verify, (int)$maxCount);
        // Fire-and-forget: the test observes the outcome through the receive flux.
        $onReady.asMono()
            .thenMany((Publisher)sendAll)
            .delaySubscription(Duration.ofMillis(1000L))
            .subscribe();
        return Unit.INSTANCE;
    }

    /**
     * Adapter that lets a Kotlin {@code Function1} be used where a void Java callback is expected
     * (here: {@code doOnSubscribe} in the {@code receivePerformance} test). The function's result is
     * discarded.
     */
    private static final void receivePerformance$lambda$18$lambda$17(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter($tmp0, "$tmp0");
        $tmp0.invoke(p0);
    }

    /**
     * Body of the {@code receivePerformance} test, run by {@code verify} with the bus under test.
     *
     * <p>Subscribes to the aggregate under a freshly generated receiver group, triggers 1000 sends from the
     * {@code doOnSubscribe} callback, waits for 1000 exchanges, cancels, and logs the duration reported by
     * the verifier.
     */
    private static final Unit receivePerformance$lambda$18(MessageBusSpec this$0, MessageBus $this$verify) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        Intrinsics.checkNotNullParameter((Object)$this$verify, (String)"$this$verify");
        long maxCount = 1000L;
        Sinks.Empty onReady = Sinks.empty();
        Intrinsics.checkNotNullExpressionValue((Object)onReady, (String)"empty(...)");
        Flux received = $this$verify.receive(SetsKt.setOf((Object)this$0.getNamedAggregate()));
        String receiverGroup = GlobalIdGenerator.INSTANCE.generateAsString();
        Intrinsics.checkNotNullExpressionValue((Object)receiverGroup, (String)"generateAsString(...)");
        // Sending is triggered by the subscription itself.
        Flux withSend = this$0.onReceive(ReceiverGroupKt.writeReceiverGroup((Flux)received, (String)receiverGroup), (Sinks.Empty<Void>)onReady).doOnSubscribe(subscription -> MessageBusSpec.receivePerformance$lambda$18$lambda$17(s -> MessageBusSpec.receivePerformance$lambda$18$lambda$16(this$0, $this$verify, maxCount, onReady, s), subscription));
        Intrinsics.checkNotNullExpressionValue((Object)withSend, (String)"doOnSubscribe(...)");
        Duration elapsed = StepVerifierExtensionsKt.test((Flux)withSend).expectNextCount(maxCount).thenCancel().verify();
        Intrinsics.checkNotNullExpressionValue((Object)elapsed, (String)"verify(...)");
        // Pass the class name as a placeholder argument instead of concatenating it into the format
        // string; the rendered message is the same.
        log.info("[{}] receivePerformance - duration:{}", (Object)$this$verify.getClass().getSimpleName(), (Object)elapsed);
        return Unit.INSTANCE;
    }

    /**
     * Nested holder class with no members of its own in this decompiled view; the single instance is
     * exposed through the enclosing class's static {@code Companion} field.
     */
    @Metadata(mv={2, 0, 0}, k=1, xi=48, d1={"\u0000\u0014\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\b\u0086\u0003\u0018\u00002\u00020\u0001B\t\b\u0002\u00a2\u0006\u0004\b\u0002\u0010\u0003R\u0018\u0010\u0004\u001a\n \u0006*\u0004\u0018\u00010\u00050\u0005X\u0082\u0004\u00a2\u0006\u0004\n\u0002\u0010\u0007\u00a8\u0006\b"}, d2={"Lme/ahoo/wow/tck/messaging/MessageBusSpec$Companion;", "", "<init>", "()V", "log", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "Lorg/slf4j/Logger;", "wow-tck"})
    public static final class Companion {
        // Private: instances are only created through the synthetic constructor below.
        private Companion() {
        }

        // Synthetic constructor; the marker argument is ignored and only distinguishes the signature.
        public /* synthetic */ Companion(DefaultConstructorMarker $constructor_marker) {
            this();
        }
    }
}

