package io.activej.rpc.client;

import io.activej.async.callback.Callback;
import io.activej.async.exception.AsyncTimeoutException;
import io.activej.async.service.ReactiveService;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.Utils;
import io.activej.common.builder.AbstractBuilder;
import io.activej.csp.process.frame.FrameFormat;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.api.attribute.JmxReducers;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.net.socket.tcp.SslTcpSocket;
import io.activej.net.socket.tcp.TcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.reactor.AbstractNioReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import io.activej.reactor.net.SocketSettings;
import io.activej.reactor.nio.NioReactor;
import io.activej.rpc.client.jmx.RpcConnectStats;
import io.activej.rpc.client.jmx.RpcRequestStats;
import io.activej.rpc.client.sender.RpcSender;
import io.activej.rpc.client.sender.strategy.RpcStrategy;
import io.activej.rpc.protocol.RpcException;
import io.activej.rpc.protocol.RpcMessage;
import io.activej.rpc.protocol.RpcStream;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.SerializerFactory;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/rpc/client/RpcClient.class */
public final class RpcClient extends AbstractNioReactive implements IRpcClient, ReactiveService, ReactiveJmxBeanWithStats {
    private static final boolean CHECKS;
    public static final Duration DEFAULT_CONNECT_TIMEOUT;
    public static final Duration DEFAULT_RECONNECT_INTERVAL;
    public static final MemSize DEFAULT_PACKET_SIZE;
    private static final RpcException SET_STRATEGY_EXCEPTION;
    private static final RpcException NO_SENDER_AVAILABLE_EXCEPTION;
    private static final RpcException CLIENT_IS_STOPPED;
    private Logger logger;
    private SocketSettings socketSettings;
    private SSLContext sslContext;
    private Executor sslExecutor;
    private RpcStrategy strategy;
    private final Set<InetSocketAddress> pendingConnections;
    private final Map<InetSocketAddress, RpcClientConnection> connections;
    private RpcStrategy newStrategy;
    private boolean newStrategyRetry;
    private SettablePromise<Void> newStrategyPromise;
    private final Set<InetSocketAddress> newConnections;
    private MemSize defaultPacketSize;

    @Nullable
    private FrameFormat frameFormat;
    private Duration autoFlushInterval;
    private Duration keepAliveInterval;
    private long connectTimeoutMillis;
    private long reconnectIntervalMillis;
    private BinarySerializer<RpcMessage> requestSerializer;
    private BinarySerializer<RpcMessage> responseSerializer;
    private boolean forcedShutdown;
    private RpcSender requestSender;

    @Nullable
    private SettablePromise<Void> stopPromise;
    private final RpcClientConnectionPool pool;
    static final Duration SMOOTHING_WINDOW;
    private boolean monitoring;
    private final RpcRequestStats generalRequestsStats;
    private final Map<Class<?>, RpcRequestStats> requestStatsPerClass;
    private final Map<InetSocketAddress, RpcConnectStats> connectsStatsPerAddress;
    private final ExceptionStats lastProtocolError;
    private final TcpSocket.JmxInspector statsSocket;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/activej/rpc/client/RpcClient$Builder.class */
    public final class Builder extends AbstractBuilder<Builder, RpcClient> {
        private Builder() {
        }

        public Builder withMessageTypes(List<Class<?>> list) {
            return withSerializer(((SerializerFactory) SerializerFactory.builder().withSubclasses(RpcMessage.SUBCLASSES_ID, list).build()).create(RpcMessage.class));
        }

        public Builder withMessageTypes(Class<?>... clsArr) {
            return withMessageTypes(List.of((Object[]) clsArr));
        }

        public Builder withSerializer(BinarySerializer<RpcMessage> binarySerializer) {
            return withSerializer(binarySerializer, binarySerializer);
        }

        public Builder withSerializer(BinarySerializer<RpcMessage> binarySerializer, BinarySerializer<RpcMessage> binarySerializer2) {
            checkNotBuilt(this);
            RpcClient.this.requestSerializer = binarySerializer;
            RpcClient.this.responseSerializer = binarySerializer2;
            return this;
        }

        public Builder withRequestsSerializer(BinarySerializer<RpcMessage> binarySerializer) {
            checkNotBuilt(this);
            RpcClient.this.requestSerializer = binarySerializer;
            return this;
        }

        public Builder withResponsesSerializer(BinarySerializer<RpcMessage> binarySerializer) {
            checkNotBuilt(this);
            RpcClient.this.responseSerializer = binarySerializer;
            return this;
        }

        public Builder withSocketSettings(SocketSettings socketSettings) {
            checkNotBuilt(this);
            RpcClient.this.socketSettings = socketSettings;
            return this;
        }

        public Builder withStrategy(RpcStrategy rpcStrategy) {
            checkNotBuilt(this);
            RpcClient.this.newStrategy = rpcStrategy;
            return this;
        }

        public Builder withStreamProtocol(MemSize memSize) {
            checkNotBuilt(this);
            RpcClient.this.defaultPacketSize = memSize;
            return this;
        }

        public Builder withStreamProtocol(MemSize memSize, @Nullable FrameFormat frameFormat) {
            checkNotBuilt(this);
            RpcClient.this.defaultPacketSize = memSize;
            RpcClient.this.frameFormat = frameFormat;
            return this;
        }

        public Builder withAutoFlush(Duration duration) {
            checkNotBuilt(this);
            RpcClient.this.autoFlushInterval = duration;
            return this;
        }

        public Builder withKeepAlive(Duration duration) {
            checkNotBuilt(this);
            RpcClient.this.keepAliveInterval = duration;
            return this;
        }

        public Builder withConnectTimeout(Duration duration) {
            checkNotBuilt(this);
            RpcClient.this.connectTimeoutMillis = duration.toMillis();
            return this;
        }

        public Builder withReconnectInterval(Duration duration) {
            checkNotBuilt(this);
            RpcClient.this.reconnectIntervalMillis = duration.toMillis();
            return this;
        }

        public Builder withSslEnabled(SSLContext sSLContext, Executor executor) {
            checkNotBuilt(this);
            RpcClient.this.sslContext = sSLContext;
            RpcClient.this.sslExecutor = executor;
            return this;
        }

        public Builder withLogger(Logger logger) {
            checkNotBuilt(this);
            RpcClient.this.logger = logger;
            return this;
        }

        public Builder withForcedShutdown() {
            RpcClient.this.forcedShutdown = true;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public RpcClient m1doBuild() {
            Checks.checkState((RpcClient.this.requestSerializer == null || RpcClient.this.responseSerializer == null) ? false : true);
            return RpcClient.this;
        }
    }

    /* loaded from: input_file:io/activej/rpc/client/RpcClient$NoSenderAvailable.class */
    public static final class NoSenderAvailable implements RpcSender {
        @Override // io.activej.rpc.client.sender.RpcSender
        public <I, O> void sendRequest(I i, int i2, Callback<O> callback) {
            callback.accept((Object) null, RpcClient.NO_SENDER_AVAILABLE_EXCEPTION);
        }
    }

    /* loaded from: input_file:io/activej/rpc/client/RpcClient$NoServersRpcStrategy.class */
    public static final class NoServersRpcStrategy implements RpcStrategy {
        @Override // io.activej.rpc.client.sender.strategy.RpcStrategy
        public Set<InetSocketAddress> getAddresses() {
            return Set.of();
        }

        @Override // io.activej.rpc.client.sender.strategy.RpcStrategy
        public RpcSender createSender(RpcClientConnectionPool rpcClientConnectionPool) {
            return null;
        }
    }

    private RpcClient(NioReactor nioReactor) {
        super(nioReactor);
        this.logger = LoggerFactory.getLogger(getClass());
        this.socketSettings = SocketSettings.defaultInstance();
        this.strategy = new NoServersRpcStrategy();
        this.pendingConnections = new HashSet();
        this.connections = new HashMap();
        this.newStrategy = this.strategy;
        this.newConnections = new HashSet();
        this.defaultPacketSize = DEFAULT_PACKET_SIZE;
        this.autoFlushInterval = Duration.ZERO;
        this.keepAliveInterval = Duration.ZERO;
        this.connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT.toMillis();
        this.reconnectIntervalMillis = DEFAULT_RECONNECT_INTERVAL.toMillis();
        this.requestSender = new NoSenderAvailable();
        Map<InetSocketAddress, RpcClientConnection> map = this.connections;
        Objects.requireNonNull(map);
        this.pool = (v1) -> {
            return r1.get(v1);
        };
        this.monitoring = false;
        this.generalRequestsStats = RpcRequestStats.create(SMOOTHING_WINDOW);
        this.requestStatsPerClass = new HashMap();
        this.connectsStatsPerAddress = new HashMap();
        this.lastProtocolError = ExceptionStats.create();
        this.statsSocket = new TcpSocket.JmxInspector();
    }

    public static Builder builder(NioReactor nioReactor) {
        return new Builder();
    }

    public SocketSettings getSocketSettings() {
        return this.socketSettings;
    }

    public Promise<Void> start() {
        Reactive.checkInReactorThread(this);
        Checks.checkState(this.stopPromise == null);
        return changeStrategy(this.newStrategy, false);
    }

    public Promise<Void> changeStrategy(RpcStrategy rpcStrategy, boolean z) {
        Reactive.checkInReactorThread(this);
        if (this.stopPromise != null) {
            return Promise.ofException(CLIENT_IS_STOPPED);
        }
        if (this.newStrategyPromise != null) {
            SettablePromise<Void> settablePromise = this.newStrategyPromise;
            this.newStrategyPromise = null;
            settablePromise.setException(SET_STRATEGY_EXCEPTION);
            if (this.newStrategyPromise != null) {
                return Promise.ofException(SET_STRATEGY_EXCEPTION);
            }
        }
        SettablePromise<Void> settablePromise2 = new SettablePromise<>();
        this.newStrategy = rpcStrategy;
        this.newStrategyPromise = settablePromise2;
        this.newStrategyRetry = z;
        for (InetSocketAddress inetSocketAddress : rpcStrategy.getAddresses()) {
            if (!this.connections.containsKey(inetSocketAddress) && !this.pendingConnections.contains(inetSocketAddress)) {
                this.pendingConnections.add(inetSocketAddress);
                this.newConnections.add(inetSocketAddress);
                this.logger.info("Connecting: {}", inetSocketAddress);
                connect(inetSocketAddress);
            }
        }
        updateStrategy();
        return settablePromise2;
    }

    public Promise<Void> stop() {
        Reactive.checkInReactorThread(this);
        if (this.stopPromise != null) {
            return this.stopPromise;
        }
        this.stopPromise = new SettablePromise<>();
        if (this.connections.isEmpty()) {
            onClientStop();
            return this.stopPromise;
        }
        this.pendingConnections.clear();
        Iterator it = new ArrayList(this.connections.values()).iterator();
        while (it.hasNext()) {
            RpcClientConnection rpcClientConnection = (RpcClientConnection) it.next();
            if (this.forcedShutdown) {
                rpcClientConnection.forceShutdown();
            } else {
                rpcClientConnection.shutdown();
            }
        }
        return this.stopPromise;
    }

    private void connect(InetSocketAddress inetSocketAddress) {
        TcpSocket.connect(this.reactor, inetSocketAddress, this.connectTimeoutMillis, this.socketSettings).whenResult(tcpSocket -> {
            this.newConnections.remove(inetSocketAddress);
            if (!this.pendingConnections.contains(inetSocketAddress) || this.stopPromise != null) {
                tcpSocket.close();
                return;
            }
            this.statsSocket.onConnect(tcpSocket);
            tcpSocket.setInspector(this.statsSocket);
            RpcStream rpcStream = new RpcStream(this.sslContext == null ? tcpSocket : SslTcpSocket.wrapClientSocket(this.reactor, tcpSocket, this.sslContext, this.sslExecutor), this.responseSerializer, this.requestSerializer, this.defaultPacketSize, this.autoFlushInterval, this.frameFormat, false);
            RpcClientConnection rpcClientConnection = new RpcClientConnection(this.reactor, this, inetSocketAddress, rpcStream, this.keepAliveInterval.toMillis());
            rpcStream.setListener(rpcClientConnection);
            if (isMonitoring()) {
                rpcClientConnection.startMonitoring();
            }
            this.connectsStatsPerAddress.computeIfAbsent(inetSocketAddress, inetSocketAddress2 -> {
                return new RpcConnectStats(this.reactor);
            }).recordSuccessfulConnect();
            this.logger.info("Connection to {} established", inetSocketAddress);
            this.pendingConnections.remove(inetSocketAddress);
            this.connections.put(inetSocketAddress, rpcClientConnection);
            updateStrategy();
        }).whenException(exc -> {
            this.newConnections.remove(inetSocketAddress);
            this.logger.warn("Connection {} failed", inetSocketAddress, exc);
            if (this.pendingConnections.contains(inetSocketAddress) && this.stopPromise == null) {
                this.reactor.delayBackground(this.reconnectIntervalMillis, () -> {
                    if (this.pendingConnections.contains(inetSocketAddress) && this.stopPromise == null) {
                        this.logger.info("Reconnecting: {}", inetSocketAddress);
                        connect(inetSocketAddress);
                    }
                });
                updateStrategy();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onClosedConnection(InetSocketAddress inetSocketAddress) {
        if (this.connections.remove(inetSocketAddress) == null) {
            return;
        }
        this.logger.info("Connection closed: {}", inetSocketAddress);
        if (this.stopPromise == null) {
            this.pendingConnections.add(inetSocketAddress);
            this.reactor.delayBackground(this.reconnectIntervalMillis, () -> {
                if (this.pendingConnections.contains(inetSocketAddress) && this.stopPromise == null) {
                    this.logger.info("Reconnecting: {}", inetSocketAddress);
                    connect(inetSocketAddress);
                }
            });
        } else if (this.connections.isEmpty()) {
            onClientStop();
        }
        updateStrategy();
    }

    private void onClientStop() {
        if (!$assertionsDisabled && this.stopPromise == null) {
            throw new AssertionError();
        }
        if (this.newStrategyPromise != null) {
            SettablePromise<Void> settablePromise = this.newStrategyPromise;
            this.newStrategyPromise = null;
            settablePromise.setException(CLIENT_IS_STOPPED);
            if (!$assertionsDisabled && this.newStrategyPromise != null) {
                throw new AssertionError();
            }
        }
        this.stopPromise.set((Object) null);
    }

    private void updateStrategy() {
        if (this.stopPromise != null) {
            return;
        }
        if (this.newStrategy == null || !this.newConnections.isEmpty()) {
            this.requestSender = (RpcSender) Utils.nonNullElseGet(this.strategy.createSender(this.pool), NoSenderAvailable::new);
            return;
        }
        RpcStrategy rpcStrategy = this.newStrategy;
        RpcSender createSender = rpcStrategy.createSender(this.pool);
        SettablePromise<Void> settablePromise = this.newStrategyPromise;
        if (createSender == null && this.newStrategyRetry) {
            return;
        }
        this.newStrategy = null;
        this.newStrategyPromise = null;
        if (createSender != null) {
            this.strategy = rpcStrategy;
            this.requestSender = createSender;
        }
        Set<InetSocketAddress> addresses = this.strategy.getAddresses();
        HashSet hashSet = new HashSet();
        Iterator<InetSocketAddress> it = this.pendingConnections.iterator();
        while (it.hasNext()) {
            InetSocketAddress next = it.next();
            if (!addresses.contains(next)) {
                hashSet.add(next);
                it.remove();
            }
        }
        Stream stream = new ArrayList(this.connections.keySet()).stream();
        Objects.requireNonNull(addresses);
        Stream filter = stream.filter(Utils.not((v1) -> {
            return r1.contains(v1);
        }));
        Map<InetSocketAddress, RpcClientConnection> map = this.connections;
        Objects.requireNonNull(map);
        filter.map((v1) -> {
            return r1.remove(v1);
        }).toList().forEach((v0) -> {
            v0.shutdown();
        });
        if (createSender != null) {
            settablePromise.set((Object) null);
        } else {
            settablePromise.setException(new RpcException("Could not establish connection to " + hashSet));
        }
    }

    @Override // io.activej.rpc.client.IRpcClient
    public <I, O> void sendRequest(I i, int i2, Callback<O> callback) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        if (i2 > 0) {
            this.requestSender.sendRequest(i, i2, callback);
        } else {
            callback.accept((Object) null, new AsyncTimeoutException("RPC request has timed out"));
        }
    }

    @Override // io.activej.rpc.client.IRpcClient
    public <I, O> void sendRequest(I i, Callback<O> callback) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        this.requestSender.sendRequest(i, callback);
    }

    public IRpcClient adaptToAnotherReactor(final Reactor reactor) {
        return reactor == this.reactor ? this : new IRpcClient() { // from class: io.activej.rpc.client.RpcClient.1
            @Override // io.activej.rpc.client.IRpcClient
            public <I, O> void sendRequest(I i, int i2, Callback<O> callback) {
                if (RpcClient.CHECKS) {
                    Reactor.checkInReactorThread(reactor);
                }
                if (i2 <= 0) {
                    callback.accept((Object) null, new AsyncTimeoutException("RPC request has timed out"));
                    return;
                }
                reactor.startExternalTask();
                NioReactor nioReactor = RpcClient.this.reactor;
                Reactor reactor2 = reactor;
                nioReactor.execute(() -> {
                    RpcClient.this.requestSender.sendRequest(i, i2, (obj, exc) -> {
                        reactor2.execute(() -> {
                            callback.accept(obj, exc);
                        });
                        reactor2.completeExternalTask();
                    });
                });
            }
        };
    }

    @VisibleForTesting
    public RpcSender getRequestSender() {
        return this.requestSender;
    }

    @VisibleForTesting
    public BinarySerializer<RpcMessage> getRequestSerializer() {
        return this.requestSerializer;
    }

    @VisibleForTesting
    public BinarySerializer<RpcMessage> getResponseSerializer() {
        return this.responseSerializer;
    }

    public String toString() {
        return "RpcClient{" + this.connections + "}";
    }

    @JmxOperation(description = "enable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, responseTime and requestsStatsPerClass are collected only when monitoring is enabled) ]")
    public void startMonitoring() {
        this.monitoring = true;
        Iterator<RpcClientConnection> it = this.connections.values().iterator();
        while (it.hasNext()) {
            it.next().startMonitoring();
        }
    }

    @JmxOperation(description = "disable monitoring [ when monitoring is enabled more stats are collected, but it causes more overhead (for example, responseTime and requestsStatsPerClass are collected only when monitoring is enabled) ]")
    public void stopMonitoring() {
        this.monitoring = false;
        Iterator<RpcClientConnection> it = this.connections.values().iterator();
        while (it.hasNext()) {
            it.next().stopMonitoring();
        }
    }

    @JmxAttribute(description = "when monitoring is enabled more stats are collected, but it causes more overhead (for example, responseTime and requestsStatsPerClass are collected only when monitoring is enabled)")
    public boolean isMonitoring() {
        return this.monitoring;
    }

    @JmxAttribute(name = "requests", extraSubAttributes = {"totalRequests"})
    public RpcRequestStats getGeneralRequestsStats() {
        return this.generalRequestsStats;
    }

    @JmxAttribute(reducer = JmxReducers.JmxReducerSum.class)
    public long getTotalSuccessfulConnects() {
        return this.connectsStatsPerAddress.values().stream().mapToLong((v0) -> {
            return v0.getSuccessfulConnects();
        }).sum();
    }

    @JmxAttribute(reducer = JmxReducers.JmxReducerSum.class)
    public long getTotalFailedConnects() {
        return this.connectsStatsPerAddress.values().stream().mapToLong((v0) -> {
            return v0.getFailedConnects();
        }).sum();
    }

    @JmxAttribute(description = "request stats distributed by request class")
    public Map<Class<?>, RpcRequestStats> getRequestsStatsPerClass() {
        return this.requestStatsPerClass;
    }

    @JmxAttribute
    public Map<InetSocketAddress, RpcConnectStats> getConnectsStatsPerAddress() {
        return this.connectsStatsPerAddress;
    }

    @JmxAttribute(description = "request stats for current connections (when connection is closed stats are removed)")
    public Map<InetSocketAddress, RpcClientConnection> getRequestStatsPerConnection() {
        return this.connections;
    }

    @JmxAttribute(reducer = JmxReducers.JmxReducerSum.class)
    public int getActiveConnections() {
        return this.connections.size();
    }

    @JmxAttribute(reducer = JmxReducers.JmxReducerSum.class)
    public int getActiveRequests() {
        int i = 0;
        Iterator<RpcClientConnection> it = this.connections.values().iterator();
        while (it.hasNext()) {
            i += it.next().getActiveRequests();
        }
        return i;
    }

    @JmxAttribute(description = "exception that occurred because of protocol error (serialization, deserialization, compression, decompression, etc)")
    public ExceptionStats getLastProtocolError() {
        return this.lastProtocolError;
    }

    @JmxAttribute
    public TcpSocket.JmxInspector getStatsSocket() {
        return this.statsSocket;
    }

    @JmxAttribute
    public List<String> getUnresponsiveServers() {
        return this.stopPromise != null ? List.of() : (List) this.connectsStatsPerAddress.entrySet().stream().filter(entry -> {
            return !((RpcConnectStats) entry.getValue()).isConnected();
        }).map(entry2 -> {
            return ((InetSocketAddress) entry2.getKey()).toString();
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RpcRequestStats ensureRequestStatsPerClass(Class<?> cls) {
        return this.requestStatsPerClass.computeIfAbsent(cls, cls2 -> {
            return RpcRequestStats.create(SMOOTHING_WINDOW);
        });
    }

    static {
        $assertionsDisabled = !RpcClient.class.desiredAssertionStatus();
        CHECKS = Checks.isEnabled(RpcClient.class);
        DEFAULT_CONNECT_TIMEOUT = ApplicationSettings.getDuration(RpcClient.class, "connectTimeout", Duration.ZERO);
        DEFAULT_RECONNECT_INTERVAL = ApplicationSettings.getDuration(RpcClient.class, "reconnectInterval", Duration.ZERO);
        DEFAULT_PACKET_SIZE = ApplicationSettings.getMemSize(RpcClient.class, "packetSize", ChannelSerializer.DEFAULT_INITIAL_BUFFER_SIZE);
        SET_STRATEGY_EXCEPTION = new RpcException("Could not change strategy");
        NO_SENDER_AVAILABLE_EXCEPTION = new RpcException("No senders available");
        CLIENT_IS_STOPPED = new RpcException("Client is stopped");
        SMOOTHING_WINDOW = Duration.ofMinutes(1L);
    }
}
