package org.kinotic.continuum.internal.core.api.event;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.impl.clustered.ClusterNodeInfo;
import java.lang.invoke.SerializedLambda;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import org.apache.commons.lang3.Validate;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.kinotic.continuum.core.api.event.Event;
import org.kinotic.continuum.core.api.event.EventBusService;
import org.kinotic.continuum.core.api.event.ListenerStatus;
import org.kinotic.continuum.internal.core.api.aignite.SubscriptionInfoCacheEntryEventFilter;
import org.kinotic.continuum.internal.core.api.aignite.SubscriptionInfoCacheEntryListener;
import org.kinotic.continuum.internal.util.IgniteUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Component
/* loaded from: input_file:org/kinotic/continuum/internal/core/api/event/DefaultEventBusService.class */
public class DefaultEventBusService implements EventBusService {
    /** Logger shared by every method of this service. */
    private static final Logger log = LoggerFactory.getLogger(DefaultEventBusService.class);

    /** Injected Vert.x instance; supplies the event bus and the executeBlocking calls used in this class. */
    @Autowired
    private Vertx vertx;

    /**
     * Injected Ignite instance. Injection is optional (required = false), so this may be null;
     * isAnybodyListening and monitorListenerStatus throw IllegalStateException in that case.
     */
    @Autowired(required = false)
    private Ignite ignite;
    /** Reactor scheduler created in init(); it runs every task through vertx.executeBlocking. */
    private Scheduler scheduler;
    /** Ignite cache named "__vertx.subs"; assigned in init() only when ignite is not null. */
    private IgniteCache<String, Set<ClusterNodeInfo>> subscriptionsCache;

    /**
     * Prepares the service after dependency injection.
     *
     * <p>Creates the Reactor scheduler used by the listen/monitor methods: each task handed to it is
     * executed through {@code vertx.executeBlocking}. When an Ignite instance was injected, the
     * {@code "__vertx.subs"} cache is also looked up and kept for {@link #isAnybodyListening(String)}.
     */
    @PostConstruct
    public void init() {
        // No result handler is supplied: the scheduler only needs the task to be run.
        this.scheduler = Schedulers.fromExecutor(task ->
                this.vertx.executeBlocking(promise -> task.run(), null));

        if (this.ignite == null) {
            return;
        }
        this.subscriptionsCache = this.ignite.cache("__vertx.subs");
    }

    /**
     * Sends the event on the Vert.x event bus without waiting for any acknowledgement.
     *
     * <p>The destination address is {@code event.cri().baseResource()}, the body is
     * {@code event.data()}, and the headers are built by {@code createDeliveryOptions}.
     *
     * @param event the event to send; must not be null
     * @throws NullPointerException if {@code event} is null
     */
    public void send(Event<byte[]> event) {
        // Same precondition as sendWithAck, so both entry points fail with the same clear message.
        Validate.notNull(event, "Event must not be null");
        this.vertx.eventBus().send(event.cri().baseResource(), event.data(), createDeliveryOptions(event));
    }

    /**
     * Sends the event as an event-bus request and reports the outcome of the reply.
     *
     * <p>The request is issued when the returned {@link Mono} is subscribed to. The Mono completes
     * empty when the request succeeds and fails with the request's cause otherwise.
     *
     * @param event the event to send; must not be null (checked immediately, not at subscription)
     * @return a Mono signalling completion or failure of the request
     * @throws NullPointerException if {@code event} is null
     */
    public Mono<Void> sendWithAck(Event<byte[]> event) {
        Validate.notNull(event, "Event must not be null");
        return Mono.create(sink ->
                this.vertx.eventBus().request(
                        event.cri().baseResource(),
                        event.data(),
                        createDeliveryOptions(event),
                        reply -> {
                            if (reply.failed()) {
                                sink.error(reply.cause());
                                return;
                            }
                            sink.success();
                        }));
    }

    /**
     * Builds the Vert.x delivery options for an outgoing event.
     *
     * <p>The event's metadata becomes the message headers, and a {@code "cri"} header carrying
     * {@code event.cri().raw()} is always added afterwards.
     *
     * @param event the event whose metadata and CRI are turned into headers
     * @return delivery options holding the metadata headers plus the {@code "cri"} header
     */
    private DeliveryOptions createDeliveryOptions(Event<?> event) {
        DeliveryOptions deliveryOptions = new DeliveryOptions();
        if (event.metadata() instanceof MultiMapMetadataAdapter) {
            // Fast path: hand the adapter's backing MultiMap over as-is instead of copying entries.
            // NOTE(review): if DeliveryOptions keeps this MultiMap by reference, the addHeader("cri", ...)
            // below also writes into the event's own metadata - confirm that this is acceptable.
            deliveryOptions.setHeaders(((MultiMapMetadataAdapter) event.metadata()).getMultiMap());
        } else {
            for (Map.Entry<String, String> entry : event.metadata()) {
                deliveryOptions.addHeader(entry.getKey(), entry.getValue());
            }
        }
        deliveryOptions.addHeader("cri", event.cri().raw());
        return deliveryOptions;
    }

    /**
     * Returns a stream of the events arriving at the given CRI.
     *
     * @param str the CRI (event-bus address) to listen on; must not be null or empty
     * @return a Flux of received events, produced by {@code _listen} with a freshly created consumer
     * @throws NullPointerException if {@code str} is null
     * @throws IllegalArgumentException if {@code str} is empty
     */
    public Flux<Event<byte[]>> listen(String str) {
        Validate.notEmpty(str, "The cri must be provided");
        // No pre-built consumer: _listen creates one for the address itself.
        final MessageConsumer<byte[]> existingConsumer = null;
        return _listen(str, existingConsumer);
    }

    /**
     * Starts listening on the given CRI and hands back the event stream once the consumer's
     * registration has completed.
     *
     * <p>On subscription a consumer is created for {@code str}, wrapped by {@code _listen}, published
     * and connected. The returned Mono emits the published Flux when the consumer's completion handler
     * reports success, and fails with the reported cause when registration fails.
     *
     * @param str the CRI (event-bus address) to listen on; must not be null or empty
     * @return a Mono emitting the connected event stream, or an error if registration failed
     * @throws NullPointerException if {@code str} is null
     * @throws IllegalArgumentException if {@code str} is empty
     */
    public Mono<Flux<Event<byte[]>>> listenWithAck(String str) {
        Validate.notEmpty(str, "The cri must be provided");
        return Mono.create(monoSink -> {
            MessageConsumer<byte[]> consumer = this.vertx.eventBus().consumer(str);
            ConnectableFlux<Event<byte[]>> publish = _listen(null, consumer).publish();
            // Install the completion handler before connecting, so the registration outcome cannot be missed.
            consumer.completionHandler(asyncResult -> {
                if (asyncResult.succeeded()) {
                    monoSink.success(publish);
                } else {
                    // A failed registration must not be reported to the caller as a working stream.
                    monoSink.error(asyncResult.cause());
                }
            });
            // Connecting subscribes to the underlying Flux, which installs the message handler.
            publish.connect();
        });
    }

    /**
     * Checks whether the subscriptions cache contains an entry for the given CRI.
     *
     * @param str the CRI to look up as a key of the {@code "__vertx.subs"} cache
     * @return a Mono built from the cache's asynchronous containsKey lookup
     * @throws IllegalStateException if no Ignite instance was injected
     */
    public Mono<Boolean> isAnybodyListening(String str) {
        if (this.ignite != null) {
            return IgniteUtils.futureToMono(() -> this.subscriptionsCache.containsKeyAsync(str));
        }
        throw new IllegalStateException("This method is not available when ignite is disabled");
    }

    /**
     * Streams the listener status of the given CRI, based on the {@code "__vertx.subs"} Ignite cache.
     *
     * <p>On subscription a cache entry listener (filtered for {@code str}) is registered and feeds the
     * sink; it is deregistered again when the sink is disposed. In addition the current cache value is
     * read once: {@link ListenerStatus#INACTIVE} is emitted when the entry is missing or empty,
     * {@link ListenerStatus#ACTIVE} otherwise.
     *
     * @param str the CRI whose subscriptions are monitored
     * @return a Flux of status values, subscribed on this service's scheduler
     * @throws IllegalStateException if no Ignite instance was injected
     */
    public Flux<ListenerStatus> monitorListenerStatus(String str) {
        if (this.ignite == null) {
            throw new IllegalStateException("This method is not available when ignite is disabled");
        }
        // Explicit type witness: without it the chained subscribeOn() call would yield Flux<Object>.
        return Flux.<ListenerStatus>create(fluxSink -> {
            Context context = this.vertx.getOrCreateContext();
            IgniteCache<String, Set<ClusterNodeInfo>> cache = this.ignite.cache("__vertx.subs");
            MutableCacheEntryListenerConfiguration<String, Set<ClusterNodeInfo>> listenerConfiguration =
                    new MutableCacheEntryListenerConfiguration<>(
                            FactoryBuilder.factoryOf(new SubscriptionInfoCacheEntryListener(fluxSink, context)),
                            FactoryBuilder.factoryOf(new SubscriptionInfoCacheEntryEventFilter(str)),
                            false,
                            false);

            // Register the clean-up first so a registered listener is always paired with its removal.
            fluxSink.onDispose(() -> {
                log.trace("Disposing of monitorListenerStatus for cri: {}", str);
                context.executeBlocking(promise -> {
                    cache.deregisterCacheEntryListener(listenerConfiguration);
                }, null);
            });
            cache.registerCacheEntryListener(listenerConfiguration);

            // Emit the status that holds right now; later changes arrive through the cache entry listener.
            cache.getAsync(str).listen(igniteFuture -> {
                Set<ClusterNodeInfo> subscribers = igniteFuture.get();
                ListenerStatus status = (subscribers == null || subscribers.isEmpty())
                        ? ListenerStatus.INACTIVE
                        : ListenerStatus.ACTIVE;
                // Emit through the Vert.x context rather than directly from this callback.
                context.executeBlocking(promise -> {
                    fluxSink.next(status);
                }, null);
            });
        }).subscribeOn(this.scheduler);
    }

    /**
     * Adapts a Vert.x message consumer to a Flux of events.
     *
     * <p>The consumer is resolved immediately: the supplied one is used when present, otherwise a new
     * consumer is created for {@code str}. The handlers are installed when the Flux is subscribed to,
     * and the consumer is unregistered when the sink is disposed.
     *
     * @param str             the address to consume from; only used when {@code messageConsumer} is null
     * @param messageConsumer an existing consumer to wrap, or null to create one for {@code str}
     * @return a Flux of received events, subscribed on this service's scheduler
     */
    private Flux<Event<byte[]>> _listen(String str, MessageConsumer<byte[]> messageConsumer) {
        MessageConsumer<byte[]> consumer = messageConsumer != null ? messageConsumer : this.vertx.eventBus().consumer(str);
        // Explicit type witness: without it the chained subscribeOn() call would yield Flux<Object>.
        return Flux.<Event<byte[]>>create(fluxSink -> {
            fluxSink.onDispose(consumer::unregister);
            consumer.exceptionHandler(fluxSink::error);
            consumer.endHandler(ignored -> {
                fluxSink.complete();
            });
            consumer.handler(message -> {
                // A reply address means the sender used request(); acknowledge it with an empty reply.
                if (message.replyAddress() != null) {
                    message.reply(null);
                }
                if (fluxSink.isCancelled()) {
                    return;
                }
                // Hand the event to subscribers via executeBlocking instead of the thread running this handler.
                this.vertx.executeBlocking(promise -> {
                    fluxSink.next(new MessageEventAdapter(message));
                }, null);
            });
        }).subscribeOn(this.scheduler);
    }

    /*
     * Synthetic lambda-deserialization hook as printed by the decompiler.
     *
     * It recognises exactly one serialized lambda: the IgniteInClosure named
     * "lambda$monitorListenerStatus$9af7e592$1", whose captured arguments are (FluxSink, AtomicInteger,
     * String, Context). When every field of the SerializedLambda matches, an equivalent closure is
     * rebuilt from those captured arguments; otherwise IllegalArgumentException is thrown.
     *
     * NOTE(review): this text is not valid Java source as written ("boolean z = -1" and a switch on a
     * boolean do not compile). It presumably mirrors what the compiler generates for the serializable
     * lambda in monitorListenerStatus - confirm before keeping it in hand-maintained source.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Step 1: map the implementation method name to a case index (hash first, then equals).
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -454588528:
                if (implMethodName.equals("lambda$monitorListenerStatus$9af7e592$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Step 2: verify the full lambda shape and rebuild the closure from its captured arguments.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteInClosure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/kinotic/continuum/internal/core/api/event/DefaultEventBusService") && serializedLambda.getImplMethodSignature().equals("(Lreactor/core/publisher/FluxSink;Ljava/util/concurrent/atomic/AtomicInteger;Ljava/lang/String;Lio/vertx/core/Context;Lorg/apache/ignite/lang/IgniteFuture;)V")) {
                    FluxSink fluxSink = (FluxSink) serializedLambda.getCapturedArg(0);
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(1);
                    String str = (String) serializedLambda.getCapturedArg(2);
                    Context context = (Context) serializedLambda.getCapturedArg(3);
                    // Same body as the closure passed to cache.getAsync(str).listen(...) in monitorListenerStatus.
                    return igniteFuture -> {
                        if (fluxSink.isCancelled() && atomicInteger.incrementAndGet() > 4) {
                            log.error("Sink is canceled but cache listener still sending data for cri: " + str);
                        }
                        if (igniteFuture.get() == null || ((Set) igniteFuture.get()).size() <= 0) {
                            context.executeBlocking(promise -> {
                                fluxSink.next(ListenerStatus.INACTIVE);
                            }, (Handler) null);
                        } else {
                            context.executeBlocking(promise2 -> {
                                fluxSink.next(ListenerStatus.ACTIVE);
                            }, (Handler) null);
                        }
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
