package org.qbit.queue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.boon.Exceptions;
import org.boon.Lists;

/* loaded from: input_file:org/qbit/queue/BasicQueue.class */
/**
 * {@link Queue} implementation backed by an unbounded {@link LinkedTransferQueue}.
 *
 * <p>Producers write through {@link #output()}, consumers read through {@link #input()},
 * and a single push-style {@link InputQueueListener} may be attached with
 * {@link #startListener(InputQueueListener)}; it is driven from a dedicated thread named
 * {@code BasicQueueListener} until {@link #stop()} is called.
 *
 * @param <T> type of the items carried by the queue
 */
public class BasicQueue<T> implements Queue<T> {
    /** Backing queue shared by the input view, the output view and the listener loop. */
    private final LinkedTransferQueue<T> queue = new LinkedTransferQueue<>();
    /** Timeout, expressed in {@link #timeUnit}, used by every timed poll in this class. */
    private final int waitTime;
    /** Unit of {@link #waitTime}. */
    private final TimeUnit timeUnit;
    /** Executor owning the listener thread; {@code null} until a listener is started. */
    private ScheduledExecutorService monitor;
    /** Handle of the scheduled listener task; {@code null} until a listener is started. */
    private ScheduledFuture<?> future;
    /** Set by {@link #stop()}; read by the listener loop to decide when to shut down. */
    private volatile boolean stop;
    /** Number of consecutive items delivered to a listener before {@code limit()} is signalled. */
    private final int batchSize;

    /**
     * Creates a queue.
     *
     * @param waitTime  timeout used by timed polls
     * @param timeUnit  unit of {@code waitTime}
     * @param batchSize number of consecutively delivered items after which the listener's
     *                  {@code limit()} callback is invoked
     */
    public BasicQueue(int waitTime, TimeUnit timeUnit, int batchSize) {
        this.waitTime = waitTime;
        this.timeUnit = timeUnit;
        this.batchSize = batchSize;
    }

    /**
     * Returns a new consumer-side view of this queue. Every call creates a fresh view
     * object, but all views read from the same backing queue.
     */
    @Override // org.qbit.queue.Queue
    public InputQueue<T> input() {
        return new InputQueue<T>() { // from class: org.qbit.queue.BasicQueue.1
            /**
             * Waits up to the configured wait time for an item.
             *
             * @return the next item, or {@code null} on timeout or if the wait was interrupted
             */
            @Override // org.qbit.queue.InputQueue
            public T pollWait() {
                try {
                    return queue.poll(waitTime, timeUnit);
                } catch (InterruptedException e) {
                    // The interrupt is reported to the caller only as a null result;
                    // the thread's interrupt status is left cleared, not restored.
                    Thread.interrupted();
                    return null;
                }
            }

            /** Returns the next item without waiting, or {@code null} if the queue is empty. */
            @Override // org.qbit.queue.InputQueue
            public T poll() {
                return queue.poll();
            }

            /**
             * Blocks until an item is available.
             *
             * @return the next item, or {@code null} if the wait was interrupted
             */
            @Override // org.qbit.queue.InputQueue
            public T take() {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    // Same policy as pollWait(): interrupt is surfaced as null, flag stays cleared.
                    Thread.interrupted();
                    return null;
                }
            }

            /**
             * Drains every item currently available, without waiting.
             *
             * <p>NOTE(review): the argument is not consulted by this implementation, so the
             * result is not capped by it. Confirm against the {@code InputQueue} contract
             * whether it is meant to be an upper bound.
             *
             * @return the drained items in queue order; an empty list if the queue was empty
             */
            @Override // org.qbit.queue.InputQueue
            public Iterable<T> readBatch(int i) {
                T item = queue.poll();
                if (item == null) {
                    return Collections.emptyList();
                }
                List<T> batch = new ArrayList<>();
                do {
                    batch.add(item);
                    item = queue.poll();
                } while (item != null);
                return batch;
            }
        };
    }

    /**
     * Returns a new producer-side view of this queue. Every call creates a fresh view
     * object, but all views write to the same backing queue.
     */
    @Override // org.qbit.queue.Queue
    public OutputQueue<T> output() {
        return new OutputQueue<T>() { // from class: org.qbit.queue.BasicQueue.2
            /** Enqueues one item and returns the result of the backing queue's {@code offer}. */
            @Override // org.qbit.queue.OutputQueue
            public boolean offer(T t) {
                return queue.offer(t);
            }

            /**
             * Enqueues the given items in order, stopping at the first one the backing
             * queue refuses.
             *
             * @return the items that were not enqueued, in their original order
             *         (empty when everything was accepted)
             */
            @Override // org.qbit.queue.OutputQueue
            public List<T> offerMany(T... tArr) {
                return offerAll(Lists.linkedList(tArr));
            }

            /**
             * Enqueues the given items in order, stopping at the first one the backing
             * queue refuses. The iterable is traversed only once (to take the snapshot),
             * so single-pass iterables are handled correctly.
             *
             * @return the items that were not enqueued, in their original order
             *         (empty when everything was accepted)
             */
            @Override // org.qbit.queue.OutputQueue
            public List<T> offerBatch(Iterable<T> iterable) {
                return offerAll(Lists.list(iterable));
            }

            /**
             * Offers the snapshot's items front to back and then strips the accepted
             * prefix, leaving only the rejected tail in the returned list.
             */
            private List<T> offerAll(List<T> pending) {
                int accepted = 0;
                for (T item : pending) {
                    if (!queue.offer(item)) {
                        break;
                    }
                    accepted++;
                }
                // Remove the accepted prefix in one step rather than searching the
                // list by equality once per element.
                pending.subList(0, accepted).clear();
                return pending;
            }
        };
    }

    /**
     * Attaches the listener and starts the thread that feeds it.
     *
     * <p>A single-threaded scheduler is created whose thread is named
     * {@code BasicQueueListener}; the listener loop is scheduled on it with a 50 ms
     * initial delay and a 50 ms period. If a scheduler already exists,
     * {@code Exceptions.die} is invoked with an explanatory message (presumably it
     * throws; verify against the Boon library).
     *
     * @param inputQueueListener callback target for received items and queue state changes
     */
    @Override // org.qbit.queue.Queue
    public void startListener(final InputQueueListener<T> inputQueueListener) {
        if (this.monitor == null) {
            this.monitor = Executors.newScheduledThreadPool(1, new ThreadFactory() { // from class: org.qbit.queue.BasicQueue.3
                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable);
                    thread.setName("BasicQueueListener");
                    return thread;
                }
            });
        } else {
            Exceptions.die("Only one BasicQueue listener allowed at a time");
        }
        this.future = this.monitor.scheduleAtFixedRate(new Runnable() { // from class: org.qbit.queue.BasicQueue.4
            @Override // java.lang.Runnable
            public void run() {
                manageQueue(inputQueueListener);
            }
        }, 50L, 50L, TimeUnit.MILLISECONDS);
    }

    /**
     * Requests shutdown of the listener: raises the stop flag, cancels the scheduled
     * task with interruption, and shuts the executor down.
     */
    @Override // org.qbit.queue.Queue
    public void stop() {
        // Publish the flag BEFORE interrupting. The listener loop checks it in its
        // InterruptedException handler; if the interrupt could arrive first, the loop
        // would read stop == false and carry on instead of shutting down promptly.
        this.stop = true;
        if (this.future != null) {
            this.future.cancel(true);
        }
        if (this.monitor != null) {
            this.monitor.shutdownNow();
        }
    }

    /**
     * Listener loop: pulls items from the queue and reports them, together with queue
     * state transitions, to the listener until a stop is observed.
     *
     * <p>Per cycle:
     * <ol>
     *   <li>Deliver items through {@code receive} until the queue is empty or
     *       {@code batchSize} items have been delivered in a row; in the latter case
     *       call {@code limit()}.</li>
     *   <li>If the run ended because the queue was empty, call {@code empty()}.</li>
     *   <li>Poll again; if nothing is immediately available, yield and wait up to the
     *       configured wait time. If that wait also yields nothing, call {@code idle()}.</li>
     * </ol>
     * The loop returns, after calling {@code shutdown()}, when the timed wait is
     * interrupted while the stop flag is set, or when the stop flag is seen on a timed-out
     * wait (checked on every tenth pass through the waiting branch).
     *
     * @param inputQueueListener callback target
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void manageQueue(InputQueueListener<T> inputQueueListener) {
        T item = this.queue.poll();
        // Counts passes through the waiting branch; gates the periodic stop check below.
        long waitCycles = 0;
        while (true) {
            int delivered = 0;
            while (item != null) {
                inputQueueListener.receive(item);
                delivered++;
                if (delivered >= this.batchSize) {
                    inputQueueListener.limit();
                    break;
                }
                item = this.queue.poll();
            }
            // item is non-null here only when the run was cut short by the batch limit
            // (that item has already been delivered).
            if (item == null) {
                inputQueueListener.empty();
            }
            item = this.queue.poll();
            if (item == null) {
                Thread.yield();
                try {
                    item = this.queue.poll(this.waitTime, this.timeUnit);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                    if (this.stop) {
                        inputQueueListener.shutdown();
                        return;
                    }
                    // Interrupted without a stop request: fall through and keep serving.
                }
                if (item == null) {
                    if (waitCycles % 10 == 0 && this.stop) {
                        inputQueueListener.shutdown();
                        return;
                    }
                    inputQueueListener.idle();
                }
                waitCycles++;
            }
        }
    }
}
