package party.iroiro.r2jdbc.util;

import java.util.concurrent.atomic.AtomicInteger;
import lbmq.LinkedBlockingMultiQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:party/iroiro/r2jdbc/util/QueueDispatcher.class */
/**
 * Pulls {@link QueueItem}s off a shared {@link LinkedBlockingMultiQueue} and hands each one
 * to its own consumer, either inline on the dispatcher thread or on Reactor's parallel
 * scheduler, depending on the item's {@code parallel} flag.
 *
 * <p>Intended to be executed as a {@link Runnable} on a dedicated thread: {@link #run()}
 * loops until that thread is interrupted, then drains whatever is still queued and returns.
 *
 * <p>Exceptions thrown by a consumer that is invoked inline are not caught here and will
 * propagate out of {@link #run()}.
 *
 * @param <T> type of the payload carried by each {@link QueueItem}
 */
public class QueueDispatcher<T> implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(QueueDispatcher.class);

    /** Priority passed to {@code addSubQueue} for every sub-queue created by {@link #subQueue()}. */
    private static final int SUB_QUEUE_PRIORITY = 10;

    private final LinkedBlockingMultiQueue<Integer, QueueItem<T>> queue;

    /** Source of unique sub-queue keys; starts at 1 and grows by one per {@link #subQueue()} call. */
    private final AtomicInteger nextKey = new AtomicInteger(1);

    /** Scheduler used for items flagged as {@code parallel}. */
    private final Scheduler scheduler = Schedulers.parallel();

    /**
     * Creates a dispatcher that consumes from the given multi-queue.
     *
     * @param linkedBlockingMultiQueue the queue to take items from and to register sub-queues on
     */
    public QueueDispatcher(LinkedBlockingMultiQueue<Integer, QueueItem<T>> linkedBlockingMultiQueue) {
        this.queue = linkedBlockingMultiQueue;
    }

    /**
     * Registers a new sub-queue under a freshly allocated integer key and returns it.
     *
     * @return the sub-queue looked up under the newly allocated key
     */
    public LinkedBlockingMultiQueue<Integer, QueueItem<T>>.SubQueue subQueue() {
        Integer key = Integer.valueOf(this.nextKey.getAndIncrement());
        this.queue.addSubQueue(key, SUB_QUEUE_PRIORITY);
        return this.queue.getSubQueue(key);
    }

    /**
     * Delivers one item to its consumer: on the parallel scheduler when the item asks for it,
     * otherwise synchronously on the calling thread.
     *
     * @param queueItem the item to deliver
     */
    private void dispatch(QueueItem<T> queueItem) {
        if (queueItem.parallel) {
            this.scheduler.schedule(() -> queueItem.consumer.accept(queueItem.item, queueItem.e));
        } else {
            queueItem.consumer.accept(queueItem.item, queueItem.e);
        }
    }

    /**
     * Blocks until an item is available and dispatches it. If the wait is interrupted, the
     * interrupt flag is restored so that the loop in {@link #run()} can observe it.
     */
    private void takeAndProcess() {
        try {
            dispatch(this.queue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Renames the current thread to {@code R2jdbcDispatcher}, dispatches items until the thread
     * is interrupted, then delivers every item still queued before returning.
     */
    @Override // java.lang.Runnable
    public void run() {
        log.debug("Listening");
        Thread.currentThread().setName("R2jdbcDispatcher");
        // Thread.interrupted() clears the flag that takeAndProcess() restored.
        while (!Thread.interrupted()) {
            takeAndProcess();
        }
        log.debug("Cleaning up");
        // Drain with the non-blocking poll() rather than take(): if the thread is interrupted
        // again during clean-up, take() would throw on every call without removing anything,
        // and a peek()-guarded loop would spin forever.
        QueueItem<T> pending;
        while ((pending = this.queue.poll()) != null) {
            dispatch(pending);
        }
        log.debug("Exiting");
    }
}
