package io.activej.csp.queue;

import io.activej.async.exception.AsyncCloseException;
import io.activej.common.recycle.Recyclers;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/csp/queue/ChannelBufferWithFallback.class */
public final class ChannelBufferWithFallback<T> implements ChannelQueue<T> {
    private final ChannelQueue<T> queue;
    private final Supplier<Promise<? extends ChannelQueue<T>>> bufferFactory;

    @Nullable
    private ChannelQueue<T> buffer;

    @Nullable
    private Exception exception;
    private SettablePromise<Void> waitingForBuffer;
    private boolean finished = false;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ChannelBufferWithFallback(ChannelQueue<T> channelQueue, Supplier<Promise<? extends ChannelQueue<T>>> supplier) {
        this.queue = channelQueue;
        this.bufferFactory = supplier;
    }

    @Override // io.activej.csp.queue.ChannelQueue
    public Promise<Void> put(@Nullable T t) {
        if (this.exception == null) {
            return doPut(t);
        }
        Recyclers.recycle(t);
        return Promise.ofException(this.exception);
    }

    @Override // io.activej.csp.queue.ChannelQueue
    public Promise<T> take() {
        return this.exception != null ? Promise.ofException(this.exception) : doTake();
    }

    private Promise<Void> doPut(@Nullable T t) {
        if (t == null) {
            this.finished = true;
        }
        if (this.buffer != null) {
            return secondaryPut(t);
        }
        if (this.waitingForBuffer != null) {
            return this.waitingForBuffer.then(r5 -> {
                return secondaryPut(t);
            });
        }
        if (!this.queue.isSaturated()) {
            return this.queue.put(t);
        }
        SettablePromise<Void> settablePromise = new SettablePromise<>();
        this.waitingForBuffer = settablePromise;
        return this.bufferFactory.get().then(channelQueue -> {
            this.buffer = channelQueue;
            settablePromise.set((Object) null);
            this.waitingForBuffer = null;
            return secondaryPut(t);
        });
    }

    public Promise<T> doTake() {
        return this.buffer != null ? secondaryTake() : this.waitingForBuffer != null ? this.waitingForBuffer.then(r3 -> {
            return secondaryTake();
        }) : (this.finished && this.queue.isExhausted()) ? Promise.of((Object) null) : this.queue.take();
    }

    private Promise<Void> secondaryPut(@Nullable T t) {
        if ($assertionsDisabled || this.buffer != null) {
            return this.buffer.put(t).then((r5, exc) -> {
                if (exc == null) {
                    return Promise.complete();
                }
                if (!(exc instanceof AsyncCloseException)) {
                    return Promise.ofException(exc);
                }
                this.buffer = null;
                return doPut(t);
            });
        }
        throw new AssertionError();
    }

    private Promise<T> secondaryTake() {
        return this.buffer == null ? doTake() : this.buffer.take().then((obj, exc) -> {
            if (exc != null) {
                if (!(exc instanceof AsyncCloseException)) {
                    return Promise.ofException(exc);
                }
            } else {
                if (obj != null) {
                    return Promise.of(obj);
                }
                this.buffer.close();
            }
            this.buffer = null;
            return doTake();
        });
    }

    @Override // io.activej.csp.queue.ChannelQueue
    public boolean isSaturated() {
        return this.queue.isSaturated() && this.buffer != null && this.buffer.isSaturated();
    }

    @Override // io.activej.csp.queue.ChannelQueue
    public boolean isExhausted() {
        return this.queue.isExhausted() && (this.buffer == null || this.buffer.isExhausted());
    }

    public void closeEx(@NotNull Exception exc) {
        if (this.exception != null) {
            return;
        }
        this.exception = exc;
        this.queue.closeEx(exc);
        if (this.waitingForBuffer != null) {
            this.waitingForBuffer.whenResult(() -> {
                if (!$assertionsDisabled && this.buffer == null) {
                    throw new AssertionError();
                }
                this.buffer.closeEx(exc);
            });
        }
        if (this.buffer != null) {
            this.buffer.closeEx(exc);
        }
    }

    @Nullable
    public Exception getException() {
        return this.exception;
    }

    static {
        $assertionsDisabled = !ChannelBufferWithFallback.class.desiredAssertionStatus();
    }
}
