/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.BufferedProcessor;
import io.helidon.common.reactive.Multi;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

public class MultiLimitProcessor<T>
extends BufferedProcessor<T, T>
implements Multi<T> {
    private final AtomicLong counter;

    private MultiLimitProcessor(Long limit) {
        this.counter = new AtomicLong(limit);
    }

    public static <T> MultiLimitProcessor<T> create(Long limit) {
        return new MultiLimitProcessor<T>(limit);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        super.subscribe((Flow.Subscriber)s);
        if (this.counter.get() == 0L) {
            this.tryComplete();
        }
    }

    @Override
    public void onError(Throwable ex) {
        if (0L < this.counter.get()) {
            super.onError(ex);
        } else {
            this.tryComplete();
        }
    }

    @Override
    protected void hookOnNext(T item) {
        long actCounter = this.counter.getAndDecrement();
        if (0L < actCounter) {
            this.submit(item);
        } else {
            this.getSubscription().ifPresent(Flow.Subscription::cancel);
            this.tryComplete();
        }
    }

    public String toString() {
        return "LimitProcessor{counter=" + this.counter + "}";
    }
}

