package alluxio.client.block.stream;

import alluxio.grpc.DataMessage;
import alluxio.grpc.DataMessageMarshaller;
import alluxio.network.protocol.databuffer.DataBuffer;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
/* loaded from: input_file:META-INF/bundled-dependencies/alluxio-core-client-fs-2.7.3.jar:alluxio/client/block/stream/GrpcDataMessageBlockingStream.class */
public class GrpcDataMessageBlockingStream<ReqT, ResT> extends GrpcBlockingStream<ReqT, ResT> {
    private final DataMessageMarshaller<ReqT> mRequestMarshaller;
    private final DataMessageMarshaller<ResT> mResponseMarshaller;

    public GrpcDataMessageBlockingStream(Function<StreamObserver<ResT>, StreamObserver<ReqT>> function, int i, String str, DataMessageMarshaller<ReqT> dataMessageMarshaller, DataMessageMarshaller<ResT> dataMessageMarshaller2) {
        super(streamObserver -> {
            return (StreamObserver) function.apply(new DataMessageClientResponseObserver(streamObserver, dataMessageMarshaller, dataMessageMarshaller2));
        }, i, str);
        this.mRequestMarshaller = dataMessageMarshaller;
        this.mResponseMarshaller = dataMessageMarshaller2;
    }

    @Override // alluxio.client.block.stream.GrpcBlockingStream
    public ResT receive(long j) throws IOException {
        if (this.mResponseMarshaller == null) {
            return (ResT) super.receive(j);
        }
        DataMessage<ResT, DataBuffer> receiveDataMessage = receiveDataMessage(j);
        if (receiveDataMessage == null) {
            return null;
        }
        return this.mResponseMarshaller.combineData(receiveDataMessage);
    }

    public DataMessage<ResT, DataBuffer> receiveDataMessage(long j) throws IOException {
        Preconditions.checkNotNull(this.mResponseMarshaller, "Cannot retrieve data message without a response marshaller.");
        Object receive = super.receive(j);
        if (receive == null) {
            return null;
        }
        return new DataMessage<>(receive, this.mResponseMarshaller.pollBuffer((DataMessageMarshaller<ResT>) receive));
    }

    public void sendDataMessage(DataMessage<ReqT, DataBuffer> dataMessage, long j) throws IOException {
        if (this.mRequestMarshaller != null) {
            this.mRequestMarshaller.offerBuffer2(dataMessage.getBuffer(), (DataBuffer) dataMessage.getMessage());
        }
        super.send(dataMessage.getMessage(), j);
    }

    @Override // alluxio.client.block.stream.GrpcBlockingStream
    public void waitForComplete(long j) throws IOException {
        DataMessage<ResT, DataBuffer> receiveDataMessage;
        if (this.mResponseMarshaller == null) {
            super.waitForComplete(j);
            return;
        }
        while (!isCanceled() && (receiveDataMessage = receiveDataMessage(j)) != null) {
            if (receiveDataMessage.getBuffer() != null) {
                receiveDataMessage.getBuffer().release();
            }
        }
        super.waitForComplete(j);
    }
}
