package org.kinotic.continuum.internal.core.api;

import io.vertx.core.Vertx;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.commons.lang3.Validate;
import org.kinotic.continuum.api.Continuum;
import org.kinotic.continuum.api.annotations.Proxy;
import org.kinotic.continuum.core.api.RpcServiceProxyHandle;
import org.kinotic.continuum.core.api.ServiceRegistry;
import org.kinotic.continuum.core.api.event.EventBusService;
import org.kinotic.continuum.core.api.service.ServiceDescriptor;
import org.kinotic.continuum.core.api.service.ServiceFunctionInstanceProvider;
import org.kinotic.continuum.core.api.service.ServiceIdentifier;
import org.kinotic.continuum.internal.ServiceRegistrationBeanPostProcessor;
import org.kinotic.continuum.internal.core.api.service.invoker.ArgumentResolverComposite;
import org.kinotic.continuum.internal.core.api.service.invoker.ExceptionConverterComposite;
import org.kinotic.continuum.internal.core.api.service.invoker.ReturnValueConverterComposite;
import org.kinotic.continuum.internal.core.api.service.invoker.ServiceInvocationSupervisor;
import org.kinotic.continuum.internal.core.api.service.rpc.DefaultRpcServiceProxyHandle;
import org.kinotic.continuum.internal.core.api.service.rpc.RpcArgumentConverterResolver;
import org.kinotic.continuum.internal.core.api.service.rpc.RpcReturnValueHandlerFactory;
import org.kinotic.continuum.internal.utils.ContinuumUtil;
import org.kinotic.continuum.internal.utils.MetaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

/**
 * Spring-managed {@link ServiceRegistry} implementation.
 *
 * <p>Keeps one {@link ServiceInvocationSupervisor} per {@link ServiceIdentifier} in a
 * {@link ConcurrentHashMap}, starting the supervisor on registration and stopping it on
 * unregistration. Also creates {@link RpcServiceProxyHandle}s for a given service interface.
 */
@Component
public class DefaultServiceRegistry implements ServiceRegistry {
    private static final Logger log = LoggerFactory.getLogger(DefaultServiceRegistry.class);

    /** Content type used to resolve the argument converter when the caller does not supply one. */
    private static final String DEFAULT_CONTENT_TYPE = "application/json";

    @Autowired
    private Vertx vertx;

    @Autowired
    private EventBusService eventBusService;

    @Autowired
    private Continuum continuum;

    @Autowired
    private ArgumentResolverComposite argumentResolver;

    @Autowired
    private ReturnValueConverterComposite returnValueConverter;

    @Autowired
    private ExceptionConverterComposite exceptionConverter;

    @Autowired
    private RpcArgumentConverterResolver rpcArgumentConverterResolver;

    @Autowired
    private RpcReturnValueHandlerFactory rpcReturnValueHandlerFactory;

    @Autowired
    private ReactiveAdapterRegistry reactiveAdapterRegistry;

    /** Supervisors currently held by this registry, keyed by the identifier they were registered under. */
    private final ConcurrentHashMap<ServiceIdentifier, ServiceInvocationSupervisor> supervisors = new ConcurrentHashMap<>();

    /**
     * Registers a service by building a {@link ServiceDescriptor} from the identifier and class and a
     * {@link ServiceFunctionInstanceProvider} from the instance, then delegating to
     * {@link #register(ServiceDescriptor, ServiceFunctionInstanceProvider)}.
     *
     * @param serviceIdentifier identifier to register the service under
     * @param cls               class passed to {@code ServiceDescriptor.create}
     * @param obj               instance passed to {@code ServiceFunctionInstanceProvider.create}
     * @return the registration {@link Mono}; any exception thrown while building the descriptor or
     *         provider is returned as an error signal rather than thrown
     */
    public Mono<Void> register(ServiceIdentifier serviceIdentifier, Class<?> cls, Object obj) {
        try {
            return register(ServiceDescriptor.create(serviceIdentifier, cls), ServiceFunctionInstanceProvider.create(obj));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }

    /**
     * Creates and starts a {@link ServiceInvocationSupervisor} for the descriptor, unless one is already
     * mapped to the descriptor's identifier.
     *
     * @param serviceDescriptor               descriptor of the service to register
     * @param serviceFunctionInstanceProvider provider handed to the new supervisor
     * @return a {@link Mono} that completes when the supervisor's {@code start()} completes, and errors
     *         with {@link IllegalArgumentException} if the identifier is already registered or with
     *         whatever creating/starting the supervisor fails with
     */
    public Mono<Void> register(ServiceDescriptor serviceDescriptor, ServiceFunctionInstanceProvider serviceFunctionInstanceProvider) {
        return Mono.create(monoSink -> {
            // compute() makes the "check for existing + insert" step atomic per identifier.
            this.supervisors.compute(serviceDescriptor.serviceIdentifier(), (identifier, existing) -> {
                if (existing != null) {
                    monoSink.error(new IllegalArgumentException("Service already registered for ServiceIdentifier " + serviceDescriptor.serviceIdentifier()));
                    return existing;
                }
                ServiceInvocationSupervisor created = null;
                try {
                    created = new ServiceInvocationSupervisor(serviceDescriptor, serviceFunctionInstanceProvider, this.argumentResolver, this.returnValueConverter, this.exceptionConverter, this.eventBusService, this.reactiveAdapterRegistry, this.vertx);
                    // A Mono<Void> never emits onNext, so success has to be forwarded from onComplete;
                    // otherwise the returned Mono would never terminate on a successful start.
                    created.start().subscribe(ignored -> { }, monoSink::error, monoSink::success);
                } catch (Exception e) {
                    monoSink.error(e);
                }
                // NOTE(review): a supervisor whose start() fails stays mapped; confirm this is intended.
                return created;
            });
        });
    }

    /**
     * Removes the supervisor mapped to the identifier and stops it.
     *
     * @param serviceIdentifier identifier of the service to unregister
     * @return a {@link Mono} that completes when the supervisor's {@code stop()} completes, and errors
     *         with {@link IllegalArgumentException} if nothing is registered for the identifier or with
     *         whatever {@code stop()} fails with
     */
    public Mono<Void> unregister(ServiceIdentifier serviceIdentifier) {
        return Mono.create(monoSink -> {
            this.supervisors.compute(serviceIdentifier, (identifier, existing) -> {
                if (existing == null) {
                    monoSink.error(new IllegalArgumentException(" No Service registered for for ServiceIdentifier " + serviceIdentifier));
                    return null;
                }
                // Forward completion explicitly: stop() is a Mono<Void> and never emits a value.
                existing.stop().subscribe(ignored -> { }, monoSink::error, monoSink::success);
                // Returning null drops the mapping regardless of how stop() eventually finishes.
                return null;
            });
        });
    }

    /**
     * Creates a proxy handle for the service using the argument converter resolved for
     * {@code "application/json"}.
     *
     * @param serviceIdentifier identifier of the target service
     * @param cls               service interface the handle is created for
     * @param <T>               service interface type
     * @return a new {@link DefaultRpcServiceProxyHandle}
     */
    public <T> RpcServiceProxyHandle<T> serviceProxy(ServiceIdentifier serviceIdentifier, Class<T> cls) {
        return new DefaultRpcServiceProxyHandle(serviceIdentifier, this.continuum.nodeName(), cls, this.rpcArgumentConverterResolver.resolve(DEFAULT_CONTENT_TYPE), this.rpcReturnValueHandlerFactory, this.eventBusService, Thread.currentThread().getContextClassLoader());
    }

    /**
     * Creates a proxy handle for the service using the argument converter resolved for the given
     * content type.
     *
     * @param serviceIdentifier identifier of the target service
     * @param cls               service interface the handle is created for
     * @param str               content type used to resolve the argument converter; must not be blank
     *                          and must be resolvable by the {@link RpcArgumentConverterResolver}
     * @param <T>               service interface type
     * @return a new {@link DefaultRpcServiceProxyHandle}
     * @throws IllegalArgumentException if {@code str} is blank or no converter can be resolved for it
     * @throws NullPointerException     if {@code str} is {@code null}
     */
    public <T> RpcServiceProxyHandle<T> serviceProxy(ServiceIdentifier serviceIdentifier, Class<T> cls, String str) {
        Validate.notBlank(str, "The contentTypeExpected must not be blank");
        // Pass the caller-supplied value as a format argument so a '%' in it cannot be
        // interpreted as a format specifier while the failure message is being built.
        Validate.isTrue(this.rpcArgumentConverterResolver.canResolve(str), "The contentType:%s does not have any configured RpcArgumentConverter's", str);
        return new DefaultRpcServiceProxyHandle(serviceIdentifier, this.continuum.nodeName(), cls, this.rpcArgumentConverterResolver.resolve(str), this.rpcReturnValueHandlerFactory, this.eventBusService, Thread.currentThread().getContextClassLoader());
    }

    /**
     * Creates a proxy handle for an interface annotated with {@link Proxy}, deriving the
     * {@link ServiceIdentifier} from the annotation.
     *
     * <p>An empty annotation namespace falls back to the URI-safe encoded package name of the class,
     * and an empty annotation name falls back to the simple class name. The version comes from
     * {@code MetaUtil.getVersion(cls)}; the remaining identifier argument is passed as {@code null}.
     *
     * @param cls service interface annotated with {@link Proxy}
     * @param <T> service interface type
     * @return a proxy handle created with the {@code "application/json"} content type
     * @throws NullPointerException if {@code cls} is not annotated with {@link Proxy}
     */
    public <T> RpcServiceProxyHandle<T> serviceProxy(Class<T> cls) {
        Proxy annotation = cls.getAnnotation(Proxy.class);
        Validate.notNull(annotation, "The Class provided must be annotated with @Proxy");
        String namespace = annotation.namespace().isEmpty() ? ContinuumUtil.safeEncodeURI(cls.getPackageName()) : annotation.namespace();
        String name = annotation.name().isEmpty() ? cls.getSimpleName() : annotation.name();
        return serviceProxy(new ServiceIdentifier(namespace, name, (String) null, MetaUtil.getVersion(cls)), cls, DEFAULT_CONTENT_TYPE);
    }
}
