package org.springframework.messaging.simp.stomp;

import com.amazonaws.SDKGlobalConfiguration;
import io.netty.channel.EventLoopGroup;
import org.springframework.context.Lifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.Environment;
import reactor.io.net.NetStreams;
import reactor.io.net.Spec;
import reactor.io.net.impl.netty.NettyClientSocketOptions;

/* loaded from: input_file:WEB-INF/lib/spring-messaging-4.3.12.RELEASE.jar:org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.class */
public class Reactor2TcpStompClient extends StompClientSupport implements Lifecycle {
    private final TcpOperations<byte[]> tcpClient;
    private final EventLoopGroup eventLoopGroup;
    private final Environment environment;
    private volatile boolean running;

    /* loaded from: input_file:WEB-INF/lib/spring-messaging-4.3.12.RELEASE.jar:org/springframework/messaging/simp/stomp/Reactor2TcpStompClient$StompTcpClientSpecFactory.class */
    private static class StompTcpClientSpecFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
        private final String host;
        private final int port;
        private final NettyClientSocketOptions socketOptions;
        private final Environment environment;
        private final Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), new StompDecoder());

        StompTcpClientSpecFactory(String str, int i, EventLoopGroup eventLoopGroup, Environment environment) {
            this.host = str;
            this.port = i;
            this.socketOptions = new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup);
            this.environment = environment;
        }

        public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
            return tcpClientSpec.env(this.environment).dispatcher(this.environment.getDispatcher("workQueue")).connect(this.host, this.port).codec(this.codec).options(this.socketOptions);
        }
    }

    public Reactor2TcpStompClient() {
        this(SDKGlobalConfiguration.DEFAULT_AWS_CSM_HOST, 61613);
    }

    public Reactor2TcpStompClient(String str, int i) {
        this.running = false;
        this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
        this.environment = new Environment();
        this.tcpClient = new Reactor2TcpClient(new StompTcpClientSpecFactory(str, i, this.eventLoopGroup, this.environment));
    }

    public Reactor2TcpStompClient(TcpOperations<byte[]> tcpOperations) {
        this.running = false;
        this.tcpClient = tcpOperations;
        this.eventLoopGroup = null;
        this.environment = null;
    }

    @Override // org.springframework.context.Lifecycle
    public void start() {
        if (isRunning()) {
            return;
        }
        this.running = true;
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        if (isRunning()) {
            this.running = false;
            try {
                if (this.eventLoopGroup != null) {
                    this.eventLoopGroup.shutdownGracefully().await(5000L);
                }
                if (this.environment != null) {
                    this.environment.shutdown();
                }
            } catch (InterruptedException e) {
                if (this.logger.isErrorEnabled()) {
                    this.logger.error("Failed to shutdown gracefully", e);
                }
            }
        }
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.running;
    }

    public ListenableFuture<StompSession> connect(StompSessionHandler stompSessionHandler) {
        return connect(null, stompSessionHandler);
    }

    public ListenableFuture<StompSession> connect(StompHeaders stompHeaders, StompSessionHandler stompSessionHandler) {
        ConnectionHandlingStompSession createSession = createSession(stompHeaders, stompSessionHandler);
        this.tcpClient.connect(createSession);
        return createSession.getSessionFuture();
    }

    public void shutdown() {
        this.tcpClient.shutdown();
    }
}
