package org.qbit.queue.impl;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.boon.Boon;
import org.boon.Exceptions;
import org.boon.Logger;
import org.qbit.queue.Queue;
import org.qbit.queue.ReceiveQueue;
import org.qbit.queue.ReceiveQueueListener;
import org.qbit.queue.ReceiveQueueManager;
import org.qbit.queue.SendQueue;

/* loaded from: input_file:org/qbit/queue/impl/BasicQueue.class */
public class BasicQueue<T> implements Queue<T> {
    private final int batchSize;
    private ScheduledExecutorService monitor;
    private ScheduledFuture<?> future;
    private final String name;
    private final int waitTime;
    private final TimeUnit timeUnit;
    private final LinkedTransferQueue<Object> queue = new LinkedTransferQueue<>();
    private final AtomicBoolean stop = new AtomicBoolean();
    private Logger logger = Boon.logger(BasicQueue.class);
    private ReceiveQueueManager<T> receiveQueueManager = new BasicReceiveQueueManager();

    public BasicQueue(String str, int i, TimeUnit timeUnit, int i2) {
        this.name = str;
        this.waitTime = i;
        this.timeUnit = timeUnit;
        this.batchSize = i2;
    }

    @Override // org.qbit.queue.Queue
    public ReceiveQueue<T> receiveQueue() {
        return new BasicReceiveQueue(this.queue, this.waitTime, this.timeUnit, this.batchSize);
    }

    @Override // org.qbit.queue.Queue
    public SendQueue<T> sendQueue() {
        return new BasicSendQueue(this.batchSize, this.queue);
    }

    @Override // org.qbit.queue.Queue
    public void startListener(final ReceiveQueueListener<T> receiveQueueListener) {
        if (this.monitor == null) {
            this.monitor = Executors.newScheduledThreadPool(1, new ThreadFactory() { // from class: org.qbit.queue.impl.BasicQueue.1
                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable);
                    thread.setName("QueueListener " + BasicQueue.this.name);
                    return thread;
                }
            });
        } else {
            Exceptions.die("Only one BasicQueue listener allowed at a time");
        }
        this.future = this.monitor.scheduleAtFixedRate(new Runnable() { // from class: org.qbit.queue.impl.BasicQueue.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    BasicQueue.this.manageQueue(receiveQueueListener);
                } catch (Exception e) {
                    BasicQueue.this.logger.error(e, new Object[]{"BasicQueue Manager", "Problem running queue manager"});
                }
            }
        }, 50L, 50L, TimeUnit.MILLISECONDS);
    }

    @Override // org.qbit.queue.Queue
    public void stop() {
        if (this.future != null) {
            this.future.cancel(true);
        }
        if (this.monitor != null) {
            this.monitor.shutdownNow();
        }
        this.stop.set(true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void manageQueue(ReceiveQueueListener<T> receiveQueueListener) {
        this.receiveQueueManager.manageQueue(receiveQueue(), receiveQueueListener, this.batchSize, this.stop);
    }

    public static <T> BasicQueue<T> create(Class<T> cls) {
        return new BasicQueue<>("BasicQueue", 10, TimeUnit.MILLISECONDS, 10);
    }

    public static <T> BasicQueue<T> create(Class<T> cls, int i) {
        return new BasicQueue<>("BasicQueue", 10, TimeUnit.MILLISECONDS, i);
    }
}
