/*
 * Decompiled with CFR 0.152.
 */
package dev.snowdrop.vertx.http.common;

import io.vertx.core.streams.WriteStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SignalType;

public class WriteStreamSubscriber<T extends WriteStream<?>, U>
extends BaseSubscriber<U> {
    private final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private final T writeStream;
    private final BiConsumer<T, U> nextHandler;
    private final MonoSink<Void> endHook;
    private final long requestLimit;
    private final AtomicLong pendingCount = new AtomicLong();
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final String logPrefix;

    private WriteStreamSubscriber(T writeStream, BiConsumer<T, U> nextHandler, MonoSink<Void> endHook, long requestLimit) {
        this.writeStream = writeStream;
        this.nextHandler = nextHandler;
        this.endHook = endHook;
        this.requestLimit = requestLimit;
        this.logPrefix = "[" + ObjectUtils.getIdentityHexString(writeStream) + "] ";
        writeStream.exceptionHandler(this::exceptionHandler);
        writeStream.drainHandler(this::drainHandler);
    }

    protected void hookOnSubscribe(Subscription subscription) {
        this.logger.debug("{}{} subscribed", (Object)this.logPrefix, this.writeStream);
        this.isActive.set(true);
        this.requestIfNotFull();
    }

    protected void hookOnNext(U value) {
        this.logger.debug("{}Next: {}", (Object)this.logPrefix, value);
        this.nextHandler.accept(this.writeStream, value);
        this.pendingCount.decrementAndGet();
        this.requestIfNotFull();
    }

    protected void hookOnComplete() {
        this.logger.debug("{}Completed", (Object)this.logPrefix);
        this.endHook.success();
    }

    protected void hookOnCancel() {
        this.logger.debug("{}Canceled", (Object)this.logPrefix);
        this.endHook.success();
    }

    protected void hookOnError(Throwable throwable) {
        this.logger.debug("{}Error: {}", (Object)this.logPrefix, (Object)throwable);
        this.endHook.error(throwable);
    }

    protected void hookFinally(SignalType type) {
        this.isActive.set(false);
    }

    private void exceptionHandler(Throwable ignored) {
        this.cancel();
    }

    private void drainHandler(Void event) {
        this.logger.debug("{} drain", (Object)this.logPrefix);
        this.requestIfNotFull();
    }

    private void requestIfNotFull() {
        if (this.isActive.get() && !this.writeStream.writeQueueFull() && this.pendingCount.get() < this.requestLimit) {
            this.logger.debug("{}Requesting more data pendingCount={} requestLimit={}", new Object[]{this.logPrefix, this.pendingCount.get(), this.requestLimit});
            this.request(this.requestLimit - this.pendingCount.getAndSet(this.requestLimit));
        }
    }

    /* synthetic */ WriteStreamSubscriber(WriteStream x0, BiConsumer x1, MonoSink x2, long x3, 1 x4) {
        this(x0, x1, (MonoSink<Void>)x2, x3);
    }

    public static class Builder<T extends WriteStream<?>, U> {
        private T writeStream;
        private BiConsumer<T, U> nextHandler;
        private MonoSink<Void> endHook;
        private long requestLimit = 1L;

        public Builder<T, U> writeStream(T writeStream) {
            this.writeStream = writeStream;
            return this;
        }

        public Builder<T, U> nextHandler(BiConsumer<T, U> nextHandler) {
            this.nextHandler = nextHandler;
            return this;
        }

        public Builder<T, U> endHook(MonoSink<Void> endHook) {
            this.endHook = endHook;
            return this;
        }

        public Builder<T, U> requestLimit(long requestLimit) {
            this.requestLimit = requestLimit;
            return this;
        }

        public WriteStreamSubscriber<T, U> build() {
            Objects.requireNonNull(this.writeStream, "Write stream is required");
            Objects.requireNonNull(this.nextHandler, "Next handler is required");
            Objects.requireNonNull(this.endHook, "End hook is required");
            return new WriteStreamSubscriber((WriteStream)this.writeStream, this.nextHandler, (MonoSink)this.endHook, this.requestLimit, null);
        }
    }
}

