/*
 * Decompiled with CFR 0.152.
 */
package ratpack.reactor;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import ratpack.exec.Downstream;
import ratpack.exec.ExecController;
import ratpack.exec.Execution;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.UnmanagedThreadException;
import ratpack.func.Action;
import ratpack.reactor.internal.BlockingExecutorBackedScheduler;
import ratpack.reactor.internal.DefaultSchedulers;
import ratpack.reactor.internal.ErrorHandler;
import ratpack.reactor.internal.ExecControllerBackedScheduler;
import ratpack.registry.RegistrySpec;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.util.Exceptions;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;

public abstract class ReactorRatpack {
    private ReactorRatpack() {
    }

    public static void initialize() {
        Hooks.onOperatorError((BiFunction)new ErrorHandler());
    }

    public static Flux<Void> flux(Operation operation) {
        return Flux.create(sink -> operation.onError(arg_0 -> ((FluxSink)sink).error(arg_0)).then(() -> ((FluxSink)sink).complete()));
    }

    public static <T> Flux<T> flux(Promise<T> promise) {
        return Flux.create(subscriber -> promise.onError(arg_0 -> ((FluxSink)subscriber).error(arg_0)).then(value -> {
            subscriber.next(value);
            subscriber.complete();
        }));
    }

    public static <T, I extends Iterable<T>> Flux<T> fluxEach(Promise<I> promise) {
        return Flux.merge((Publisher)ReactorRatpack.flux(promise).map(Flux::fromIterable));
    }

    public static <T> Mono<T> mono(Promise<T> promise) {
        return Mono.create(subscriber -> promise.onError(arg_0 -> ((MonoSink)subscriber).error(arg_0)).then(arg_0 -> ((MonoSink)subscriber).success(arg_0)));
    }

    public static <T> Promise<List<T>> promise(Flux<T> flux) throws UnmanagedThreadException {
        return Promise.async(f -> flux.collectList().subscribe(arg_0 -> ((Downstream)f).success(arg_0), arg_0 -> ((Downstream)f).error(arg_0)));
    }

    public static <T> Promise<T> promiseSingle(Mono<T> mono) throws UnmanagedThreadException {
        return Promise.async(f -> mono.subscribe(arg_0 -> ((Downstream)f).success(arg_0), arg_0 -> ((Downstream)f).error(arg_0)));
    }

    public static <T> TransformablePublisher<T> publisher(Flux<T> flux) {
        return Streams.transformable(flux);
    }

    public static <T> Flux<T> bindExec(Flux<T> source) {
        return (Flux)Exceptions.uncheck(() -> (Flux)ReactorRatpack.promise(source).to(ReactorRatpack::fluxEach));
    }

    public static <T> Flux<T> fork(Flux<T> observable) {
        return ReactorRatpack.fluxEach(ReactorRatpack.promise(observable).fork());
    }

    public static <T> Flux<T> fork(Flux<T> flux, Action<? super RegistrySpec> doWithRegistrySpec) throws Exception {
        return ReactorRatpack.fluxEach(ReactorRatpack.promise(flux).fork(execSpec -> execSpec.register(doWithRegistrySpec)));
    }

    public static <T> Flux<T> forkEach(Flux<T> flux) {
        return ReactorRatpack.forkEach(flux, (Action<? super RegistrySpec>)Action.noop());
    }

    public static <T> Flux<T> forkEach(Flux<T> flux, Action<? super RegistrySpec> doWithRegistrySpec) {
        return flux.transform(Operators.lift((scannable, subscriber) -> new CoreSubscriber<T>((CoreSubscriber)subscriber, (Action)doWithRegistrySpec){
            private final AtomicInteger wip = new AtomicInteger(1);
            private final AtomicBoolean closed = new AtomicBoolean();
            private Subscription subscription;
            final /* synthetic */ CoreSubscriber val$subscriber;
            final /* synthetic */ Action val$doWithRegistrySpec;
            {
                this.val$subscriber = coreSubscriber;
                this.val$doWithRegistrySpec = action;
            }

            public void onSubscribe(Subscription s) {
                this.subscription = s;
                s.request(1L);
                this.val$subscriber.onSubscribe(s);
            }

            public void onComplete() {
                this.maybeDone();
            }

            public void onError(Throwable e) {
                this.terminate(() -> this.val$subscriber.onError(e));
            }

            private void maybeDone() {
                if (this.wip.decrementAndGet() == 0) {
                    this.terminate(() -> ((CoreSubscriber)this.val$subscriber).onComplete());
                }
            }

            private void terminate(Runnable runnable) {
                if (this.closed.compareAndSet(false, true)) {
                    this.subscription.cancel();
                    runnable.run();
                }
            }

            public void onNext(T t) {
                if (this.closed.get()) {
                    return;
                }
                this.wip.incrementAndGet();
                Execution.fork().register(this.val$doWithRegistrySpec).onComplete(e -> this.maybeDone()).onError(this::onError).start(e -> {
                    if (!this.closed.get()) {
                        this.subscription.request(1L);
                        this.val$subscriber.onNext(t);
                    }
                });
            }
        }));
    }

    public static Scheduler computationScheduler(ExecController execController) {
        return new ExecControllerBackedScheduler(execController);
    }

    public static Scheduler computationScheduler() {
        return DefaultSchedulers.getComputationScheduler();
    }

    public static Scheduler ioScheduler(ExecController execController) {
        return new BlockingExecutorBackedScheduler(execController);
    }

    public static Scheduler ioScheduler() {
        return DefaultSchedulers.getIoScheduler();
    }
}

