package io.rsocket.aeron.reactor;

import io.aeron.Publication;
import io.aeron.logbuffer.BufferClaim;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.aeron.util.Constants;
import io.rsocket.aeron.util.NotConnectedException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;
import org.agrona.concurrent.UnsafeBuffer;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.core.publisher.WorkQueueProcessor;

/* loaded from: input_file:io/rsocket/aeron/reactor/AeronPublicationSubscriber.class */
/**
 * Bridges a Reactive Streams source of {@link ByteBuf} frames onto an Aeron {@link Publication}.
 *
 * <p>Frames received through {@link #onNext(ByteBuf)} are buffered in a bounded queue and written
 * to the publication by a single drain loop guarded by the {@code wip}/{@code missed} counters.
 * The downstream subscriber ({@code actual}) never receives {@code onNext}; it only sees
 * {@code onSubscribe}, and at most one of {@code onError} / {@code onComplete}. Its
 * {@link #request(long)} calls gate how many frames may be written to the publication.
 *
 * <p>The inherited {@link AtomicBoolean} value records that the upstream has completed.
 */
public class AeronPublicationSubscriber extends AtomicBoolean implements Subscription, CoreSubscriber<ByteBuf> {
    private static final Logger logger = LoggerFactory.getLogger(AeronPublicationSubscriber.class);
    private static final ThreadLocal<BufferClaim> BUFFER_CLAIMS = ThreadLocal.withInitial(BufferClaim::new);
    private static final ThreadLocal<UnsafeBuffer> UNSAFE_BUFFER = ThreadLocal.withInitial(UnsafeBuffer::new);

    /** Capacity of the frame queue and upper bound of the demand kept outstanding upstream. */
    private static final int BUFFER_SIZE = 128;
    /** When the outstanding upstream demand drops below this value it is topped up again. */
    private static final int REFILL = 42;
    /** Number of offer/claim attempts made for one frame before the drain loop backs off. */
    private static final int MAX_EFFORT = 8;

    // Field updaters are stateless and expensive to create, so they are shared by all instances.
    private static final AtomicIntegerFieldUpdater<AeronPublicationSubscriber> MISSED =
            AtomicIntegerFieldUpdater.newUpdater(AeronPublicationSubscriber.class, "missed");
    private static final AtomicIntegerFieldUpdater<AeronPublicationSubscriber> WIP =
            AtomicIntegerFieldUpdater.newUpdater(AeronPublicationSubscriber.class, "wip");
    private static final AtomicIntegerFieldUpdater<AeronPublicationSubscriber> TERMINATED =
            AtomicIntegerFieldUpdater.newUpdater(AeronPublicationSubscriber.class, "terminated");
    private static final AtomicLongFieldUpdater<AeronPublicationSubscriber> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(AeronPublicationSubscriber.class, "requested");
    private static final AtomicLongFieldUpdater<AeronPublicationSubscriber> REQUESTED_UPSTREAM =
            AtomicLongFieldUpdater.newUpdater(AeronPublicationSubscriber.class, "requestedUpstream");

    private final CoreSubscriber<? super ByteBuf> actual;
    private final Publication publication;
    private final WorkQueueProcessor<Runnable> workQueueProcessor;
    private final String name;
    private final ManyToOneConcurrentArrayQueue<ByteBuf> frameQueue = new ManyToOneConcurrentArrayQueue<>(BUFFER_SIZE);

    /** Frame taken from the queue but not yet accepted by the publication; touched only by the drain loop. */
    private volatile ByteBuf currentWorkingFrame;
    private Subscription subscription;
    /** Signals that arrived while another thread was draining. */
    private volatile int missed;
    /** Non-zero while a thread (or a scheduled retry) owns the drain loop. */
    private volatile int wip;
    /** Non-zero once the bridge was cancelled or a terminal signal was delivered downstream. */
    private volatile int terminated;
    /** Downstream demand, i.e. the number of frames that may still be written to the publication. */
    private volatile long requested;
    /** Demand requested upstream that has not been written to the publication yet. */
    private volatile long requestedUpstream;

    AeronPublicationSubscriber(String str, WorkQueueProcessor<Runnable> workQueueProcessor, CoreSubscriber<? super ByteBuf> coreSubscriber, Publication publication) {
        this.name = str;
        this.workQueueProcessor = workQueueProcessor;
        this.actual = coreSubscriber;
        this.publication = publication;
    }

    /**
     * Creates a new bridge.
     *
     * @param str name used as a prefix in debug log output
     * @param workQueueProcessor executor used to retry the drain loop when the publication refuses a frame
     * @param coreSubscriber downstream subscriber notified about subscription, error and completion
     * @param publication Aeron publication the frames are written to
     * @return the new subscriber
     */
    public static AeronPublicationSubscriber create(String str, WorkQueueProcessor<Runnable> workQueueProcessor, CoreSubscriber<? super ByteBuf> coreSubscriber, Publication publication) {
        return new AeronPublicationSubscriber(str, workQueueProcessor, coreSubscriber, publication);
    }

    /** Stores the upstream subscription, hands this bridge downstream and requests the first batch. */
    @Override
    public void onSubscribe(Subscription subscription) {
        if (Operators.validate(this.subscription, subscription)) {
            this.subscription = subscription;
            this.actual.onSubscribe(this);
            this.requestedUpstream = BUFFER_SIZE;
            subscription.request(BUFFER_SIZE);
        }
    }

    /**
     * Adds downstream demand and triggers the drain loop.
     *
     * @param j number of additional frames that may be published; non-positive values are rejected
     */
    @Override
    public void request(long j) {
        // A non-positive amount would corrupt the counter (addCap saturates negative sums to MAX_VALUE).
        if (Operators.validate(j)) {
            Operators.addCap(REQUESTED, this, j);
            tryEmit();
        }
    }

    /** Stops publishing, cancels the upstream subscription and releases every buffered frame. */
    @Override
    public void cancel() {
        if (TERMINATED.compareAndSet(this, 0, 1)) {
            Subscription upstream = this.subscription;
            if (upstream != null) {
                upstream.cancel();
            }
            // The queue has a single consumer, so the frames are released from the drain loop.
            tryEmit();
        }
    }

    /**
     * Buffers a frame for publication.
     *
     * @throws IllegalStateException if the queue is full, i.e. upstream ignored the requested demand
     */
    @Override
    public void onNext(ByteBuf byteBuf) {
        if (this.terminated != 0) {
            // Nothing will ever publish this frame, so release it instead of leaking it.
            ReferenceCountUtil.safeRelease(byteBuf);
            return;
        }
        if (!this.frameQueue.offer(byteBuf)) {
            ReferenceCountUtil.safeRelease(byteBuf);
            throw new IllegalStateException("missing back pressure ");
        }
        tryEmit();
    }

    /** Terminates the bridge: cancels upstream and forwards the error downstream exactly once. */
    @Override
    public void onError(Throwable th) {
        if (!TERMINATED.compareAndSet(this, 0, 1)) {
            logger.warn(this.name + " dropped error raised after termination", th);
            return;
        }
        logger.error(this.name + " terminated with an error", th);
        if (this.subscription != null) {
            this.subscription.cancel();
        }
        this.actual.onError(th);
        // Let the drain loop release the frames that can no longer be published.
        tryEmit();
    }

    /** Marks the upstream as complete; downstream is completed once all buffered frames are published. */
    @Override
    public void onComplete() {
        set(true);
        tryEmit();
    }

    /** Records a signal and runs the drain loop unless another thread already owns it. */
    private void tryEmit() {
        MISSED.incrementAndGet(this);
        if (WIP.compareAndSet(this, 0, 1)) {
            emit();
        }
    }

    /** Drain loop; must only be entered by the owner of {@code wip}. */
    private void emit() {
        for (;;) {
            boolean backPressured;
            do {
                MISSED.set(this, 0);
                backPressured = publishBufferedFrames();
            } while (MISSED.get(this) != 0);

            if (this.terminated != 0) {
                discardBufferedFrames();
            } else if (backPressured) {
                // Keep ownership of the drain loop and retry from the worker.
                this.workQueueProcessor.onNext(this::emit);
                return;
            } else if (get() && this.currentWorkingFrame == null && this.frameQueue.isEmpty()) {
                if (TERMINATED.compareAndSet(this, 0, 1)) {
                    this.actual.onComplete();
                }
            }

            WIP.set(this, 0);
            // A signal may have slipped in between the last MISSED check and the release of WIP;
            // pick it up here, otherwise it would never be processed.
            if (MISSED.get(this) == 0 || !WIP.compareAndSet(this, 0, 1)) {
                return;
            }
        }
    }

    /**
     * Writes buffered frames to the publication while downstream demand is available.
     *
     * @return {@code true} if the publication refused a frame and the drain must be retried later,
     *     {@code false} if there is nothing more to do until the next signal
     */
    private boolean publishBufferedFrames() {
        while (this.terminated == 0) {
            ByteBuf frame = this.currentWorkingFrame;
            if (frame == null) {
                frame = this.frameQueue.poll();
                if (frame == null) {
                    return false;
                }
                this.currentWorkingFrame = frame;
            }
            if (REQUESTED.get(this) <= 0) {
                // No demand: the next request() call restarts the drain loop.
                return false;
            }
            if (!tryClaimOrOffer(frame)) {
                // A failed attempt may have terminated the bridge, in which case no retry is needed.
                return this.terminated == 0;
            }
            Operators.addCap(REQUESTED, this, -1L);
            long outstanding = Operators.addCap(REQUESTED_UPSTREAM, this, -1L);
            if (logger.isDebugEnabled()) {
                logger.debug("{} emitted frame: \n{}\n", this.name, ByteBufUtil.prettyHexDump(frame));
            }
            frame.release();
            this.currentWorkingFrame = null;
            if (outstanding < REFILL) {
                // addCap returned the value before the decrement. Top up to BUFFER_SIZE in total so
                // that frames in flight plus frames queued can never exceed the queue capacity.
                long refill = BUFFER_SIZE - Math.max(0L, outstanding - 1L);
                if (refill > 0) {
                    Operators.addCap(REQUESTED_UPSTREAM, this, refill);
                    this.subscription.request(refill);
                }
            }
        }
        return false;
    }

    /** Releases the pending frame and everything left in the queue. */
    private void discardBufferedFrames() {
        ByteBuf pending = this.currentWorkingFrame;
        if (pending != null) {
            this.currentWorkingFrame = null;
            ReferenceCountUtil.safeRelease(pending);
        }
        this.frameQueue.drain(ReferenceCountUtil::safeRelease);
    }

    /**
     * Attempts up to {@link #MAX_EFFORT} times to write a frame to the publication. Frames of at
     * least {@code Constants.AERON_MTU_SIZE} bytes go through {@code offer}, smaller ones through
     * {@code tryClaim}.
     *
     * @return {@code true} if the frame was accepted by the publication
     */
    private boolean tryClaimOrOffer(ByteBuf byteBuf) {
        final ByteBuffer source = byteBuf.nioBuffer();
        // nioBuffer() exposes only the readable bytes, so the data starts at the NIO buffer's own
        // position rather than at the ByteBuf reader index.
        final int sourceOffset = source.position();
        final int length = byteBuf.readableBytes();
        final boolean viaOffer = length >= Constants.AERON_MTU_SIZE;

        for (int attempt = 0; attempt < MAX_EFFORT; attempt++) {
            long result = viaOffer ? offer(source, sourceOffset, length) : claimAndCopy(source, sourceOffset, length);
            if (result >= 0) {
                return true;
            }
            if (result == Publication.CLOSED) {
                onError(new NotConnectedException());
                return false;
            }
        }
        return false;
    }

    /** Offers the given region of {@code source} to the publication and returns Aeron's result code. */
    private long offer(ByteBuffer source, int sourceOffset, int length) {
        UnsafeBuffer view = UNSAFE_BUFFER.get();
        view.wrap(source, sourceOffset, length);
        return this.publication.offer(view);
    }

    /** Claims {@code length} bytes in the publication and copies the frame into the claimed region. */
    private long claimAndCopy(ByteBuffer source, int sourceOffset, int length) {
        BufferClaim claim = BUFFER_CLAIMS.get();
        long result = this.publication.tryClaim(length, claim);
        if (result >= 0) {
            try {
                claim.buffer().putBytes(claim.offset(), source, sourceOffset, length);
            } finally {
                // A claim that is never committed would block the stream for later messages.
                claim.commit();
            }
        }
        return result;
    }
}
