/*
 * Decompiled with CFR 0.152.
 */
package io.inverno.mod.http.base.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.SignalType;

public class OutboundDataSequencer {
    public static final int DEFAULT_BUFFER_CAPACITY = 8192;
    private int bufferCapacity = 8192;

    public void setBufferCapacity(int bufferCapacity) {
        this.bufferCapacity = bufferCapacity;
    }

    public Flux<ByteBuf> sequence(Flux<ByteBuf> data) {
        return Flux.create(dataSink -> data.subscribe((CoreSubscriber)new DataSubscriber((FluxSink<ByteBuf>)dataSink)));
    }

    private class DataSubscriber
    extends BaseSubscriber<ByteBuf> {
        private final FluxSink<ByteBuf> dataSink;
        private ByteBuf currentBuffer;

        public DataSubscriber(FluxSink<ByteBuf> dataSink) {
            this.dataSink = dataSink;
        }

        protected void hookOnNext(ByteBuf buffer) {
            this.currentBuffer = this.currentBuffer == null ? buffer : Unpooled.wrappedBuffer((ByteBuf[])new ByteBuf[]{this.currentBuffer, buffer});
            if (this.currentBuffer.readableBytes() > OutboundDataSequencer.this.bufferCapacity) {
                this.dataSink.next((Object)this.currentBuffer.readRetainedSlice(OutboundDataSequencer.this.bufferCapacity));
            }
        }

        protected void hookOnError(Throwable throwable) {
            this.dataSink.error(throwable);
        }

        protected void hookOnComplete() {
            if (this.currentBuffer != null && this.currentBuffer.readableBytes() > 0) {
                this.dataSink.next((Object)this.currentBuffer);
            }
            this.dataSink.complete();
        }

        protected void hookFinally(SignalType type) {
        }
    }
}

