package org.ujorm.tools.thread;

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.ujorm.tools.Assert;

/* loaded from: input_file:org/ujorm/tools/thread/AsyncStreamBuilder.class */
/**
 * Builds a {@link Stream} whose elements are delivered asynchronously.
 *
 * <p>Producers hand elements over with {@link #add(Object)} / {@link #addAll(Object[])};
 * the consumer reads them from the stream returned by {@link #stream()}. The stream
 * requests at most {@code count} elements from an internal queue and waits for each
 * of them no longer than the configured timeout, measured from the moment the
 * timeout clock is started. A failure reported by {@link #interrupt(Throwable)}
 * is rethrown to the consumer as a {@code JobException}.
 *
 * @param <T> type of the stream elements
 */
public class AsyncStreamBuilder<T> {
    /** Queue placeholder for a {@code null} element; removed again by the stream filter. */
    private static final Object UNDEFINED = new Object();
    /** Queue marker offered by {@link #interrupt(Throwable)} to wake up a waiting consumer. */
    private static final Object WAKE_UP = new Object();
    /** Value of {@link #startMilis} while the timeout clock has not been started. */
    private static final long NOT_STARTED = Long.MIN_VALUE;

    /** Number of elements that may still be added. */
    private final AtomicLong countDown;
    /** Total time the consumer is allowed to wait, counted from {@link #startMilis}. */
    private final Duration timeout;
    /** Hand-over queue between the producers and the consuming stream. */
    private final LinkedBlockingQueue<T> queue;
    /** The stream returned by {@link #stream()}. */
    private final Stream<T> stream;
    /** Time source for the timeout computation. */
    private final Clock clock;
    /** Clock reading (milliseconds) taken when the timeout clock was started. */
    private long startMilis = NOT_STARTED;
    /** Set once the builder accepts no more elements. */
    private volatile boolean closed;
    /** First failure reported by a producer or by an interrupted consumer, or {@code null}. */
    private volatile Throwable interrupt;

    /**
     * Creates a builder with a timeout of one minute.
     *
     * @param j number of elements the stream will request; must not be negative
     */
    public AsyncStreamBuilder(long j) {
        this(j, Duration.ofMinutes(1L));
    }

    /**
     * Creates a builder.
     *
     * @param j number of elements the stream will request; must not be negative
     * @param duration maximal total waiting time of the consumer; must not be {@code null}
     */
    public AsyncStreamBuilder(long j, @Nonnull Duration duration) {
        Assert.isTrue(j >= 0, "count");
        Assert.notNull(duration, "timeout");
        this.countDown = new AtomicLong(j);
        this.timeout = duration;
        this.queue = new LinkedBlockingQueue<>();
        this.clock = Clock.systemUTC();
        // The pipeline is lazy: get() is not invoked before a terminal operation runs.
        this.stream = Stream.generate(() -> get())
                .limit(j)
                .filter(item -> item != UNDEFINED);
    }

    /**
     * Takes the next element from the queue, waiting for the rest of the timeout if necessary.
     *
     * <p>The returned value may be the internal placeholder of a {@code null} element;
     * the stream built by this class filters that placeholder out.
     *
     * @return the next queued element
     * @throws JobException if no element arrived in time, if the waiting thread was
     *         interrupted, or if a failure was reported by {@link #interrupt(Throwable)}
     */
    @Nonnull
    protected T get() throws JobException {
        if (this.interrupt == null) {
            try {
                final long waitMillis = remainingMillis();
                final T item = waitMillis > 0
                        ? this.queue.poll(waitMillis, TimeUnit.MILLISECONDS)
                        : this.queue.poll();
                if (item == null) {
                    close();
                    throw JobException.of("Time is over: " + this.timeout);
                }
                if (item != WAKE_UP) {
                    return item;
                }
                // WAKE_UP is offered only after the failure cause was stored,
                // so the final throw below reports that cause.
            } catch (InterruptedException e) {
                if (this.interrupt == null) {
                    this.interrupt = e;
                }
                close();
                Thread.currentThread().interrupt(); // keep the interrupted status for the caller
            }
        }
        throw JobException.of(this.interrupt);
    }

    /**
     * Returns the stream of the asynchronously added elements and starts the timeout
     * clock on the first call. Every call returns the same stream instance.
     *
     * @return the stream backed by this builder
     */
    @Nonnull
    public Stream<T> stream() {
        startClock();
        return this.stream;
    }

    /**
     * Adds all given elements in the array order by calling {@link #add(Object)} for each.
     *
     * @param tArr elements to add
     */
    public void addAll(@Nonnull T... tArr) {
        for (T t : tArr) {
            add(t);
        }
    }

    /**
     * Adds one element to the stream. A {@code null} element is queued as a placeholder
     * that the stream skips. The element is silently dropped once a failure was reported.
     *
     * @param t element to add, may be {@code null}
     * @throws JobException if more elements are added than the configured count,
     *         or if the builder is already closed
     */
    public void add(@Nullable T t) {
        if (this.countDown.decrementAndGet() < 0) {
            throw JobException.of("The parameter is over limit: " + t);
        }
        if (this.interrupt == null) {
            if (!isOpen()) {
                throw JobException.of("The builder is closed");
            }
            this.queue.add(t != null ? t : this.<Object>marker(UNDEFINED));
        }
    }

    /** Marks the builder as closed; {@link #add(Object)} rejects elements afterwards. */
    protected final void close() {
        this.closed = true;
    }

    /**
     * Tells whether the builder is still open.
     *
     * @return {@code true} until {@link #close()} is called
     */
    protected final boolean isOpen() {
        return !this.closed;
    }

    /**
     * Reports a failure. Only the first reported failure is kept: the builder is closed,
     * a consumer waiting for the next element is woken up so that it fails without
     * waiting for the timeout, and the interrupted status of the calling thread is set.
     *
     * @param th cause of the failure; must not be {@code null}
     */
    public void interrupt(@Nonnull Throwable th) {
        if (this.interrupt == null) {
            this.interrupt = (Throwable) Assert.notNull(th, "causedBy");
            close();
            // The queue is unbounded, so the offer always succeeds.
            this.queue.offer(this.<Object>marker(WAKE_UP));
            Thread.currentThread().interrupt();
        }
    }

    /** Starts the timeout clock unless it is already running. */
    private void startClock() {
        if (this.startMilis == NOT_STARTED) {
            this.startMilis = this.clock.millis();
        }
    }

    /**
     * Computes how many milliseconds of the timeout are left.
     * Starts the clock first when {@link #get()} is reached before {@link #stream()},
     * because the unset start value would overflow the subtraction below.
     *
     * @return the remaining time in milliseconds; zero or less when nothing is left
     *         or the builder is closed
     */
    private long remainingMillis() {
        if (!isOpen()) {
            return 0L;
        }
        startClock();
        final long elapsed = this.clock.millis() - this.startMilis;
        return this.timeout.toMillis() - elapsed;
    }

    /**
     * Views an internal marker object as a queue element.
     * Safe because markers are compared by identity only and never reach user code:
     * {@code UNDEFINED} is filtered from the stream and {@code WAKE_UP} is consumed in {@link #get()}.
     */
    @SuppressWarnings("unchecked")
    private <M> T marker(M marker) {
        return (T) marker;
    }
}
