package io.vertx.rxcore.java.impl;

import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicLong;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.streams.WriteStream;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/vertx/rxcore/java/impl/Regulator.class */
public class Regulator<R> implements Observable.Operator<R, R> {
    protected Regulator<R>.Gate<R> gate;

    /* loaded from: input_file:io/vertx/rxcore/java/impl/Regulator$BufferedWriteStream.class */
    public class BufferedWriteStream extends Regulator<R>.Throttle<Buffer> implements WriteStream<Regulator<R>.BufferedWriteStream> {
        protected Handler<Boolean> restrictHandler;
        protected Handler<Throwable> exceptionHandler;
        protected WriteStream<Buffer> out;

        public BufferedWriteStream(WriteStream<Buffer> writeStream) {
            super();
            this.out = writeStream;
            writeStream.drainHandler(new Handler<Void>() { // from class: io.vertx.rxcore.java.impl.Regulator.BufferedWriteStream.1
                public void handle(Void r3) {
                    BufferedWriteStream.this.release();
                }
            });
            writeStream.exceptionHandler(new Handler<Throwable>() { // from class: io.vertx.rxcore.java.impl.Regulator.BufferedWriteStream.2
                public void handle(Throwable th) {
                    BufferedWriteStream.this.fail(th);
                }
            });
        }

        public void restrictHandler(Handler<Boolean> handler) {
            this.restrictHandler = handler;
        }

        /* renamed from: write, reason: merged with bridge method [inline-methods] */
        public Regulator<R>.BufferedWriteStream m29write(Buffer buffer) {
            send(buffer);
            return this;
        }

        /* renamed from: setWriteQueueMaxSize, reason: merged with bridge method [inline-methods] */
        public Regulator<R>.BufferedWriteStream m32setWriteQueueMaxSize(int i) {
            this.out.setWriteQueueMaxSize(i);
            return this;
        }

        public boolean writeQueueFull() {
            return false;
        }

        public Regulator<R>.BufferedWriteStream drainHandler(Handler<Void> handler) {
            throw new RuntimeException("drainHandler is not supported");
        }

        public Regulator<R>.BufferedWriteStream exceptionHandler(Handler<Throwable> handler) {
            this.exceptionHandler = handler;
            return this;
        }

        protected void restrict(Boolean bool) {
            if (this.restrictHandler != null) {
                this.restrictHandler.handle(bool);
            }
        }

        protected void fail(Throwable th) {
            clear();
            if (this.exceptionHandler != null) {
                this.exceptionHandler.handle(th);
            }
        }

        protected void release() {
            int i = 0;
            while (!this.out.writeQueueFull() && !this.queue.isEmpty()) {
                this.out.write((Buffer) this.queue.pollFirst());
                i++;
            }
            restrict(false);
            if (this.completeHandler != null) {
                this.completeHandler.handle((Object) null);
            }
        }

        @Override // io.vertx.rxcore.java.impl.Regulator.Throttle
        protected void clear() {
            this.queue.clear();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.vertx.rxcore.java.impl.Regulator.Throttle
        public boolean forward(Buffer buffer) {
            if (this.out.writeQueueFull()) {
                return false;
            }
            this.out.write(buffer);
            return true;
        }

        /* renamed from: exceptionHandler, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m30exceptionHandler(Handler handler) {
            return exceptionHandler((Handler<Throwable>) handler);
        }

        /* renamed from: drainHandler, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m31drainHandler(Handler handler) {
            return drainHandler((Handler<Void>) handler);
        }
    }

    /* loaded from: input_file:io/vertx/rxcore/java/impl/Regulator$Gate.class */
    public class Gate<R> extends Regulator<R>.Throttle<R> implements Observer<R> {
        protected Subscriber<R> target;
        protected Boolean restricted;

        public Gate(Subscriber<R> subscriber) {
            super();
            this.target = subscriber;
            this.restricted = false;
        }

        @Override // io.vertx.rxcore.java.impl.Regulator.Throttle
        public void handle(Boolean bool) {
            this.restricted = bool;
        }

        public void onNext(R r) {
            send(r);
        }

        public void onCompleted() {
            complete(new Handler<Void>() { // from class: io.vertx.rxcore.java.impl.Regulator.Gate.1
                public void handle(Void r3) {
                    Gate.this.target.onCompleted();
                }
            });
        }

        public void onError(Throwable th) {
            clear();
            this.target.onError(th);
        }

        @Override // io.vertx.rxcore.java.impl.Regulator.Throttle
        protected boolean forward(R r) {
            this.target.onNext(r);
            return true;
        }
    }

    /* loaded from: input_file:io/vertx/rxcore/java/impl/Regulator$Throttle.class */
    public abstract class Throttle<T> implements Handler<Boolean> {
        protected ArrayDeque<T> queue = new ArrayDeque<>();
        protected Handler<Void> completeHandler;
        protected boolean restricted;

        public Throttle() {
        }

        public void send(T t) {
            if (this.restricted) {
                this.queue.addLast(t);
            } else {
                forward(t);
            }
        }

        @Override // 
        public void handle(Boolean bool) {
            if (bool.booleanValue()) {
                this.restricted = true;
            } else {
                this.restricted = !flush();
            }
        }

        protected void clear() {
            this.queue.clear();
        }

        protected boolean flush() {
            while (!this.queue.isEmpty()) {
                if (!forward(this.queue.peekFirst())) {
                    return false;
                }
                this.queue.pollFirst();
            }
            if (this.completeHandler == null) {
                return true;
            }
            this.completeHandler.handle((Object) null);
            return true;
        }

        protected void complete(Handler<Void> handler) {
            if (this.queue.isEmpty()) {
                handler.handle((Object) null);
            } else {
                this.completeHandler = handler;
            }
        }

        protected abstract boolean forward(T t);
    }

    public Subscriber<? super R> call(Subscriber<? super R> subscriber) {
        if (this.gate != null) {
            throw new IllegalStateException("Cannot have multiple subscriptions (use publish)");
        }
        this.gate = new Gate<>(subscriber);
        return new Subscriber<R>() { // from class: io.vertx.rxcore.java.impl.Regulator.1
            public void onCompleted() {
                Regulator.this.gate.onCompleted();
            }

            public void onError(Throwable th) {
                Regulator.this.gate.onError(th);
            }

            public void onNext(R r) {
                Regulator.this.gate.onNext(r);
            }
        };
    }

    public Observable<Long> stream(Observable<Buffer> observable, WriteStream<Buffer> writeStream) {
        final PublishSubject create = PublishSubject.create();
        final AtomicLong atomicLong = new AtomicLong();
        final BufferedWriteStream bufferedWriteStream = new BufferedWriteStream(writeStream);
        bufferedWriteStream.exceptionHandler(new Handler<Throwable>() { // from class: io.vertx.rxcore.java.impl.Regulator.2
            public void handle(Throwable th) {
                create.onError(th);
            }
        });
        observable.subscribe(new Action1<Buffer>() { // from class: io.vertx.rxcore.java.impl.Regulator.3
            public void call(Buffer buffer) {
                bufferedWriteStream.restrictHandler(Regulator.this.gate);
                bufferedWriteStream.m29write(buffer);
                atomicLong.addAndGet(buffer.length());
                create.onNext(Long.valueOf(atomicLong.get()));
            }
        }, new Action1<Throwable>() { // from class: io.vertx.rxcore.java.impl.Regulator.4
            public void call(Throwable th) {
                bufferedWriteStream.clear();
                create.onError(th);
            }
        }, new Action0() { // from class: io.vertx.rxcore.java.impl.Regulator.5
            public void call() {
                bufferedWriteStream.complete(new Handler<Void>() { // from class: io.vertx.rxcore.java.impl.Regulator.5.1
                    public void handle(Void r3) {
                        create.onCompleted();
                    }
                });
            }
        });
        return create;
    }
}
