/*
 * Decompiled with CFR 0.152.
 */
package ratpack.rx;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import ratpack.exec.Downstream;
import ratpack.exec.ExecController;
import ratpack.exec.Execution;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.UnmanagedThreadException;
import ratpack.func.Action;
import ratpack.rx.internal.DefaultSchedulers;
import ratpack.rx.internal.ExecControllerBackedScheduler;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.util.Exceptions;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Scheduler;
import rx.Subscriber;
import rx.exceptions.OnErrorNotImplementedException;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;

public abstract class RxRatpack {
    // Private no-arg constructor; every other member of this class visible here is static.
    private RxRatpack() {
    }

    /**
     * Registers this integration's observable execution hook and default schedulers hook
     * with the {@link RxJavaPlugins} singleton.
     *
     * <p>If a registration is rejected with {@link IllegalStateException} and the hook that is
     * currently installed is already of this integration's type, the rejection is ignored.
     *
     * @throws IllegalStateException if a registration is rejected and the installed hook is of
     *     some other type
     */
    public static void initialize() {
        RxJavaPlugins rxPlugins = RxJavaPlugins.getInstance();

        ExecutionHook hook = new ExecutionHook();
        try {
            rxPlugins.registerObservableExecutionHook(hook);
        } catch (IllegalStateException alreadyRegistered) {
            RxJavaObservableExecutionHook installedHook = rxPlugins.getObservableExecutionHook();
            if (!(installedHook instanceof ExecutionHook)) {
                throw new IllegalStateException("Cannot install RxJava integration because another execution hook (" + installedHook.getClass() + ") is already installed");
            }
            // Otherwise the installed hook is one of ours; carry on with the schedulers.
        }

        try {
            rxPlugins.registerSchedulersHook(new DefaultSchedulers());
        } catch (IllegalStateException alreadyRegistered) {
            RxJavaSchedulersHook installedSchedulers = rxPlugins.getSchedulersHook();
            if (!(installedSchedulers instanceof DefaultSchedulers)) {
                throw new IllegalStateException("Cannot install RxJava integration because another set of default schedulers (" + installedSchedulers.getClass() + ") is already installed");
            }
        }
    }

    /**
     * Adapts a promise to an observable.
     *
     * <p>On each subscription the promise is subscribed to: its value is passed to
     * {@code onNext} followed by {@code onCompleted}, and an error is passed to {@code onError}.
     *
     * @param promise the promise to observe
     * @param <T> the type of the promised value
     * @return an observable backed by the promise
     */
    public static <T> Observable<T> observe(Promise<T> promise) {
        return Observable.create(subscriber -> {
            Promise<T> withErrorRouting = promise.onError(subscriber::onError);
            withErrorRouting.then(value -> {
                subscriber.onNext(value);
                subscriber.onCompleted();
            });
        });
    }

    /**
     * Adapts an operation to an observable that emits no items.
     *
     * <p>On each subscription the operation is subscribed to: {@code onCompleted} is called when
     * it finishes, and an error is passed to {@code onError}.
     *
     * @param operation the operation to observe
     * @return an observable backed by the operation
     */
    public static Observable<Void> observe(Operation operation) {
        return Observable.create(subscriber -> {
            Operation withErrorRouting = operation.onError(subscriber::onError);
            withErrorRouting.then(subscriber::onCompleted);
        });
    }

    /**
     * Adapts a promise of an iterable to an observable of the iterable's elements.
     *
     * <p>The promise is observed via {@link #observe(Promise)}, the promised iterable is mapped
     * to an observable with {@code Observable.from}, and the result is merged.
     *
     * @param promise the promise of an iterable
     * @param <T> the element type
     * @param <I> the iterable type
     * @return an observable of the elements of the promised iterable
     */
    public static <T, I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise) {
        Observable<Observable<T>> nested = observe(promise).map(Observable::from);
        return Observable.merge(nested);
    }

    /**
     * Adapts an observable to a promise of all of its items.
     *
     * <p>The items are collected with {@code toList()}; the resulting list fulfils the promise
     * and an error from the observable fails it.
     *
     * @param observable the observable to collect
     * @param <T> the item type
     * @return a promise for the list of items emitted by the observable
     * @throws UnmanagedThreadException declared by this method; not thrown directly by the code
     *     visible here
     */
    public static <T> Promise<List<T>> promise(Observable<T> observable) throws UnmanagedThreadException {
        return Promise.of(down -> {
            Observable<List<T>> collected = observable.toList();
            collected.subscribe(down::success, down::error);
        });
    }

    /**
     * Convenience overload: wraps the given {@code OnSubscribe} with {@code Observable.create}
     * and delegates to {@link #promise(Observable)}.
     *
     * @param onSubscribe the subscription function
     * @param <T> the item type
     * @return a promise for the list of items emitted by the created observable
     * @throws UnmanagedThreadException as declared by {@link #promise(Observable)}
     */
    public static <T> Promise<List<T>> promise(Observable.OnSubscribe<T> onSubscribe) throws UnmanagedThreadException {
        Observable<T> created = Observable.create(onSubscribe);
        return promise(created);
    }

    /**
     * Adapts an observable to a promise of its single item.
     *
     * <p>{@code single()} is applied to the observable first; the emitted item fulfils the
     * promise and an error fails it.
     *
     * @param observable the observable to adapt
     * @param <T> the item type
     * @return a promise for the single item of the observable
     * @throws UnmanagedThreadException declared by this method; not thrown directly by the code
     *     visible here
     */
    public static <T> Promise<T> promiseSingle(Observable<T> observable) throws UnmanagedThreadException {
        return Promise.of(down -> {
            Observable<T> onlyItem = observable.single();
            onlyItem.subscribe(down::success, down::error);
        });
    }

    /**
     * Convenience overload: wraps the given {@code OnSubscribe} with {@code Observable.create}
     * and delegates to {@link #promiseSingle(Observable)}.
     *
     * @param onSubscribe the subscription function
     * @param <T> the item type
     * @return a promise for the single item of the created observable
     * @throws UnmanagedThreadException as declared by {@link #promiseSingle(Observable)}
     */
    public static <T> Promise<T> promiseSingle(Observable.OnSubscribe<T> onSubscribe) throws UnmanagedThreadException {
        Observable<T> created = Observable.create(onSubscribe);
        return promiseSingle(created);
    }

    /**
     * Adapts an observable to a {@link TransformablePublisher}.
     *
     * <p>The observable is converted with {@code RxReactiveStreams.toPublisher} and the result is
     * wrapped with {@code Streams.transformable}.
     *
     * @param observable the observable to adapt
     * @param <T> the item type
     * @return a transformable publisher backed by the observable
     */
    public static <T> TransformablePublisher<T> publisher(Observable<T> observable) {
        Publisher<T> reactiveStreamsPublisher = RxReactiveStreams.toPublisher(observable);
        return Streams.transformable(reactiveStreamsPublisher);
    }

    /**
     * Convenience overload: wraps the given {@code OnSubscribe} with {@code Observable.create}
     * and delegates to {@link #publisher(Observable)}.
     *
     * @param onSubscribe the subscription function
     * @param <T> the item type
     * @return a transformable publisher backed by the created observable
     */
    public static <T> TransformablePublisher<T> publisher(Observable.OnSubscribe<T> onSubscribe) {
        Observable<T> created = Observable.create(onSubscribe);
        return publisher(created);
    }

    /**
     * Round-trips an observable through a promise.
     *
     * <p>The source is collected into a promise via {@link #promise(Observable)} and that promise
     * is converted back with {@link #observeEach(Promise)}. The conversion runs inside
     * {@code Exceptions.uncheck}.
     *
     * @param source the observable to bind
     * @param <T> the item type
     * @return an observable of the source's items, produced via a promise
     */
    public static <T> Observable<T> bindExec(Observable<T> source) {
        return Exceptions.uncheck(() -> promise(source).to(RxRatpack::observeEach));
    }

    /**
     * Returns an observable that emits each item of the given observable to the downstream
     * subscriber from inside a newly forked {@link Execution}.
     *
     * <p>A work-in-progress counter starts at 1 (for the upstream itself), is incremented for
     * every item received and decremented when each fork completes and when the upstream
     * completes. The downstream is completed once the counter reaches zero. An upstream error,
     * or an error reported by a fork, is passed to the downstream's {@code onError}. Only the
     * first terminal signal is delivered.
     *
     * @param observable the source observable
     * @param <T> the item type
     * @return an observable whose items are delivered from forked executions
     */
    public static <T> Observable<T> forkEach(Observable<T> observable) {
        return observable.<T>lift(downstream -> new Subscriber<T>(downstream) {
            // Outstanding work: 1 for the upstream plus 1 per fork that has not completed yet.
            private final AtomicInteger wip = new AtomicInteger(1);
            // Set once a terminal signal has been sent downstream.
            private final AtomicBoolean closed = new AtomicBoolean();

            @Override
            public void onCompleted() {
                maybeDone();
            }

            @Override
            public void onError(Throwable e) {
                terminate(() -> downstream.onError(e));
            }

            // Completes the downstream when the last outstanding piece of work finishes.
            private void maybeDone() {
                if (wip.decrementAndGet() == 0) {
                    terminate(downstream::onCompleted);
                }
            }

            // Runs the terminal action at most once.
            private void terminate(Runnable runnable) {
                if (closed.compareAndSet(false, true)) {
                    runnable.run();
                }
            }

            @Override
            public void onNext(T t) {
                // Skip forking when nobody is listening any more.
                if (isUnsubscribed() || closed.get()) {
                    return;
                }
                wip.incrementAndGet();
                Execution.fork()
                    .onComplete(e -> this.maybeDone())
                    .onError(this::onError)
                    .start(e -> {
                        if (!closed.get()) {
                            downstream.onNext(t);
                        }
                    });
            }
        });
    }

    /**
     * Creates a scheduler backed by the given exec controller.
     *
     * @param execController the controller handed to the new {@link ExecControllerBackedScheduler}
     * @return a new scheduler wrapping {@code execController}
     */
    public static Scheduler scheduler(ExecController execController) {
        Scheduler controllerBacked = new ExecControllerBackedScheduler(execController);
        return controllerBacked;
    }

    /**
     * Creates a scheduler backed by the exec controller returned from
     * {@code ExecController.require()}.
     *
     * @return a new scheduler, as produced by {@link #scheduler(ExecController)}
     */
    public static Scheduler scheduler() {
        ExecController requiredController = ExecController.require();
        return scheduler(requiredController);
    }

    private static class ExecutionBackedSubscriber<T>
    extends Subscriber<T> {
        private final Subscriber<? super T> subscriber;

        public ExecutionBackedSubscriber(Subscriber<? super T> subscriber) {
            super(subscriber);
            this.subscriber = subscriber;
        }

        public void onCompleted() {
            try {
                this.subscriber.onCompleted();
            }
            catch (OnErrorNotImplementedException e) {
                Promise.error((Throwable)e.getCause()).then(Action.noop());
            }
        }

        public void onError(Throwable e) {
            try {
                this.subscriber.onError(e);
            }
            catch (OnErrorNotImplementedException e2) {
                Promise.error((Throwable)e2.getCause()).then(Action.noop());
            }
        }

        public void onNext(T t) {
            try {
                this.subscriber.onNext(t);
            }
            catch (OnErrorNotImplementedException e) {
                Promise.error((Throwable)e.getCause()).then(Action.noop());
            }
        }
    }

    private static class ExecutionHook
    extends RxJavaObservableExecutionHook {
        private ExecutionHook() {
        }

        public <T> Observable.OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, Observable.OnSubscribe<T> onSubscribe) {
            return ExecController.current().map(e -> this.executionBackedOnSubscribe(onSubscribe)).orElse(onSubscribe);
        }

        private <T> Observable.OnSubscribe<T> executionBackedOnSubscribe(Observable.OnSubscribe<T> onSubscribe) {
            return subscriber -> onSubscribe.call(new ExecutionBackedSubscriber(subscriber));
        }
    }
}

