/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.BufferedProcessor;
import io.helidon.common.reactive.Multi;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class MultiOnErrorResumeProcessor<T>
extends BufferedProcessor<T, T>
implements Multi<T> {
    private Function<Throwable, T> supplier;
    private Function<Throwable, Flow.Publisher<T>> publisherSupplier;
    private AtomicReference<Optional<Flow.Subscription>> onErrorPublisherSubscription = new AtomicReference(Optional.empty());

    private MultiOnErrorResumeProcessor() {
    }

    public static <T> MultiOnErrorResumeProcessor<T> resume(Function<Throwable, ?> supplier) {
        MultiOnErrorResumeProcessor<T> processor = new MultiOnErrorResumeProcessor<T>();
        processor.supplier = supplier;
        return processor;
    }

    public static <T> MultiOnErrorResumeProcessor<T> resumeWith(Function<Throwable, Flow.Publisher<T>> supplier) {
        MultiOnErrorResumeProcessor<T> processor = new MultiOnErrorResumeProcessor<T>();
        processor.publisherSupplier = supplier;
        return processor;
    }

    @Override
    protected void tryRequest(Flow.Subscription subscription) {
        super.tryRequest(this.onErrorPublisherSubscription.get().orElse(subscription));
    }

    @Override
    protected void hookOnNext(T item) {
        super.submit(item);
    }

    @Override
    public void onError(Throwable ex) {
        Objects.requireNonNull(ex);
        try {
            if (Objects.nonNull(this.supplier)) {
                this.submit(this.supplier.apply(ex));
                this.tryComplete();
            } else {
                this.publisherSupplier.apply(ex).subscribe(new Flow.Subscriber<T>(){

                    @Override
                    public void onSubscribe(Flow.Subscription subscription) {
                        Objects.requireNonNull(subscription);
                        MultiOnErrorResumeProcessor.this.onErrorPublisherSubscription.set(Optional.of(subscription));
                        if (MultiOnErrorResumeProcessor.this.getRequestedCounter().get() > 0L) {
                            subscription.request(MultiOnErrorResumeProcessor.this.getRequestedCounter().get());
                        }
                    }

                    @Override
                    public void onNext(T t) {
                        MultiOnErrorResumeProcessor.this.submit(t);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Objects.requireNonNull(t);
                        MultiOnErrorResumeProcessor.this.fail(t);
                    }

                    @Override
                    public void onComplete() {
                        MultiOnErrorResumeProcessor.this.onComplete();
                        MultiOnErrorResumeProcessor.this.onErrorPublisherSubscription.set(Optional.empty());
                    }
                });
            }
        }
        catch (Throwable t) {
            this.onErrorPublisherSubscription.get().ifPresent(Flow.Subscription::cancel);
            this.fail(t);
        }
    }

    @Override
    protected void hookOnCancel(Flow.Subscription subscription) {
        subscription.cancel();
        this.onErrorPublisherSubscription.get().ifPresent(Flow.Subscription::cancel);
    }
}

