package io.rsocket.reactor.aeron;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.aeron.Connection;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/reactor/aeron/AeronDuplexConnection.class */
public class AeronDuplexConnection implements DuplexConnection {
    private static final Logger LOGGER = LoggerFactory.getLogger(AeronDuplexConnection.class);
    private final EmitterProcessor<ByteBuffer> processor = EmitterProcessor.create();
    private final Connection connection;
    private final Disposable outboundDisposable;

    public AeronDuplexConnection(Connection connection) {
        this.connection = connection;
        this.outboundDisposable = connection.outbound().send(this.processor).then().subscribe((Consumer) null, th -> {
            LOGGER.warn("outbound of {} was failed with error: {}", this, th);
            dispose();
        }, this::dispose);
    }

    public Mono<Void> send(Publisher<Frame> publisher) {
        return Mono.create(monoSink -> {
            Flux map = Flux.from(publisher).map(frame -> {
                ByteBuffer nioBuffer = frame.content().nioBuffer();
                ReferenceCountUtil.safeRelease(frame);
                return nioBuffer;
            });
            EmitterProcessor<ByteBuffer> emitterProcessor = this.processor;
            emitterProcessor.getClass();
            Consumer consumer = (v1) -> {
                r1.onNext(v1);
            };
            monoSink.getClass();
            Consumer consumer2 = monoSink::error;
            monoSink.getClass();
            map.subscribe(consumer, consumer2, monoSink::success);
        });
    }

    public Mono<Void> sendOne(Frame frame) {
        return send(Mono.just(frame));
    }

    public Flux<Frame> receive() {
        return this.connection.inbound().receive().map(byteBuffer -> {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(byteBuffer.capacity());
            buffer.writeBytes(byteBuffer);
            return buffer;
        }).map(Frame::from);
    }

    public Mono<Void> onClose() {
        return this.connection.onDispose();
    }

    public void dispose() {
        this.outboundDisposable.dispose();
        this.connection.dispose();
    }

    public boolean isDisposed() {
        return this.connection.isDisposed();
    }
}
