package io.activej.rpc.protocol;

import io.activej.async.exception.AsyncCloseException;
import io.activej.common.MemSize;
import io.activej.csp.consumer.ChannelConsumers;
import io.activej.csp.process.frame.ChannelFrameDecoder;
import io.activej.csp.process.frame.ChannelFrameEncoder;
import io.activej.csp.process.frame.FrameFormat;
import io.activej.csp.supplier.ChannelSuppliers;
import io.activej.datastream.consumer.AbstractStreamConsumer;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.supplier.AbstractStreamSupplier;
import io.activej.datastream.supplier.StreamDataAcceptor;
import io.activej.net.socket.tcp.ITcpSocket;
import io.activej.promise.Promise;
import io.activej.serializer.BinarySerializer;
import java.time.Duration;
import java.util.Objects;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/rpc/protocol/RpcStream.class */
public final class RpcStream {
    private final ChannelDeserializer<RpcMessage> deserializer;
    private final ChannelSerializer<RpcMessage> serializer;
    private Listener listener;
    private final AbstractStreamConsumer<RpcMessage> internalConsumer = new AbstractStreamConsumer<RpcMessage>() { // from class: io.activej.rpc.protocol.RpcStream.1
    };
    private final AbstractStreamSupplier<RpcMessage> internalSupplier = new AbstractStreamSupplier<RpcMessage>() { // from class: io.activej.rpc.protocol.RpcStream.2
        protected void onResumed() {
            RpcStream.this.deserializer.updateDataAcceptor();
            RpcStream.this.listener.onSenderReady(getDataAcceptor());
        }

        protected void onSuspended() {
            if (RpcStream.this.server) {
                RpcStream.this.deserializer.updateDataAcceptor();
            }
            RpcStream.this.listener.onSenderSuspended();
        }
    };
    private final boolean server;
    private final ITcpSocket socket;

    /* loaded from: input_file:io/activej/rpc/protocol/RpcStream$Listener.class */
    public interface Listener extends StreamDataAcceptor<RpcMessage> {
        void onReceiverEndOfStream();

        void onReceiverError(Exception exc);

        void onSenderError(Exception exc);

        void onSerializationError(RpcMessage rpcMessage, Exception exc);

        void onSenderReady(StreamDataAcceptor<RpcMessage> streamDataAcceptor);

        void onSenderSuspended();
    }

    public RpcStream(ITcpSocket iTcpSocket, BinarySerializer<RpcMessage> binarySerializer, BinarySerializer<RpcMessage> binarySerializer2, MemSize memSize, Duration duration, @Nullable FrameFormat frameFormat, boolean z) {
        this.server = z;
        this.socket = iTcpSocket;
        ChannelSerializer<RpcMessage> channelSerializer = (ChannelSerializer) ChannelSerializer.builder(binarySerializer2).withInitialBufferSize(memSize).withAutoFlushInterval(duration).withSerializationErrorHandler((rpcMessage, exc) -> {
            this.listener.onSerializationError(rpcMessage, exc);
        }).build();
        ChannelDeserializer<RpcMessage> create = ChannelDeserializer.create(binarySerializer);
        if (frameFormat != null) {
            ChannelFrameDecoder create2 = ChannelFrameDecoder.create(frameFormat);
            ChannelFrameEncoder create3 = ChannelFrameEncoder.create(frameFormat);
            ChannelSuppliers.ofSocket(iTcpSocket).bindTo(create2.getInput());
            create2.getOutput().bindTo(create.getInput());
            channelSerializer.getOutput().bindTo(create3.getInput());
            create3.getOutput().set(ChannelConsumers.ofSocket(iTcpSocket));
        } else {
            ChannelSuppliers.ofSocket(iTcpSocket).bindTo(create.getInput());
            channelSerializer.getOutput().set(ChannelConsumers.ofSocket(iTcpSocket));
        }
        create.streamTo(this.internalConsumer);
        this.deserializer = create;
        this.serializer = channelSerializer;
    }

    public void setListener(Listener listener) {
        this.listener = listener;
        Promise endOfStream = this.deserializer.getEndOfStream();
        Objects.requireNonNull(listener);
        Promise whenResult = endOfStream.whenResult(listener::onReceiverEndOfStream);
        Objects.requireNonNull(listener);
        whenResult.whenException(listener::onReceiverError);
        Promise acknowledgement = this.serializer.getAcknowledgement();
        Objects.requireNonNull(listener);
        acknowledgement.whenException(listener::onSenderError);
        this.internalSupplier.streamTo(this.serializer);
        this.internalConsumer.resume(this.listener);
    }

    public void receiverSuspend() {
        this.internalConsumer.suspend();
    }

    public void receiverResume() {
        this.internalConsumer.resume(this.listener);
    }

    public void sendEndOfStream() {
        this.internalSupplier.sendEndOfStream();
    }

    public void close() {
        closeEx(new AsyncCloseException("RPC Channel Closed"));
    }

    public void closeEx(Exception exc) {
        this.socket.closeEx(exc);
        this.serializer.closeEx(exc);
        this.deserializer.closeEx(exc);
    }
}
