package stream.io;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;

/* loaded from: input_file:stream/io/BlockingQueue.class */
/**
 * A bounded, blocking FIFO queue of {@link Data} items backed by a singly
 * linked list.
 *
 * <p>Producers and consumers are guarded by two separate locks: {@code putLock}
 * (with condition {@code notFull}) for the tail of the list and
 * {@code takeLock} (with condition {@code notEmpty}) for the head. The element
 * count is kept in an {@link AtomicInteger} so both sides can read it without
 * holding the other side's lock. {@code head} always points to a sentinel node
 * whose {@code item} is {@code null}; the first real element is
 * {@code head.next}.
 *
 * <p>Once {@link #close()} has been called, no further items are accepted,
 * remaining items can still be read, and {@link #read()} returns {@code null}
 * as soon as the queue is empty.
 */
public class BlockingQueue extends AbstractQueue {
    private static final Logger log = LoggerFactory.getLogger(BlockingQueue.class);

    /** Sentinel node in front of the first element; its item is always null. */
    private transient Node<Data> head;

    /** Last node of the list; equals {@code head} when the queue is empty. */
    private transient Node<Data> last;

    /** Set to {@code true} by {@link #close()}; never reset by this class. */
    protected AtomicBoolean closed = new AtomicBoolean(false);

    /** Number of items currently linked into the list. */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Lock held by consumers while unlinking from the head. */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Signalled (under {@code takeLock}) when an item becomes available or the queue closes. */
    private final Condition notEmpty = this.takeLock.newCondition();

    /** Lock held by producers while linking at the tail. */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Signalled (under {@code putLock}) when space becomes available or the queue closes. */
    private final Condition notFull = this.putLock.newCondition();

    /**
     * Singly linked list node.
     *
     * @param <E> type of the stored item
     */
    public static class Node<E> {
        /** The stored item; {@code null} for the sentinel and for unlinked nodes. */
        E item;

        /** Successor node, {@code null} for the last node, or the node itself once unlinked. */
        Node<E> next;

        Node(E e) {
            this.item = e;
        }
    }

    /**
     * Creates an empty queue. The capacity is left at whatever value the
     * superclass provides.
     */
    public BlockingQueue() {
        installSentinel();
    }

    /**
     * Creates an empty queue with the given capacity.
     *
     * @param i the capacity, must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public BlockingQueue(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = i;
        // Without the sentinel every enqueue/read on a queue built through
        // this constructor would dereference a null head/last.
        installSentinel();
    }

    /** Points {@code head} and {@code last} at a fresh sentinel node, i.e. an empty list. */
    private void installSentinel() {
        Node<Data> sentinel = new Node<>(null);
        this.head = sentinel;
        this.last = sentinel;
    }

    /** Wakes one waiting consumer. Must be called without holding {@code takeLock}'s condition wait. */
    private void signalNotEmpty() {
        this.takeLock.lock();
        try {
            this.notEmpty.signal();
        } finally {
            this.takeLock.unlock();
        }
    }

    /** Wakes one waiting producer. */
    private void signalNotFull() {
        this.putLock.lock();
        try {
            this.notFull.signal();
        } finally {
            this.putLock.unlock();
        }
    }

    /** Appends {@code node} to the tail of the list. Caller must hold {@code putLock}. */
    private void linkLast(Node<Data> node) {
        this.last.next = node;
        this.last = node;
    }

    /**
     * Unlinks and returns the first item. Caller must hold {@code takeLock}
     * and must have verified that the queue is not empty.
     */
    private Data dequeue() {
        Node<Data> oldHead = this.head;
        Node<Data> first = oldHead.next;
        // Self-link the discarded sentinel so it no longer references live nodes.
        oldHead.next = oldHead;
        // The node that carried the first item becomes the new sentinel.
        this.head = first;
        Data item = first.item;
        first.item = null;
        return item;
    }

    /** Acquires both locks, always in the order putLock then takeLock. */
    void fullyLock() {
        this.putLock.lock();
        this.takeLock.lock();
    }

    /** Releases both locks in the reverse order of {@link #fullyLock()}. */
    void fullyUnlock() {
        this.takeLock.unlock();
        this.putLock.unlock();
    }

    /**
     * @return the number of items currently in the queue
     */
    public int size() {
        return this.count.get();
    }

    /**
     * @return capacity minus current size; may be negative after a batch
     *         {@link #write(Collection)} that exceeded the capacity
     */
    public int remainingCapacity() {
        return this.capacity - this.count.get();
    }

    /**
     * Appends a single item, blocking while the queue is full.
     *
     * @param data the item to append
     * @return {@code true} if the item was appended; {@code false} if
     *         {@code data} is {@code null}, the queue is (or becomes) closed,
     *         or the calling thread was interrupted / an error occurred
     */
    @Override // stream.io.QueueService
    public boolean enqueue(Data data) {
        log.trace("Queue {}: Enqueuing event {}", getId(), data);
        if (data == null) {
            return false;
        }
        try {
            if (this.closed.get()) {
                return false;
            }
            Node<Data> node = new Node<>(data);
            int before;
            this.putLock.lockInterruptibly();
            try {
                // ">=" rather than "==": a batch write may have pushed the
                // count beyond the capacity. Also stop waiting once closed.
                while (!this.closed.get() && this.count.get() >= this.capacity) {
                    this.notFull.await();
                }
                if (this.closed.get()) {
                    return false;
                }
                linkLast(node);
                before = this.count.getAndIncrement();
                log.debug("put size: {}", before);
                if (before + 1 < this.capacity) {
                    // Still room left: pass the wake-up on to the next producer.
                    this.notFull.signal();
                }
            } finally {
                // Single release point for every exit path of the critical section.
                this.putLock.unlock();
            }
            if (before == 0) {
                // Queue went from empty to non-empty: a consumer may be waiting.
                signalNotEmpty();
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logEnqueueFailure(e);
            return false;
        } catch (Exception e) {
            logEnqueueFailure(e);
            return false;
        }
    }

    /** Logs a failed enqueue attempt; the stack trace is emitted at debug level. */
    private void logEnqueueFailure(Exception e) {
        log.error("Error enqueuing item: {}", e.getMessage());
        log.debug("Enqueue failure details", e);
    }

    /**
     * Validates the configured capacity.
     *
     * @throws IllegalArgumentException if the capacity is smaller than 1
     */
    @Override // stream.io.Sink
    public void init() throws Exception {
        if (getCapacity().intValue() < 1) {
            throw new IllegalArgumentException("Invalid queue-capacity '" + getCapacity() + "'!");
        }
    }

    /**
     * Marks the queue as closed and wakes every thread currently waiting for
     * space or for data so it can observe the closed state. Calling this on an
     * already closed queue only logs a message.
     */
    @Override // stream.io.Sink
    public void close() throws Exception {
        log.debug("Closing queue '{}'...", getId());
        fullyLock();
        try {
            if (this.closed.get()) {
                log.debug("Queue '{}' already closed.", getId());
                return;
            }
            this.closed.set(true);
            // Both locks are held here, so signalling both conditions is legal.
            this.notEmpty.signalAll();
            this.notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes and returns the first item, blocking while the queue is empty
     * and not closed.
     *
     * @return the first item, or {@code null} if the queue is closed and empty
     *         or the thread was interrupted while waiting (the interrupt flag
     *         is restored in that case)
     * @throws Exception if the thread is interrupted while acquiring the lock
     */
    @Override // stream.io.Source
    public Data read() throws Exception {
        log.trace("Reading from queue {}", getId());
        Data item = null;
        int before = -1;
        this.takeLock.lockInterruptibly();
        try {
            while (this.count.get() == 0) {
                if (this.closed.get()) {
                    log.debug("Queue '{}' is closed and empty => null", getId());
                    return null;
                }
                this.notEmpty.await();
            }
            item = dequeue();
            before = this.count.getAndDecrement();
            log.debug("take size: {}", before);
            log.trace("took item from queue: {}", item);
            if (before > 1) {
                // More items remain: pass the wake-up on to the next consumer.
                this.notEmpty.signal();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (this.closed.get() && this.count.get() == 0) {
                log.debug("Queue '{}' is closed and empty => null", getId());
                return null;
            }
            log.error("Interruped while waiting for data: {}", e.getMessage());
            log.debug("Interrupt details", e);
        } finally {
            this.takeLock.unlock();
        }
        // Done outside the take-lock section: signalNotFull() acquires putLock.
        if (before == this.capacity) {
            signalNotFull();
        }
        return item;
    }

    /**
     * Not implemented.
     *
     * @throws IllegalAccessError always
     */
    @Override // stream.io.QueueService
    public Data poll() {
        throw new IllegalAccessError("Not Implemented");
    }

    /**
     * Same as {@link #read()}, but reports failures by returning {@code null}
     * instead of throwing.
     *
     * @return the first item, or {@code null} if none could be obtained
     */
    @Override // stream.io.QueueService
    public Data take() {
        try {
            return read();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted while taking item from queue", e);
            return null;
        } catch (Exception e) {
            log.error("Error taking item from queue", e);
            return null;
        }
    }

    /**
     * Appends a single item; delegates to {@link #enqueue(Data)}.
     *
     * @param data the item to append
     * @return the result of {@link #enqueue(Data)}
     */
    @Override // stream.io.Sink
    public boolean write(Data data) throws Exception {
        return enqueue(data);
    }

    /**
     * Appends all non-null items of the collection in iteration order while
     * holding the producer lock once. The call blocks while the queue is full;
     * once there is room for at least one item the whole batch is appended,
     * so the size may temporarily exceed the capacity.
     *
     * @param collection the items to append
     * @return {@code true} if the batch was appended; {@code false} if
     *         {@code collection} is {@code null}, the queue is (or becomes)
     *         closed, or the calling thread was interrupted / an error occurred
     */
    @Override // stream.io.Sink
    public boolean write(Collection<Data> collection) throws Exception {
        log.trace("Queue {}: Enqueuing event {}", getId(), collection);
        if (collection == null) {
            return false;
        }
        try {
            if (this.closed.get()) {
                return false;
            }
            int before = -1;
            boolean becameNonEmpty = false;
            this.putLock.lockInterruptibly();
            try {
                while (!this.closed.get() && this.count.get() >= this.capacity) {
                    this.notFull.await();
                }
                if (this.closed.get()) {
                    return false;
                }
                for (Data item : collection) {
                    if (item == null) {
                        // A null item would be returned by read() and be
                        // indistinguishable from "closed and empty".
                        continue;
                    }
                    linkLast(new Node<>(item));
                    before = this.count.getAndIncrement();
                    if (before == 0) {
                        // Remember the empty -> non-empty transition even if
                        // further items follow in the same batch.
                        becameNonEmpty = true;
                    }
                }
                log.debug("{}:{}", this.id, before);
                if (before + 1 < this.capacity) {
                    this.notFull.signal();
                }
            } finally {
                this.putLock.unlock();
            }
            if (becameNonEmpty) {
                signalNotEmpty();
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logEnqueueFailure(e);
            return false;
        } catch (Exception e) {
            logEnqueueFailure(e);
            return false;
        }
    }

    /**
     * Removes all items from the queue.
     *
     * @return the number of items that were removed
     */
    @Override // stream.io.Barrel
    public int clear() {
        fullyLock();
        try {
            Node<Data> current = this.head;
            Node<Data> following = current.next;
            while (following != null) {
                // Self-link each discarded node and drop its item reference.
                current.next = current;
                following.item = null;
                current = following;
                following = current.next;
            }
            // The former last node (item now null) serves as the new sentinel.
            this.head = this.last;
            int removed = this.count.getAndSet(0);
            if (removed >= this.capacity) {
                // The queue was full, so a producer may be waiting for space.
                this.notFull.signal();
            }
            return removed;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @return the number of items currently in the queue
     */
    @Override // stream.io.QueueService
    public int level() {
        return this.count.get();
    }

    /**
     * @return the configured capacity
     */
    @Override // stream.io.QueueService
    public int capacity() {
        return this.capacity;
    }

    /**
     * @return the number of items currently in the queue, boxed
     */
    @Override // stream.io.Queue
    public Integer getSize() {
        return Integer.valueOf(this.count.get());
    }

    /** Does nothing. */
    @Override // stream.service.Service
    public void reset() throws Exception {
    }

    @Override
    public String toString() {
        return "stream.io.BlockingQueue['" + this.id + "']";
    }
}
