package org.kinotic.continuum.internal.core.api.service.invoker;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.commons.lang3.Validate;
import org.kinotic.continuum.api.exceptions.RpcMissingMethodException;
import org.kinotic.continuum.core.api.event.CRI;
import org.kinotic.continuum.core.api.event.Event;
import org.kinotic.continuum.core.api.event.EventBusService;
import org.kinotic.continuum.core.api.event.ListenerStatus;
import org.kinotic.continuum.core.api.event.Metadata;
import org.kinotic.continuum.core.api.service.ServiceDescriptor;
import org.kinotic.continuum.core.api.service.ServiceFunction;
import org.kinotic.continuum.core.api.service.ServiceFunctionInstanceProvider;
import org.kinotic.continuum.internal.utils.EventUtil;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:org/kinotic/continuum/internal/core/api/service/invoker/ServiceInvocationSupervisor.class */
public class ServiceInvocationSupervisor {
    private static final Logger log = LoggerFactory.getLogger(ServiceInvocationSupervisor.class);
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final ConcurrentHashMap<String, StreamSubscriber> activeStreamingResults = new ConcurrentHashMap<>();
    private final ArgumentResolver argumentResolver;
    private final EventBusService eventBusService;
    private final ExceptionConverter exceptionConverter;
    private final Map<String, HandlerMethod> methodMap;
    private final ReactiveAdapterRegistry reactiveAdapterRegistry;
    private final ReturnValueConverter returnValueConverter;
    private final ServiceDescriptor serviceDescriptor;
    private final Vertx vertx;
    private Disposable methodInvocationEventListenerDisposable;

    /* loaded from: input_file:org/kinotic/continuum/internal/core/api/service/invoker/ServiceInvocationSupervisor$ReplyListenerStatusSubscriber.class */
    private static class ReplyListenerStatusSubscriber extends BaseSubscriber<ListenerStatus> {
        private final StreamSubscriber streamSubscription;

        public ReplyListenerStatusSubscriber(StreamSubscriber streamSubscriber) {
            this.streamSubscription = streamSubscriber;
        }

        protected void hookOnComplete() {
            ServiceInvocationSupervisor.log.error("Reply Listener Monitor completed for some reason! Terminating streaming result.");
            this.streamSubscription.cancel();
        }

        protected void hookOnError(Throwable th) {
            ServiceInvocationSupervisor.log.error("Reply Listener Monitor threw an exception. Terminating streaming result.", th);
            this.streamSubscription.cancel();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(ListenerStatus listenerStatus) {
            if (ServiceInvocationSupervisor.log.isTraceEnabled()) {
                ServiceInvocationSupervisor.log.trace("Received ListenerStatus " + listenerStatus);
            }
            if (listenerStatus != ListenerStatus.INACTIVE || this.streamSubscription.isDisposed()) {
                return;
            }
            ServiceInvocationSupervisor.log.trace("No more listeners active terminating streaming result.");
            this.streamSubscription.cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/kinotic/continuum/internal/core/api/service/invoker/ServiceInvocationSupervisor$StreamSubscriber.class */
    public class StreamSubscriber extends BaseSubscriber<Object> {
        private final HandlerMethod handlerMethod;
        private final Metadata incomingMetadata;
        private final Flux<ListenerStatus> replyListenerStatus;
        private ReplyListenerStatusSubscriber replyListenerStatusSubscriber;

        public StreamSubscriber(Metadata metadata, HandlerMethod handlerMethod, Flux<ListenerStatus> flux) {
            this.incomingMetadata = metadata;
            this.handlerMethod = handlerMethod;
            this.replyListenerStatus = flux;
        }

        public void processControlEvent(Event<byte[]> event) {
            String str = event.metadata().get("control");
            if (ServiceInvocationSupervisor.log.isTraceEnabled()) {
                ServiceInvocationSupervisor.log.trace("Processing control event " + str);
            }
            boolean z = -1;
            switch (str.hashCode()) {
                case -1852006340:
                    if (str.equals("suspend")) {
                        z = true;
                        break;
                    }
                    break;
                case -1367724422:
                    if (str.equals("cancel")) {
                        z = false;
                        break;
                    }
                    break;
                case -934426579:
                    if (str.equals("resume")) {
                        z = 2;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    cancel();
                    return;
                case true:
                    request(0L);
                    return;
                case true:
                    requestUnbounded();
                    return;
                default:
                    throw new IllegalArgumentException("Unknown control header value " + str);
            }
        }

        protected void hookFinally(SignalType signalType) {
            ServiceInvocationSupervisor.log.trace("Stream Cleanup Now");
            this.replyListenerStatusSubscriber.cancel();
            String str = this.incomingMetadata.get("__correlation-id");
            ServiceInvocationSupervisor.this.vertx.executeBlocking(promise -> {
                ServiceInvocationSupervisor.this.activeStreamingResults.remove(str);
                promise.complete();
            }, (Handler) null);
        }

        protected void hookOnComplete() {
            ServiceInvocationSupervisor.log.trace("Stream Complete");
            ServiceInvocationSupervisor.this.sendCompletionEvent(this.incomingMetadata);
        }

        protected void hookOnError(Throwable th) {
            if (ServiceInvocationSupervisor.log.isTraceEnabled()) {
                ServiceInvocationSupervisor.log.trace("Stream Error", th);
            }
            ServiceInvocationSupervisor.this.handleException(this.incomingMetadata, th);
        }

        protected void hookOnNext(Object obj) {
            if (ServiceInvocationSupervisor.log.isTraceEnabled()) {
                ServiceInvocationSupervisor.log.trace("Next stream value " + obj);
            }
            ServiceInvocationSupervisor.this.convertAndSend(this.incomingMetadata, this.handlerMethod, obj);
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.replyListenerStatusSubscriber = new ReplyListenerStatusSubscriber(this);
            this.replyListenerStatus.subscribe(this.replyListenerStatusSubscriber);
            super.hookOnSubscribe(subscription);
        }
    }

    public ServiceInvocationSupervisor(ServiceDescriptor serviceDescriptor, ServiceFunctionInstanceProvider serviceFunctionInstanceProvider, ArgumentResolver argumentResolver, ReturnValueConverter returnValueConverter, ExceptionConverter exceptionConverter, EventBusService eventBusService, ReactiveAdapterRegistry reactiveAdapterRegistry, Vertx vertx) {
        Validate.notNull(serviceDescriptor, "ServiceDescriptor must not be null", new Object[0]);
        Validate.notNull(serviceFunctionInstanceProvider, "ServiceFunctionInstanceProvider must not be null", new Object[0]);
        Validate.notNull(argumentResolver, "argumentResolver must not be null", new Object[0]);
        Validate.notNull(returnValueConverter, "returnValueConverter must not be null", new Object[0]);
        Validate.notNull(exceptionConverter, "exceptionConverter must not be null", new Object[0]);
        Validate.notNull(eventBusService, "eventBusService must not be null", new Object[0]);
        Validate.notNull(reactiveAdapterRegistry, "reactiveAdapterRegistry must not be null", new Object[0]);
        Validate.notNull(vertx, "vertx must not be null", new Object[0]);
        this.serviceDescriptor = serviceDescriptor;
        this.argumentResolver = argumentResolver;
        this.returnValueConverter = returnValueConverter;
        this.exceptionConverter = exceptionConverter;
        this.eventBusService = eventBusService;
        this.reactiveAdapterRegistry = reactiveAdapterRegistry;
        this.vertx = vertx;
        this.methodMap = buildMethodMap(serviceDescriptor, serviceFunctionInstanceProvider);
    }

    public boolean isActive() {
        return this.active.get();
    }

    public Mono<Void> start() {
        return Mono.create(monoSink -> {
            if (!this.active.compareAndSet(false, true)) {
                monoSink.error(new IllegalStateException("Service already started"));
                return;
            }
            Mono listenWithAck = this.eventBusService.listenWithAck(this.serviceDescriptor.serviceIdentifier().cri().baseResource());
            Consumer consumer = flux -> {
                this.methodInvocationEventListenerDisposable = flux.subscribe(this::processEvent, th -> {
                    log.error("Event listener error", th);
                }, () -> {
                    log.error("Should not happen! Event listener stopped for some reason!! Changing supervisor state to inactive");
                    this.active.set(false);
                });
                monoSink.success();
            };
            Objects.requireNonNull(monoSink);
            listenWithAck.subscribe(consumer, monoSink::error);
        });
    }

    public Mono<Void> stop() {
        return Mono.create(monoSink -> {
            if (!this.active.compareAndSet(true, false)) {
                monoSink.error(new IllegalStateException("Service already stopped"));
                return;
            }
            if (this.methodInvocationEventListenerDisposable != null) {
                this.methodInvocationEventListenerDisposable.dispose();
            }
            Iterator<Map.Entry<String, StreamSubscriber>> it = this.activeStreamingResults.entrySet().iterator();
            while (it.hasNext()) {
                it.next().getValue().cancel();
            }
            monoSink.success();
        });
    }

    private Map<String, HandlerMethod> buildMethodMap(ServiceDescriptor serviceDescriptor, ServiceFunctionInstanceProvider serviceFunctionInstanceProvider) {
        HashMap hashMap = new HashMap();
        for (ServiceFunction serviceFunction : serviceDescriptor.functions()) {
            Object provideInstance = serviceFunctionInstanceProvider.provideInstance(serviceFunction);
            Method selectInvocableMethod = AopUtils.selectInvocableMethod(serviceFunction.invocationMethod(), provideInstance.getClass());
            String str = "/" + selectInvocableMethod.getName();
            if (hashMap.containsKey(str)) {
                throw new IllegalArgumentException("Multiple ServiceFunctions provided with the name " + selectInvocableMethod.getName());
            }
            hashMap.put(str, new HandlerMethod(provideInstance, selectInvocableMethod));
        }
        return hashMap;
    }

    private void convertAndSend(Metadata metadata, HandlerMethod handlerMethod, Object obj) {
        try {
            this.eventBusService.send(this.returnValueConverter.convert(metadata, handlerMethod.getReturnType().getParameterType(), obj));
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("Exception occurred sending response", e);
            }
            throw e;
        }
    }

    private void handleException(Metadata metadata, Throwable th) {
        try {
            this.eventBusService.send(this.exceptionConverter.convert(metadata, th));
        } catch (Exception e) {
            log.error("Error occurred when calling exception converter", th);
        }
    }

    private void processControlPlaneRequest(Event<byte[]> event) {
        String str = event.metadata().get("__correlation-id");
        Validate.notNull(str, "Streaming control plain messages require a CORRELATION_ID_HEADER to be set", new Object[0]);
        this.activeStreamingResults.computeIfPresent(str, (str2, streamSubscriber) -> {
            streamSubscriber.processControlEvent(event);
            return streamSubscriber;
        });
    }

    private void processEvent(Event<byte[]> event) {
        boolean contains = event.metadata().contains("control");
        if (log.isTraceEnabled()) {
            log.trace("Service " + (contains ? "Control" : "Invocation") + " requested for " + event.cri());
        }
        if (!this.exceptionConverter.supports(event.metadata())) {
            log.error("No exception converter found incoming message will be ignored");
            return;
        }
        try {
            Validate.isTrue(event.cri().hasPath(), "The methodId must not be blank", new Object[0]);
            if (contains) {
                processControlPlaneRequest(event);
            } else if (validateReplyTo(event)) {
                processInvocationRequest(event);
            } else {
                log.error("ReplyTo header is missing or invalid incoming message will be ignored\n" + EventUtil.toString(event, true));
            }
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("Exception occurred processing service request\n" + EventUtil.toString(event, true), e);
            }
            handleException(event.metadata(), e);
        }
    }

    private void processInvocationRequest(Event<byte[]> event) {
        if (!this.argumentResolver.supports(event)) {
            throw new IllegalStateException("No compatible ArgumentResolver found");
        }
        HandlerMethod handlerMethod = this.methodMap.get(event.cri().path());
        if (handlerMethod == null) {
            throw new RpcMissingMethodException("No method could be resolved for methodId " + event.cri().path());
        }
        if (!this.returnValueConverter.supports(event.metadata(), handlerMethod.getReturnType().getParameterType())) {
            throw new IllegalStateException("No compatible ReturnValueConverter found");
        }
        Object obj = null;
        boolean z = false;
        try {
            obj = handlerMethod.invoke(this.argumentResolver.resolveArguments(event, handlerMethod));
        } catch (Exception e) {
            z = true;
            handleException(event.metadata(), e);
        }
        if (z) {
            return;
        }
        processMethodInvocationResult(event, handlerMethod, obj);
    }

    private void processMethodInvocationResult(Event<byte[]> event, HandlerMethod handlerMethod, Object obj) {
        Metadata metadata = event.metadata();
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter((Class) null, obj);
        if (adapter == null) {
            convertAndSend(metadata, handlerMethod, obj);
            return;
        }
        if (!adapter.isMultiValue()) {
            Mono.from(adapter.toPublisher(obj)).doOnSuccess(obj2 -> {
                convertAndSend(metadata, handlerMethod, obj2);
            }).subscribe(obj3 -> {
            }, th -> {
                if (log.isDebugEnabled()) {
                    log.debug("Exception occurred processing service request\n" + EventUtil.toString(event, true), th);
                }
                handleException(metadata, th);
            });
        } else {
            if (!event.metadata().contains("__correlation-id")) {
                throw new IllegalArgumentException("Streaming results require a CORRELATION_ID_HEADER to be set");
            }
            this.activeStreamingResults.computeIfAbsent(event.metadata().get("__correlation-id"), str -> {
                Flux from = Flux.from(adapter.toPublisher(obj));
                StreamSubscriber streamSubscriber = new StreamSubscriber(metadata, handlerMethod, this.eventBusService.monitorListenerStatus(CRI.create(event.metadata().get("reply-to")).baseResource()));
                from.subscribe(streamSubscriber);
                return streamSubscriber;
            });
        }
    }

    private void sendCompletionEvent(Metadata metadata) {
        this.eventBusService.send(EventUtil.createReplyEvent(metadata, Map.of("control", "complete"), null));
    }

    private boolean validateReplyTo(Event<byte[]> event) {
        boolean z = false;
        String str = event.metadata().get("reply-to");
        if (str != null) {
            if (str.isBlank()) {
                if (log.isDebugEnabled()) {
                    log.debug("Reply-to header must not be blank");
                }
            } else if (str.startsWith("srv:")) {
                z = true;
            } else if (log.isDebugEnabled()) {
                log.debug("Reply-to header must be a valid service destination");
            }
        } else if (log.isDebugEnabled()) {
            log.debug("No reply-to header found in event");
        }
        return z;
    }
}
