/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.RequestedCounter;
import io.helidon.common.reactive.SubscriberReference;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
 * Processor that maps every upstream item to a {@link Flow.Publisher} and relays the
 * items emitted by those inner publishers to a single downstream subscriber.
 *
 * <p>Only one inner publisher is subscribed at a time. Upstream items that arrive while
 * an inner publisher is still active are parked in a bounded queue and mapped, in arrival
 * order, once the active inner publisher completes. If the queue is full the resulting
 * {@link IllegalStateException} is delivered downstream through {@link #onError(Throwable)}.
 *
 * <p>The queue capacity is read from the system property
 * {@code helidon.common.reactive.flatMap.buffer.size} and defaults to
 * {@value #DEFAULT_BUFFER_SIZE}.
 *
 * @param <T> type of the items flowing through the processor
 */
public class MultiFlatMapProcessor<T> implements Flow.Processor<T, T>, Multi<T> {
    private static final int DEFAULT_BUFFER_SIZE = 64;
    private static final String BUFFER_SIZE_PROPERTY = "helidon.common.reactive.flatMap.buffer.size";

    /** Incremented by downstream requests routed to the active inner publisher, decremented per relayed item. */
    private final RequestedCounter requestCounter = new RequestedCounter();
    /** Set once the upstream publisher has signalled {@code onComplete}. */
    private final AtomicBoolean onCompleteReceivedAlready = new AtomicBoolean(false);
    /** Upstream items waiting for the active inner publisher to finish. */
    private final PublisherBuffer buffer = new PublisherBuffer();

    private Function<T, Flow.Publisher<T>> mapper;
    private SubscriberReference<? super T> subscriber;
    /** Subscription to the upstream publisher. */
    private Flow.Subscription subscription;
    /** Subscription to the inner publisher that was subscribed most recently, if any. */
    private Flow.Subscription innerSubscription;
    /** Last error seen; replayed to a subscriber that attaches afterwards. {@code null} when none. */
    private Throwable error;
    /** True when upstream completed while no downstream subscriber was attached yet. */
    private boolean completionPending;

    private MultiFlatMapProcessor() {
    }

    /**
     * Creates a processor whose mapper turns each item into an {@link Iterable}.
     * Every iterable is adapted to a publisher with {@code Multi.from}.
     *
     * @param mapper function producing the iterable to flatten for each upstream item
     * @param <T>    item type
     * @return new processor
     */
    public static <T> MultiFlatMapProcessor<T> fromIterableMapper(Function<T, Iterable<T>> mapper) {
        MultiFlatMapProcessor<T> flatMapProcessor = new MultiFlatMapProcessor<>();
        flatMapProcessor.mapper = o -> Multi.from(mapper.apply(o));
        return flatMapProcessor;
    }

    /**
     * Creates a processor whose mapper turns each item into a {@link Flow.Publisher}.
     *
     * @param mapper function producing the publisher to flatten for each upstream item
     * @param <T>    item type
     * @return new processor
     */
    public static <T> MultiFlatMapProcessor<T> fromPublisherMapper(Function<?, Flow.Publisher<T>> mapper) {
        // The wildcard input type is part of the public signature; the mapper is only ever
        // applied to upstream items of type T, so narrowing it here is the single unchecked spot.
        @SuppressWarnings("unchecked")
        Function<T, Flow.Publisher<T>> publisherMapper = (Function<T, Flow.Publisher<T>>) mapper;
        MultiFlatMapProcessor<T> flatMapProcessor = new MultiFlatMapProcessor<>();
        flatMapProcessor.mapper = publisherMapper;
        return flatMapProcessor;
    }

    /**
     * Attaches the downstream subscriber. If the upstream subscription already exists the
     * subscriber immediately receives its subscription; an error recorded earlier, or a
     * completion that arrived before any subscriber was attached, is replayed afterwards.
     *
     * @param subscriber downstream subscriber
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Fail before touching any state so a null argument cannot corrupt the processor.
        Objects.requireNonNull(subscriber, "subscriber");
        this.subscriber = SubscriberReference.create(subscriber);
        if (this.subscription != null) {
            subscriber.onSubscribe(new FlatMapSubscription());
        }
        if (this.error != null) {
            subscriber.onError(this.error);
        } else if (this.completionPending) {
            // Upstream finished before anybody was listening; deliver that completion exactly once.
            this.completionPending = false;
            subscriber.onComplete();
        }
    }

    /**
     * Accepts the upstream subscription. A second subscription is cancelled and ignored.
     *
     * @param subscription upstream subscription
     * @throws NullPointerException if {@code subscription} is {@code null}
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription, "subscription");
        if (this.subscription != null) {
            subscription.cancel();
            return;
        }
        this.subscription = subscription;
        if (this.subscriber != null) {
            this.subscriber.onSubscribe(new FlatMapSubscription());
        }
    }

    /**
     * Receives an upstream item and either maps it right away or queues it behind the
     * currently active inner publisher. Any failure while doing so is reported via
     * {@link #onError(Throwable)}.
     *
     * @param o upstream item
     * @throws NullPointerException if {@code o} is {@code null}
     */
    @Override
    public void onNext(T o) {
        Objects.requireNonNull(o);
        try {
            this.buffer.offer(o);
        } catch (Throwable t) {
            this.onError(t);
        }
    }

    /**
     * Records the error and forwards it to the downstream subscriber when one is attached.
     *
     * @param t the failure
     * @throws NullPointerException if {@code t} is {@code null}
     */
    @Override
    public void onError(Throwable t) {
        this.error = Objects.requireNonNull(t, "t");
        if (this.subscriber != null) {
            this.subscriber.onError(t);
        }
    }

    /**
     * Marks the upstream as finished. Downstream is completed immediately only when no inner
     * publisher is active and nothing is queued; otherwise completion is emitted later by the
     * buffer once the last inner publisher finishes.
     */
    @Override
    public void onComplete() {
        this.onCompleteReceivedAlready.set(true);
        if (!this.buffer.isComplete()) {
            return;
        }
        if (this.subscriber == null) {
            // Nobody to notify yet: remember it so subscribe() can replay the completion.
            this.completionPending = true;
            return;
        }
        this.subscriber.onComplete();
    }

    /**
     * Subscriber attached to a single inner publisher; relays its items downstream.
     */
    private final class InnerSubscriber implements Flow.Subscriber<T> {
        private final AtomicBoolean subscriptionAcked = new AtomicBoolean(false);
        private final AtomicBoolean done = new AtomicBoolean(false);
        /** Callback run when this inner publisher completes; {@code null} when not set. */
        private Runnable whenCompleteObserver;

        private InnerSubscriber() {
        }

        @Override
        public void onSubscribe(Flow.Subscription innerSubscription) {
            Objects.requireNonNull(innerSubscription);
            // Atomic test-and-set: only the first subscription is accepted, later ones are cancelled.
            if (!this.subscriptionAcked.compareAndSet(false, true)) {
                innerSubscription.cancel();
                return;
            }
            MultiFlatMapProcessor.this.innerSubscription = innerSubscription;
            innerSubscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(T o) {
            Objects.requireNonNull(o);
            MultiFlatMapProcessor.this.subscriber.onNext(o);
            MultiFlatMapProcessor.this.requestCounter.tryDecrement();
        }

        @Override
        public void onError(Throwable t) {
            Objects.requireNonNull(t);
            // An inner failure terminates the whole flow: stop upstream, then report downstream.
            MultiFlatMapProcessor.this.subscription.cancel();
            MultiFlatMapProcessor.this.onError(t);
        }

        @Override
        public void onComplete() {
            this.done.set(true);
            if (this.whenCompleteObserver != null) {
                this.whenCompleteObserver.run();
            }
            // Pass whatever the counter still reports on to the upstream publisher.
            long requestCount = MultiFlatMapProcessor.this.requestCounter.get();
            if (requestCount > 0L) {
                MultiFlatMapProcessor.this.subscription.request(requestCount);
            }
        }

        private void whenComplete(Runnable whenCompleteObserver) {
            this.whenCompleteObserver = Objects.requireNonNull(whenCompleteObserver);
        }

        private boolean isDone() {
            return this.done.get();
        }
    }

    /**
     * Bounded queue of upstream items waiting to be mapped, plus the bookkeeping of the
     * inner subscriber that was started most recently.
     */
    private final class PublisherBuffer {
        private final int bufferSize = Integer.parseInt(
                System.getProperty(BUFFER_SIZE_PROPERTY, String.valueOf(DEFAULT_BUFFER_SIZE)));
        private final BlockingQueue<T> buffer = new ArrayBlockingQueue<>(this.bufferSize);
        private InnerSubscriber lastSubscriber;

        private PublisherBuffer() {
        }

        /**
         * @return {@code true} when no inner publisher was ever started, or when the last one
         *         is done and nothing is queued
         */
        public boolean isComplete() {
            return this.lastSubscriber == null
                    || (this.lastSubscriber.isDone() && this.buffer.isEmpty());
        }

        /**
         * Starts mapping the next queued item, or completes downstream when the queue is
         * empty and upstream has already completed.
         */
        public void tryNext() {
            T nextItem = this.buffer.poll();
            if (nextItem != null) {
                this.lastSubscriber = this.executeMapper(nextItem);
            } else if (MultiFlatMapProcessor.this.onCompleteReceivedAlready.get()) {
                MultiFlatMapProcessor.this.subscriber.onComplete();
            }
        }

        /**
         * Maps the item immediately when nothing is queued and no inner publisher is active;
         * otherwise queues it.
         *
         * @param o upstream item
         * @throws IllegalStateException if the queue is full
         */
        public void offer(T o) {
            boolean idle = this.lastSubscriber == null || this.lastSubscriber.isDone();
            if (this.buffer.isEmpty() && idle) {
                this.lastSubscriber = this.executeMapper(o);
            } else {
                this.buffer.add(o);
            }
        }

        /**
         * Applies the mapper to the item and subscribes a fresh inner subscriber to the
         * resulting publisher. A failure cancels upstream and is reported via the processor's
         * {@code onError}, which also records it.
         *
         * @param item upstream item to map
         * @return the inner subscriber created for the item
         */
        public InnerSubscriber executeMapper(T item) {
            InnerSubscriber innerSubscriber = new InnerSubscriber();
            try {
                innerSubscriber.whenComplete(this::tryNext);
                MultiFlatMapProcessor.this.mapper.apply(item).subscribe(innerSubscriber);
            } catch (Throwable t) {
                MultiFlatMapProcessor.this.subscription.cancel();
                MultiFlatMapProcessor.this.onError(t);
            }
            return innerSubscriber;
        }
    }

    /**
     * Subscription handed to the downstream subscriber.
     */
    private final class FlatMapSubscription implements Flow.Subscription {
        private FlatMapSubscription() {
        }

        /**
         * Forwards demand upstream while no inner publisher is active; otherwise counts it and
         * forwards it to the current inner subscription.
         *
         * @param n number of items requested
         */
        @Override
        public void request(long n) {
            if (MultiFlatMapProcessor.this.buffer.isComplete()
                    || MultiFlatMapProcessor.this.innerSubscription == null) {
                MultiFlatMapProcessor.this.subscription.request(n);
            } else {
                MultiFlatMapProcessor.this.requestCounter.increment(n, MultiFlatMapProcessor.this::onError);
                MultiFlatMapProcessor.this.innerSubscription.request(n);
            }
        }

        /**
         * Cancels the upstream and, if present, the inner subscription, then releases the
         * downstream subscriber reference.
         */
        @Override
        public void cancel() {
            MultiFlatMapProcessor.this.subscription.cancel();
            Flow.Subscription inner = MultiFlatMapProcessor.this.innerSubscription;
            if (inner != null) {
                inner.cancel();
            }
            MultiFlatMapProcessor.this.subscriber.releaseReference();
        }
    }
}

