package io.activej.rpc.server;

import io.activej.common.exception.parse.ParseException;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.jmx.api.JmxRefreshable;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.stats.EventStats;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.jmx.stats.ValueStats;
import io.activej.promise.Promise;
import io.activej.rpc.protocol.RpcControlMessage;
import io.activej.rpc.protocol.RpcMessage;
import io.activej.rpc.protocol.RpcRemoteException;
import io.activej.rpc.protocol.RpcStream;
import java.net.InetAddress;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/rpc/server/RpcServerConnection.class */
/**
 * Server-side handler for a single RPC connection.
 *
 * <p>Each incoming {@link RpcMessage} is dispatched to the {@link RpcRequestHandler}
 * registered for the class of its payload. The handler's result (or an
 * {@link RpcRemoteException} wrapping its failure) is written back through the
 * downstream acceptor using the cookie of the request. Per-connection statistics are
 * recorded alongside the aggregate statistics held by the owning {@link RpcServer}.
 *
 * <p>NOTE(review): no synchronization is used anywhere in this class; it presumably
 * relies on being called from a single thread - confirm against the caller.
 */
public final class RpcServerConnection implements RpcStream.Listener, JmxRefreshable {
    private static final Logger logger = LoggerFactory.getLogger(RpcServerConnection.class);

    /** Sink for outgoing messages; assigned in {@link #onSenderReady} and replaced by a no-op in {@link #doClose()}. */
    private StreamDataAcceptor<RpcMessage> downstreamDataAcceptor;

    private final RpcServer rpcServer;
    private final RpcStream stream;
    private final Map<Class<?>, RpcRequestHandler<?, ?>> handlers;
    private final InetAddress remoteAddress;

    /**
     * Number of requests still being served, plus one. The extra unit is released by
     * {@link #onReceiverEndOfStream()}, so the counter can only drop to zero once the
     * receiver has reported end-of-stream and every accepted request has completed.
     */
    private int activeRequests = 1;

    // Per-connection JMX statistics.
    private final ExceptionStats lastRequestHandlingException = ExceptionStats.create();
    private final ValueStats requestHandlingTime = ValueStats.create(RpcServer.SMOOTHING_WINDOW).withUnit("milliseconds");
    private final EventStats successfulRequests = EventStats.create(RpcServer.SMOOTHING_WINDOW);
    private final EventStats failedRequests = EventStats.create(RpcServer.SMOOTHING_WINDOW);

    /** When {@code true}, request handling time is measured and recorded. */
    private boolean monitoring = false;

    /**
     * Creates a connection bound to the given server and stream.
     *
     * <p>Decompiler note: this constructor was originally package-private; it is kept
     * public here so that existing callers are unaffected.
     *
     * @param rpcServer     owning server; receives aggregate statistics and is notified on close
     * @param remoteAddress address of the remote peer, used in logs and JMX
     * @param handlers      request handlers keyed by the exact class of the request payload
     * @param stream        stream used to signal end-of-stream or to close the connection
     */
    public RpcServerConnection(RpcServer rpcServer, InetAddress remoteAddress, Map<Class<?>, RpcRequestHandler<?, ?>> handlers, RpcStream stream) {
        this.rpcServer = rpcServer;
        this.stream = stream;
        this.handlers = handlers;
        this.remoteAddress = remoteAddress;
    }

    /**
     * Runs the handler registered for the exact runtime class of {@code request}.
     *
     * @param request request payload
     * @return the handler's promise, or a promise failed with a {@link ParseException}
     *         when no handler is registered for the payload class
     */
    @SuppressWarnings("unchecked")
    private Promise<Object> serve(Object request) {
        // The map stores handlers with wildcard type arguments, which cannot be invoked with
        // an Object argument directly. The handler is looked up by the request's own class,
        // so the unchecked cast is expected to be safe as long as handlers are registered
        // under the class they accept.
        RpcRequestHandler<Object, Object> handler =
                (RpcRequestHandler<Object, Object>) this.handlers.get(request.getClass());
        if (handler == null) {
            return Promise.ofException(new ParseException(RpcServerConnection.class, "Failed to process request " + request));
        }
        return handler.run(request);
    }

    /**
     * Handles one incoming request and, once it completes, sends either the result or an
     * {@link RpcRemoteException} back with the request's cookie.
     *
     * @param rpcMessage incoming request message
     */
    public void accept(RpcMessage rpcMessage) {
        this.activeRequests++;
        int cookie = rpcMessage.getCookie();
        // A start time of 0 doubles as the "monitoring disabled" marker for the callback below.
        long startTime = this.monitoring ? System.currentTimeMillis() : 0L;
        Object request = rpcMessage.getData();
        serve(request).whenComplete((result, e) -> {
            if (startTime != 0) {
                int elapsedMillis = (int) (System.currentTimeMillis() - startTime);
                this.requestHandlingTime.recordValue(elapsedMillis);
                this.rpcServer.getRequestHandlingTime().recordValue(elapsedMillis);
            }
            if (e == null) {
                this.downstreamDataAcceptor.accept(RpcMessage.of(cookie, result));
                this.successfulRequests.recordEvent();
                this.rpcServer.getSuccessfulRequests().recordEvent();
            } else {
                logger.warn("Exception while processing request ID {}", cookie, e);
                sendError(RpcMessage.of(cookie, new RpcRemoteException(e)), request, e);
            }
            releaseActiveRequest();
        });
    }

    /** Releases the unit of {@link #activeRequests} held for the receiver; closes the connection if nothing is pending. */
    @Override // io.activej.rpc.protocol.RpcStream.Listener
    public void onReceiverEndOfStream() {
        releaseActiveRequest();
    }

    /** Logs the receiver failure, records it as a protocol error and closes the connection. */
    @Override // io.activej.rpc.protocol.RpcStream.Listener
    public void onReceiverError(@NotNull Throwable th) {
        logger.error("Receiver error {}", this.remoteAddress, th);
        closeOnProtocolError(th);
    }

    /** Logs the sender failure, records it as a protocol error and closes the connection. */
    @Override // io.activej.rpc.protocol.RpcStream.Listener
    public void onSenderError(@NotNull Throwable th) {
        logger.error("Sender error: {}", this.remoteAddress, th);
        closeOnProtocolError(th);
    }

    /**
     * Reports a message that could not be serialized by sending an {@link RpcRemoteException}
     * with the same cookie and counting the request as failed. The connection stays open.
     */
    @Override // io.activej.rpc.protocol.RpcStream.Listener
    public void onSerializationError(RpcMessage rpcMessage, @NotNull Throwable th) {
        logger.error("Serialization error: {} for data {}", this.remoteAddress, rpcMessage.getData(), th);
        sendError(RpcMessage.of(rpcMessage.getCookie(), new RpcRemoteException(th)), rpcMessage.getData(), th);
    }

    /** Stores the acceptor that all subsequent outgoing messages are written to. */
    @Override // io.activej.rpc.protocol.RpcStream.Listener
    public void onSenderReady(@NotNull StreamDataAcceptor<RpcMessage> streamDataAcceptor) {
        this.downstreamDataAcceptor = streamDataAcceptor;
    }

    /** Intentionally empty: no action is taken when the sender is suspended. */
    @Override // io.activej.rpc.protocol.RpcStream.Listener
    public void onSenderSuspended() {
    }

    /**
     * Sends an error message downstream and records the failure in both the per-connection
     * and the server-wide statistics.
     *
     * @param rpcMessage error message to send
     * @param obj        payload of the request that failed, recorded as exception context
     * @param th         cause of the failure
     */
    private void sendError(RpcMessage rpcMessage, Object obj, @Nullable Throwable th) {
        this.downstreamDataAcceptor.accept(rpcMessage);
        this.lastRequestHandlingException.recordException(th, obj);
        this.rpcServer.getLastRequestHandlingException().recordException(th, obj);
        this.failedRequests.recordEvent();
        this.rpcServer.getFailedRequests().recordEvent();
    }

    /** Decrements {@link #activeRequests}; when it reaches zero, closes and signals end-of-stream. */
    private void releaseActiveRequest() {
        if (--this.activeRequests == 0) {
            doClose();
            this.stream.sendEndOfStream();
        }
    }

    /** Records a protocol-level failure on the server, detaches this connection and closes the stream. */
    private void closeOnProtocolError(Throwable th) {
        this.rpcServer.getLastProtocolError().recordException(th, this.remoteAddress);
        doClose();
        this.stream.close();
    }

    /** Unregisters this connection from the server and turns all further outgoing messages into no-ops. */
    private void doClose() {
        this.rpcServer.remove(this);
        this.downstreamDataAcceptor = rpcMessage -> {
        };
    }

    /**
     * Sends a {@link RpcControlMessage#CLOSE} control message with cookie {@code -1}.
     * Does nothing if no downstream acceptor has been assigned yet.
     */
    public void shutdown() {
        if (this.downstreamDataAcceptor != null) {
            this.downstreamDataAcceptor.accept(RpcMessage.of(-1, RpcControlMessage.CLOSE));
        }
    }

    /** Enables request handling time measurement for requests accepted from now on. */
    public void startMonitoring() {
        this.monitoring = true;
    }

    /** Disables request handling time measurement for requests accepted from now on. */
    public void stopMonitoring() {
        this.monitoring = false;
    }

    /** @return counter of requests answered successfully on this connection */
    @JmxAttribute
    public EventStats getSuccessfulRequests() {
        return this.successfulRequests;
    }

    /** @return counter of requests answered with an error on this connection */
    @JmxAttribute
    public EventStats getFailedRequests() {
        return this.failedRequests;
    }

    /** @return request handling time statistics, populated only while monitoring is enabled */
    @JmxAttribute
    public ValueStats getRequestHandlingTime() {
        return this.requestHandlingTime;
    }

    /** @return statistics about the exceptions recorded for failed requests on this connection */
    @JmxAttribute
    public ExceptionStats getLastRequestHandlingException() {
        return this.lastRequestHandlingException;
    }

    /** @return string form of the remote peer's address */
    @JmxAttribute
    public String getRemoteAddress() {
        return this.remoteAddress.toString();
    }

    /**
     * Refreshes the event and timing statistics of this connection.
     *
     * @param j timestamp passed through to each statistic's {@code refresh}
     */
    public void refresh(long j) {
        this.successfulRequests.refresh(j);
        this.failedRequests.refresh(j);
        this.requestHandlingTime.refresh(j);
    }
}
