/*
 * Decompiled with CFR 0.152.
 */
package ratpack.rx;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import ratpack.exec.ExecControl;
import ratpack.exec.ExecController;
import ratpack.exec.Fulfiller;
import ratpack.exec.Promise;
import ratpack.exec.UnmanagedThreadException;
import ratpack.func.Action;
import ratpack.rx.internal.DefaultSchedulers;
import ratpack.rx.internal.ExecControllerBackedScheduler;
import ratpack.util.Exceptions;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.exceptions.OnErrorNotImplementedException;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;

public abstract class RxRatpack {
    /**
     * Private so that this class, which only declares static members, cannot be instantiated from outside.
     */
    private RxRatpack() {
    }

    /**
     * Installs the Ratpack integration into RxJava's global plugin registry.
     * <p>
     * An {@link ExecutionHook} is registered as the observable execution hook. If registration is
     * rejected, the call still succeeds as long as the hook that is already installed is an
     * {@link ExecutionHook}, so this method can safely be invoked more than once.
     * <p>
     * The {@code rxjava.plugin.RxJavaSchedulersHook.implementation} system property is then pointed
     * at {@link DefaultSchedulers}, and the schedulers hook reported by RxJava is verified to be of
     * that type.
     *
     * @throws IllegalStateException if a different execution hook, or a different schedulers hook,
     *         is already installed
     */
    public static void initialize() {
        RxJavaPlugins plugins = RxJavaPlugins.getInstance();

        try {
            plugins.registerObservableExecutionHook(new ExecutionHook());
        } catch (IllegalStateException e) {
            // Registration was refused; that is only a problem when the hook in place is not ours.
            RxJavaObservableExecutionHook existingHook = plugins.getObservableExecutionHook();
            if (!(existingHook instanceof ExecutionHook)) {
                // Chain the original failure so the reason the registration was refused is not lost.
                throw new IllegalStateException("Cannot install RxJava integration because another execution hook (" + existingHook.getClass() + ") is already installed", e);
            }
        }

        // The property must be set before the schedulers hook is read back on the next line.
        System.setProperty("rxjava.plugin." + RxJavaSchedulersHook.class.getSimpleName() + ".implementation", DefaultSchedulers.class.getName());
        RxJavaSchedulersHook existingSchedulers = plugins.getSchedulersHook();
        if (!(existingSchedulers instanceof DefaultSchedulers)) {
            throw new IllegalStateException("Cannot install RxJava integration because another set of default schedulers (" + existingSchedulers.getClass() + ") is already installed");
        }
    }

    /**
     * Adapts a promise to an observable.
     * <p>
     * On subscription, the promise's error handler is wired to the subscriber's {@code onError},
     * and the promised value is passed to {@code onNext} immediately followed by {@code onCompleted}.
     *
     * @param promise the promise to adapt
     * @param <T> the type of the promised value
     * @return an observable backed by the given promise
     */
    public static <T> Observable<T> observe(Promise<T> promise) {
        Observable.OnSubscribe<T> emitPromisedValue = subscriber -> {
            promise
                .onError(subscriber::onError)
                .then(value -> {
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                });
        };
        return Observable.create(emitPromisedValue);
    }

    /**
     * Adapts a promise of an iterable to an observable of the iterable's elements.
     * <p>
     * The promise is first observed via {@link #observe(Promise)}, the resulting iterable is turned
     * into an observable with {@code Observable.from}, and the nested observable is flattened with
     * {@code Observable.merge}.
     *
     * @param promise the promise of the iterable
     * @param <T> the element type
     * @param <I> the iterable type
     * @return an observable of the elements of the promised iterable
     */
    public static <T, I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise) {
        Observable<Observable<T>> perIterable = observe(promise).map(Observable::from);
        return Observable.merge(perIterable);
    }

    /**
     * Adapts an observable to a promise of all of its items.
     * <p>
     * The items are gathered with {@code toList()}; the resulting list fulfills the promise, and an
     * observable error is passed to the fulfiller's {@code error}.
     *
     * @param observable the observable to collect
     * @param <T> the item type
     * @return a promise for the list of items
     * @throws UnmanagedThreadException declared by this method; presumably raised by
     *         {@code ExecControl.current()} when there is no current execution - confirm
     */
    public static <T> Promise<List<T>> promise(Observable<T> observable) throws UnmanagedThreadException {
        ExecControl control = ExecControl.current();
        return control.promise(fulfiller -> {
            Observable<List<T>> collected = observable.toList();
            collected.subscribe(fulfiller::success, fulfiller::error);
        });
    }

    /**
     * Adapts an observable to a promise of its only item.
     * <p>
     * The observable is narrowed with {@code single()}; the emitted item fulfills the promise, and
     * an observable error is passed to the fulfiller's {@code error}.
     *
     * @param observable the observable expected to emit one item
     * @param <T> the item type
     * @return a promise for the single item
     * @throws UnmanagedThreadException declared by this method; presumably raised by
     *         {@code ExecControl.current()} when there is no current execution - confirm
     */
    public static <T> Promise<T> promiseSingle(Observable<T> observable) throws UnmanagedThreadException {
        ExecControl control = ExecControl.current();
        return control.promise(fulfiller -> {
            Observable<T> onlyItem = observable.single();
            onlyItem.subscribe(fulfiller::success, fulfiller::error);
        });
    }

    /**
     * Round-trips an observable through a promise.
     * <p>
     * The source is collected into a promise with {@link #promise(Observable)} and that promise is
     * converted back into an observable of its elements with {@link #observeEach(Promise)}. The
     * conversion runs inside {@code Exceptions.uncheck}.
     *
     * @param source the observable to bind
     * @param <T> the item type
     * @return an observable emitting the items of {@code source} by way of a promise
     */
    public static <T> Observable<T> bindExec(Observable<T> source) {
        return Exceptions.uncheck(() -> {
            Promise<List<T>> collected = promise(source);
            return collected.to(RxRatpack::observeEach);
        });
    }

    /**
     * Returns an observable that delivers each item of the given observable to the downstream
     * subscriber from inside a newly forked execution.
     * <p>
     * The {@link ExecControl} is captured once, when this method is called, and reused for every
     * fork. NOTE(review): {@code ExecControl.current()} presumably fails when called off a managed
     * thread, as the {@code promise} methods of this class declare - confirm.
     * <p>
     * Completion is signalled downstream only after the upstream has completed and every forked
     * execution has finished. The first error, whether from upstream or reported by a fork, is
     * forwarded downstream; after that no further items or terminal events are delivered.
     *
     * @param observable the source observable
     * @param <T> the item type
     * @return an observable whose items are emitted from forked executions
     */
    public static <T> Observable<T> forkEach(Observable<T> observable) {
        ExecControl current = ExecControl.current();
        return observable.<T>lift(downstream -> new Subscriber<T>(downstream) {

            // Outstanding work: starts at 1 for the upstream itself, plus 1 for every fork in flight.
            private final AtomicInteger wip = new AtomicInteger(1);

            // Flipped exactly once, by whichever terminal event (completion or error) wins.
            private final AtomicBoolean closed = new AtomicBoolean();

            @Override
            public void onCompleted() {
                maybeDone();
            }

            @Override
            public void onError(Throwable e) {
                terminate(() -> downstream.onError(e));
            }

            // Releases one unit of outstanding work; completes downstream when none is left.
            private void maybeDone() {
                if (wip.decrementAndGet() == 0) {
                    terminate(downstream::onCompleted);
                }
            }

            // Runs the given terminal action only if no terminal action has run before.
            private void terminate(Runnable runnable) {
                if (closed.compareAndSet(false, true)) {
                    runnable.run();
                }
            }

            @Override
            public void onNext(T t) {
                // Do not fork at all when nobody is listening any more.
                if (isUnsubscribed() || closed.get()) {
                    return;
                }

                wip.incrementAndGet();
                current.fork()
                    .onComplete(e -> this.maybeDone())
                    .onError(this::onError)
                    .start(e -> {
                        // Re-check: a terminal event may have happened before the fork started.
                        if (!closed.get()) {
                            downstream.onNext(t);
                        }
                    });
            }
        });
    }

    /**
     * Creates an RxJava scheduler backed by the given exec controller.
     *
     * @param execController the exec controller handed to the new {@link ExecControllerBackedScheduler}
     * @return a new scheduler instance
     */
    public static Scheduler scheduler(ExecController execController) {
        Scheduler controllerBacked = new ExecControllerBackedScheduler(execController);
        return controllerBacked;
    }

    private static class ExecutionBackedSubscriber<T>
    extends Subscriber<T> {
        private final Subscriber<? super T> subscriber;
        private final ExecControl execControl;

        public ExecutionBackedSubscriber(ExecControl execControl, Subscriber<? super T> subscriber) {
            super(subscriber);
            this.execControl = execControl;
            this.subscriber = subscriber;
        }

        public void onCompleted() {
            try {
                this.subscriber.onCompleted();
            }
            catch (OnErrorNotImplementedException e) {
                this.execControl.promise(f -> f.error(e.getCause())).then(Action.noop());
            }
        }

        public void onError(Throwable e) {
            try {
                this.subscriber.onError(e);
            }
            catch (OnErrorNotImplementedException e2) {
                this.execControl.promise(f -> f.error(e2.getCause())).then(Action.noop());
            }
        }

        public void onNext(T t) {
            try {
                this.subscriber.onNext(t);
            }
            catch (OnErrorNotImplementedException e) {
                this.execControl.promise(fulfiller -> fulfiller.error(e.getCause())).then(Action.noop());
            }
        }
    }

    private static class ExecutionHook
    extends RxJavaObservableExecutionHook {
        private ExecutionHook() {
        }

        public <T> Observable.OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, Observable.OnSubscribe<T> onSubscribe) {
            return ExecController.current().map(e -> this.executionBackedOnSubscribe(onSubscribe, (ExecController)e)).orElse(onSubscribe);
        }

        private <T> Observable.OnSubscribe<T> executionBackedOnSubscribe(Observable.OnSubscribe<T> onSubscribe, ExecController e) {
            return subscriber -> onSubscribe.call(new ExecutionBackedSubscriber(e.getControl(), subscriber));
        }
    }
}

