/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.StreamValidationUtils;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Publisher that emits every item of a first publisher followed by every item of a second one.
 *
 * <p>The second publisher is subscribed as soon as the first one hands over its subscription,
 * but it is only asked for items once the first publisher has completed. Demand that the
 * downstream subscriber requested and the first publisher did not satisfy is carried over to
 * the second publisher. An error from either upstream cancels both upstream subscriptions and
 * is forwarded downstream; at most one terminal signal is ever delivered.
 *
 * <p>Per-subscription state is held in fields of this publisher, so only one subscriber at a
 * time is supported; a later {@link #subscribe} call resets that state.
 *
 * @param <T> type of the emitted items
 */
public class ConcatPublisher<T> implements Flow.Publisher<T>, Multi<T> {
    private final Flow.Publisher<T> firstPublisher;
    private final Flow.Publisher<T> secondPublisher;
    /** Downstream demand not yet satisfied by the first publisher nor handed to the second. */
    private final AtomicLong requested = new AtomicLong();
    /** Set once a terminal signal has been delivered downstream. */
    private final AtomicBoolean terminated = new AtomicBoolean();
    /** Set when downstream cancelled or an upstream failed; blocks any further upstream requests. */
    private volatile boolean cancelled;
    private FirstProcessor firstProcessor;
    private SecondProcessor secondProcessor;
    private Flow.Subscriber<? super T> subscriber;

    private ConcatPublisher(Flow.Publisher<T> firstPublisher, Flow.Publisher<T> secondPublisher) {
        this.firstPublisher = firstPublisher;
        this.secondPublisher = secondPublisher;
    }

    /**
     * Creates a publisher concatenating the two given publishers.
     *
     * @param firstPublisher  publisher whose items are emitted first
     * @param secondPublisher publisher whose items are emitted after the first one completes
     * @param <T>             type of the emitted items
     * @return the concatenating publisher
     */
    public static <T> ConcatPublisher<T> create(Flow.Publisher<T> firstPublisher, Flow.Publisher<T> secondPublisher) {
        return new ConcatPublisher<T>(firstPublisher, secondPublisher);
    }

    /**
     * Hands the subscriber its subscription and then subscribes to the first publisher.
     *
     * <p>The downstream subscriber receives {@code onSubscribe} before any upstream is
     * subscribed, so no item or terminal signal can overtake it. Requests made before an
     * upstream subscription is available are buffered and replayed once it arrives.
     *
     * @param subscriber downstream subscriber
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber");
        this.subscriber = subscriber;
        // Start from a clean slate in case this publisher was subscribed to before.
        this.requested.set(0);
        this.terminated.set(false);
        this.cancelled = false;
        this.firstProcessor = new FirstProcessor();
        this.secondProcessor = new SecondProcessor();
        subscriber.onSubscribe(new Flow.Subscription() {

            @Override
            public void request(long n) {
                // A false result means n was rejected; presumably the helper has already
                // reported the violation through the supplied onError callback.
                if (!StreamValidationUtils.checkRequestParam(n, subscriber::onError)) {
                    return;
                }
                // Accumulate instead of overwriting so earlier unfulfilled demand is kept.
                addDemand(requested, n);
                if (firstProcessor.complete) {
                    handOffToSecond();
                } else {
                    firstProcessor.forward(n);
                }
            }

            @Override
            public void cancel() {
                cancelAll();
            }
        });
        // The subscriber may have cancelled from within onSubscribe.
        if (!cancelled) {
            firstPublisher.subscribe(firstProcessor);
        }
    }

    /** Adds {@code n} (expected to be positive) to {@code counter}, capping at {@code Long.MAX_VALUE}. */
    private static void addDemand(AtomicLong counter, long n) {
        counter.updateAndGet(current -> {
            long sum = current + n;
            return sum < 0 ? Long.MAX_VALUE : sum;
        });
    }

    /** Moves all outstanding downstream demand to the second publisher, each unit at most once. */
    private void handOffToSecond() {
        long carried = requested.getAndSet(0);
        // Never forward a non-positive amount; request(0) is illegal on a Flow.Subscription.
        if (carried > 0) {
            secondProcessor.forward(carried);
        }
    }

    /** Stops both upstreams; subscriptions arriving later are cancelled on arrival. */
    private void cancelAll() {
        cancelled = true;
        firstProcessor.cancelUpstream();
        secondProcessor.cancelUpstream();
    }

    /** Delivers {@code onComplete} downstream unless a terminal signal was already sent. */
    private void completeDownstream() {
        if (terminated.compareAndSet(false, true)) {
            subscriber.onComplete();
        }
    }

    /** Delivers {@code onError} downstream unless a terminal signal was already sent. */
    private void failDownstream(Throwable t) {
        if (terminated.compareAndSet(false, true)) {
            subscriber.onError(t);
        }
    }

    /**
     * Shared plumbing of both upstream subscribers: keeps the upstream subscription and
     * buffers demand until that subscription is available.
     */
    private abstract class UpstreamSubscriber implements Flow.Subscriber<T> {
        private final AtomicLong pending = new AtomicLong();
        private volatile Flow.Subscription subscription;

        /**
         * Stores the upstream subscription.
         *
         * @return {@code false} if the subscription was cancelled right away (duplicate
         *         {@code onSubscribe}, or this stream is already cancelled)
         */
        boolean attach(Flow.Subscription upstream) {
            Objects.requireNonNull(upstream);
            if (subscription != null) {
                upstream.cancel();
                return false;
            }
            // Publish the subscription before reading the flag: cancelAll() writes the flag
            // before reading the subscription, so at least one side observes the other.
            subscription = upstream;
            if (cancelled) {
                upstream.cancel();
                return false;
            }
            return true;
        }

        /** Requests {@code n} more items from upstream, now or as soon as it is attached. */
        void forward(long n) {
            addDemand(pending, n);
            drain();
        }

        /** Sends buffered demand upstream; getAndSet ensures each unit is requested once. */
        void drain() {
            Flow.Subscription upstream = subscription;
            if (upstream == null || cancelled) {
                return;
            }
            long n = pending.getAndSet(0);
            if (n > 0) {
                upstream.request(n);
            }
        }

        /** Cancels the upstream subscription if one has been attached. */
        void cancelUpstream() {
            Flow.Subscription upstream = subscription;
            if (upstream != null) {
                upstream.cancel();
            }
        }
    }

    /** Subscriber of the second publisher; its items flow only after the first has completed. */
    private final class SecondProcessor extends UpstreamSubscriber {
        /** Set when the second publisher completed, possibly before the first one did. */
        private volatile boolean done;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (attach(subscription)) {
                drain();
            }
        }

        @Override
        public void onNext(T item) {
            ConcatPublisher.this.subscriber.onNext(item);
        }

        @Override
        public void onError(Throwable t) {
            cancelAll();
            failDownstream(t);
        }

        @Override
        public void onComplete() {
            // This upstream is subscribed eagerly and may finish (e.g. when empty) while the
            // first one is still emitting; completion is then deferred to FirstProcessor.
            done = true;
            if (ConcatPublisher.this.firstProcessor.complete) {
                completeDownstream();
            }
        }
    }

    /** Subscriber of the first publisher; tracks unfulfilled demand for the hand-off. */
    private final class FirstProcessor extends UpstreamSubscriber {
        /** Set when the first publisher completed; later requests are routed to the second. */
        private volatile boolean complete;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (!attach(subscription)) {
                return;
            }
            // Attach first so that a second publisher failing immediately can cancel this one.
            ConcatPublisher.this.secondPublisher.subscribe(ConcatPublisher.this.secondProcessor);
            drain();
        }

        @Override
        public void onNext(T item) {
            // One unit of demand is fulfilled. Long.MAX_VALUE is treated as unbounded and
            // kept as is; the counter is never driven below zero.
            ConcatPublisher.this.requested.updateAndGet(
                    outstanding -> (outstanding == Long.MAX_VALUE || outstanding <= 0) ? outstanding : outstanding - 1);
            ConcatPublisher.this.subscriber.onNext(item);
        }

        @Override
        public void onError(Throwable t) {
            cancelAll();
            failDownstream(t);
        }

        @Override
        public void onComplete() {
            complete = true;
            if (ConcatPublisher.this.secondProcessor.done) {
                // The second publisher already finished; nothing is left to emit.
                completeDownstream();
            } else {
                handOffToSecond();
            }
        }
    }
}

