package io.rsocket.broker.client.spring;

import io.rsocket.RSocket;
import io.rsocket.broker.common.spring.ClientTransportFactory;
import io.rsocket.broker.common.spring.DefaultClientTransportFactory;
import io.rsocket.broker.common.spring.MimeTypes;
import io.rsocket.broker.frames.RouteSetup;
import io.rsocket.transport.ClientTransport;
import java.net.URI;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.RSocketConnectorConfigurer;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.CollectionUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Sinks;

@EnableConfigurationProperties
@AutoConfigureBefore({RSocketRequesterAutoConfiguration.class})
@Configuration
@ConditionalOnClass({RSocket.class, RSocketRequester.class})
@AutoConfigureAfter({BrokerClientRSocketStrategiesAutoConfiguration.class})
@ConditionalOnProperty(name = {"io.rsocket.broker.client.enabled"}, matchIfMissing = true)
/* loaded from: input_file:io/rsocket/broker/client/spring/BrokerClientAutoConfiguration.class */
public class BrokerClientAutoConfiguration {

    /* loaded from: input_file:io/rsocket/broker/client/spring/BrokerClientAutoConfiguration$ClientThreadManager.class */
    private static class ClientThreadManager implements Disposable {
        private final Sinks.One<Void> onClose;

        private ClientThreadManager() {
            this.onClose = Sinks.one();
            Thread thread = new Thread("broker-client-thread") { // from class: io.rsocket.broker.client.spring.BrokerClientAutoConfiguration.ClientThreadManager.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    ClientThreadManager.this.onClose.asMono().block();
                }
            };
            thread.setContextClassLoader(getClass().getClassLoader());
            thread.setDaemon(false);
            thread.start();
        }

        public void dispose() {
            this.onClose.emitEmpty((signalType, emitResult) -> {
                return false;
            });
        }

        public boolean isDisposed() {
            return false;
        }
    }

    /**
     * Registers a freshly constructed {@link BrokerClientProperties} as a bean.
     *
     * @return a new, default-constructed properties object
     */
    @Bean
    public BrokerClientProperties brokerClientProperties() {
        BrokerClientProperties properties = new BrokerClientProperties();
        return properties;
    }

    /**
     * Creates a prototype-scoped {@link BrokerRSocketRequesterBuilder} whose setup metadata is a
     * {@link RouteSetup} built from the client's route id, service name and tags.
     *
     * <p>Only registered when no other bean of this type is present.
     *
     * @param rSocketConnectorConfigurer connector customization applied to the underlying builder
     * @param rSocketStrategies strategies applied to the underlying builder; also supplies the route matcher
     * @param brokerClientProperties source of route id, service name and tags
     * @return a builder wrapping a pre-configured {@code RSocketRequester.Builder}
     */
    @Bean
    @Scope("prototype")
    @ConditionalOnMissingBean
    public BrokerRSocketRequesterBuilder brokerRSocketRequesterBuilder(RSocketConnectorConfigurer rSocketConnectorConfigurer, RSocketStrategies rSocketStrategies, BrokerClientProperties brokerClientProperties) {
        final RouteSetup.Builder routeSetup = RouteSetup.from(
                brokerClientProperties.getRouteId(),
                brokerClientProperties.getServiceName());

        // Copy every tag into the route setup; a well-known key takes precedence over a plain
        // key, and tags that carry neither are skipped.
        brokerClientProperties.getTags().forEach((tagKey, tagValue) -> {
            if (tagKey.getWellKnownKey() != null) {
                routeSetup.with(tagKey.getWellKnownKey(), tagValue);
                return;
            }
            if (tagKey.getKey() != null) {
                routeSetup.with(tagKey.getKey(), tagValue);
            }
        });

        RSocketRequester.Builder requesterBuilder = RSocketRequester.builder()
                .setupMetadata(routeSetup.build(), MimeTypes.BROKER_FRAME_MIME_TYPE)
                .rsocketStrategies(rSocketStrategies)
                .rsocketConnector(rSocketConnectorConfigurer);

        return new BrokerRSocketRequesterBuilder(
                requesterBuilder,
                brokerClientProperties,
                rSocketStrategies.routeMatcher());
    }

    /**
     * Default connector configurer: installs the message handler's responder as the
     * connector's acceptor. Only registered when no other bean of this type is present.
     *
     * @param rSocketMessageHandler handler whose responder is used as acceptor
     * @return a configurer that sets the acceptor on the connector it is given
     */
    @Bean
    @ConditionalOnMissingBean
    public RSocketConnectorConfigurer rSocketConnectorConfigurer(RSocketMessageHandler rSocketMessageHandler) {
        // responder() is deliberately resolved inside the lambda, i.e. each time the
        // configurer is applied rather than when this bean is created.
        return connector -> connector.acceptor(rSocketMessageHandler.responder());
    }

    /**
     * Registers a {@link BrokerMetadata} bean constructed from the client properties.
     *
     * @param brokerClientProperties properties handed to the metadata object
     * @return a new {@link BrokerMetadata}
     */
    @Bean
    public BrokerMetadata brokerMetadata(BrokerClientProperties brokerClientProperties) {
        BrokerMetadata metadata = new BrokerMetadata(brokerClientProperties);
        return metadata;
    }

    /**
     * Registers the default {@link DefaultClientTransportFactory} bean.
     *
     * @return a new, default-constructed transport factory
     */
    @Bean
    public DefaultClientTransportFactory defaultClientTransportFactory() {
        DefaultClientTransportFactory transportFactory = new DefaultClientTransportFactory();
        return transportFactory;
    }

    /**
     * Registers the {@link ClientThreadManager} bean. Active unless
     * {@code io.rsocket.broker.client.block} is set to disable it (the condition matches when
     * the property is missing).
     *
     * @return a new manager; constructing it starts its waiting thread
     */
    @Bean
    @ConditionalOnProperty(name = "io.rsocket.broker.client.block", matchIfMissing = true)
    public ClientThreadManager clientThreadManager() {
        ClientThreadManager threadManager = new ClientThreadManager();
        return threadManager;
    }

    /**
     * Builds a {@link BrokerRSocketRequester} for the first configured broker and subscribes to
     * its client source right away. Active unless {@code io.rsocket.broker.client.auto-connect}
     * is set to disable it (the condition matches when the property is missing).
     *
     * @param brokerRSocketRequesterBuilder builder used to create the requester
     * @param brokerClientProperties supplies the broker URIs; only the first one is used
     * @param objectProvider candidate transport factories, consulted in order
     * @param clientThreadManager unused in the body; declared so the manager bean is created
     *     before this one. It is {@code @Nullable} because that bean is only registered when
     *     {@code io.rsocket.broker.client.block} is enabled, and a mandatory dependency would
     *     make this bean fail to resolve whenever blocking is switched off.
     * @return the requester, already subscribed to its client source
     * @throws IllegalStateException if no brokers are configured, or if no transport factory
     *     supports the first broker URI
     */
    @ConditionalOnProperty(name = {"io.rsocket.broker.client.auto-connect"}, matchIfMissing = true)
    @Bean
    public BrokerRSocketRequester brokerClientRSocketRequester(BrokerRSocketRequesterBuilder brokerRSocketRequesterBuilder, BrokerClientProperties brokerClientProperties, ObjectProvider<ClientTransportFactory> objectProvider, @Nullable ClientThreadManager clientThreadManager) {
        if (CollectionUtils.isEmpty(brokerClientProperties.getBrokers())) {
            throw new IllegalStateException("io.rsocket.broker.client.brokers may not be empty");
        }
        // Only the first configured broker is considered here.
        URI brokerUri = brokerClientProperties.getBrokers().iterator().next();

        // Pick the first factory (in provider order) that supports the broker URI.
        ClientTransport transport = objectProvider.orderedStream()
                .filter(factory -> factory.supports(brokerUri))
                .findFirst()
                .map(factory -> (ClientTransport) factory.create(brokerUri))
                .orElseThrow(() -> new IllegalStateException("Unknown transport " + brokerClientProperties));

        // NOTE(review): "m6transport" looks like a decompiler-generated name - confirm against
        // BrokerRSocketRequesterBuilder before renaming.
        BrokerRSocketRequester requester = brokerRSocketRequesterBuilder.m6transport(transport);

        // Subscribe eagerly; presumably this is what initiates the connection to the broker.
        requester.rsocketClient().source().subscribe();
        return requester;
    }
}
