/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.BackPressureOverflowException;
import io.helidon.common.reactive.BaseProcessor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;

/**
 * A {@link BaseProcessor} that parks items in a bounded in-memory queue and hands
 * them on one at a time from {@code tryRequest}.
 *
 * <p>Items enter the queue through {@code notEnoughRequest}; if the queue is already
 * full the processor is failed with a {@link BackPressureOverflowException}.
 * NOTE(review): the name suggests {@code BaseProcessor} calls {@code notEnoughRequest}
 * when downstream demand is insufficient — confirm against {@code BaseProcessor}.
 *
 * @param <T> first type argument forwarded to {@link BaseProcessor}
 *            (presumably the upstream item type — confirm against {@code BaseProcessor})
 * @param <U> type of the items held in the buffer and passed to {@code submit}
 */
public abstract class BufferedProcessor<T, U> extends BaseProcessor<T, U> {
    /** Maximum number of items the buffer can hold before overflow is reported. */
    private static final int BACK_PRESSURE_BUFFER_SIZE = 1024;

    /** Bounded queue of items waiting to be submitted. */
    private final BlockingQueue<U> buffer = new ArrayBlockingQueue<>(BACK_PRESSURE_BUFFER_SIZE);

    /**
     * Submits one buffered item if the subscriber is still open and the buffer is
     * non-empty; otherwise delegates to {@code super.tryRequest(subscription)}.
     *
     * <p>If the calling thread is interrupted while taking from the buffer, the
     * processor is failed and cancelled with the {@link InterruptedException} and the
     * thread's interrupt status is restored.
     *
     * @param subscription the subscription passed through to the superclass when no
     *                     buffered item is submitted
     */
    @Override
    protected void tryRequest(Flow.Subscription subscription) {
        boolean drainBuffered = !getSubscriber().isClosed() && !buffer.isEmpty();
        if (!drainBuffered) {
            super.tryRequest(subscription);
            return;
        }
        try {
            submit(buffer.take());
        } catch (InterruptedException e) {
            try {
                failAndCancel(e);
            } finally {
                // Catching InterruptedException clears the flag; restore it so code
                // further up the stack can still observe the interruption. Done after
                // failAndCancel so the failure signal is delivered as before.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores {@code item} in the buffer, or fails the processor with a
     * {@link BackPressureOverflowException} when the buffer is full.
     *
     * @param item the item to buffer
     */
    @Override
    protected void notEnoughRequest(U item) {
        boolean accepted = buffer.offer(item);
        if (!accepted) {
            fail(new BackPressureOverflowException(BACK_PRESSURE_BUFFER_SIZE));
        }
    }

    /**
     * Forwards completion to the superclass only when the buffer is empty.
     *
     * <p>NOTE(review): when items are still buffered the completion signal is dropped
     * here and nothing in this class re-issues it once the buffer drains — confirm
     * that {@code BaseProcessor} or a subclass completes the stream afterwards.
     */
    @Override
    public void onComplete() {
        if (buffer.isEmpty()) {
            super.onComplete();
        }
    }
}

