package io.fluxcapacitor.common;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/common/Backlog.class */
/**
 * Buffers values and hands them to a {@link BatchConsumer} in batches of at most {@code maxBatchSize} items.
 *
 * <p>Values are appended to a concurrent queue by {@code add(...)}. Flushing happens on the backlog's own executor;
 * the {@code flushing} flag ensures at most one flush task is active at any time. Every {@code add(...)} call
 * returns an {@link Awaitable} that blocks until the backlog has flushed at least as many items as had been
 * inserted when the call reserved its position, and then awaits the result of the most recently published batch.
 *
 * <p>Registered monitors are invoked with every batch after it has been handed to the consumer.
 *
 * @param <T> the type of the buffered values
 */
public class Backlog<T> implements Monitored<List<T>> {
    private static final Logger log = LoggerFactory.getLogger(Backlog.class);

    private final int maxBatchSize;
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final BatchConsumer<T> consumer;
    private final ErrorHandler<List<T>> errorHandler;
    private final ExecutorService executorService;

    /** {@code true} while a flush task is scheduled or running. */
    private final AtomicBoolean flushing = new AtomicBoolean();
    /** Total number of items for which a position has been reserved by {@code add(...)}. */
    private final AtomicLong insertPosition = new AtomicLong();
    /** Total number of items handed to the consumer; only modified while holding the {@code syncObject} monitor. */
    private final AtomicLong flushPosition = new AtomicLong();
    /** Result of the most recently flushed batch; the reference itself doubles as the wait/notify monitor. */
    private final AtomicReference<Awaitable> syncObject = new AtomicReference<>();
    private final Collection<Consumer<List<T>>> monitors = new CopyOnWriteArraySet<>();

    /**
     * Handles a batch of values taken from the backlog.
     *
     * @param <T> the type of the values in the batch
     */
    @FunctionalInterface
    public interface BatchConsumer<T> {
        /**
         * Processes one batch.
         *
         * @param list the values of the batch, in the order they were polled from the backlog
         * @return an awaitable that callers of {@code Backlog.add(...)} wait on; should not be {@code null},
         *         because waiters invoke {@code await()} on the returned value
         * @throws Exception if the batch could not be processed
         */
        Awaitable accept(List<T> list) throws Exception;
    }

    /**
     * Creates a backlog with a maximum batch size of 1024, a single flush thread and an error handler that logs
     * failed batches.
     *
     * @param consumer receives the batches
     */
    public Backlog(BatchConsumer<T> consumer) {
        this(consumer, 1024);
    }

    /**
     * Creates a backlog with a single flush thread and an error handler that logs failed batches.
     *
     * @param consumer     receives the batches
     * @param maxBatchSize maximum number of values per batch; must be at least 1
     * @throws IllegalArgumentException if {@code maxBatchSize} is smaller than 1
     */
    public Backlog(BatchConsumer<T> consumer, int maxBatchSize) {
        this(consumer, maxBatchSize, 1, (exc, list) -> {
            log.error("Consumer {} failed to handle batch {}. Continuing with next batch.", consumer, list, exc);
        });
    }

    /**
     * Creates a backlog.
     *
     * @param consumer     receives the batches
     * @param maxBatchSize maximum number of values per batch; must be at least 1
     * @param threads      size of the fixed thread pool on which flushing runs
     * @param errorHandler invoked with the exception and the batch when the consumer throws
     * @throws IllegalArgumentException if {@code maxBatchSize} is smaller than 1, or if {@code threads} is rejected
     *                                  by {@link Executors#newFixedThreadPool(int)}
     */
    public Backlog(BatchConsumer<T> consumer, int maxBatchSize, int threads, ErrorHandler<List<T>> errorHandler) {
        // A batch size of 0 would make flush() spin forever on empty batches; reject it up front,
        // before the executor is created, so that nothing needs to be cleaned up.
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1 but was " + maxBatchSize);
        }
        this.maxBatchSize = maxBatchSize;
        this.consumer = consumer;
        this.executorService = Executors.newFixedThreadPool(threads);
        this.errorHandler = errorHandler;
    }

    /**
     * Appends the given values to the backlog and triggers a flush if none is in progress.
     *
     * @param values the values to append
     * @return {@code Awaitable.ready()} if no values were given, otherwise an awaitable that blocks until the
     *         backlog has flushed up to the position reserved by this call
     */
    @SafeVarargs
    public final Awaitable add(T... values) {
        if (values.length == 0) {
            return Awaitable.ready();
        }
        Collections.addAll(queue, values);
        return awaitFlush(insertPosition.addAndGet(values.length));
    }

    /**
     * Appends the given values to the backlog and triggers a flush if none is in progress.
     *
     * @param values the values to append
     * @return {@code Awaitable.ready()} if the collection is empty, otherwise an awaitable that blocks until the
     *         backlog has flushed up to the position reserved by this call
     */
    public Awaitable add(Collection<? extends T> values) {
        if (values.isEmpty()) {
            return Awaitable.ready();
        }
        queue.addAll(values);
        return awaitFlush(insertPosition.addAndGet(values.size()));
    }

    /**
     * Triggers a flush if necessary and returns an awaitable for the given insert position.
     *
     * @param position the insert position that has to be flushed before the awaitable completes
     * @return an awaitable that waits until {@code flushPosition} has reached {@code position} and then awaits
     *         the batch result that was published last at that moment
     */
    private Awaitable awaitFlush(long position) {
        flushIfNotFlushing();
        return () -> {
            Awaitable batchResult;
            synchronized (syncObject) {
                while (position > flushPosition.get()) {
                    syncObject.wait();
                }
                batchResult = syncObject.get();
            }
            // Await outside the monitor so that a slow batch result neither blocks the flush thread from
            // publishing the next batch nor serialises the other waiters.
            batchResult.await();
        };
    }

    /** Schedules a flush task unless one is already scheduled or running. */
    private void flushIfNotFlushing() {
        if (flushing.compareAndSet(false, true)) {
            executorService.execute(this::flush);
        }
    }

    /**
     * Drains the queue in batches, passing each batch to the consumer and then to the registered monitors.
     *
     * <p>If the consumer throws, the error handler is invoked and flushing continues with the next batch. Any
     * other runtime failure is logged and rethrown after resetting the {@code flushing} flag, so that a later
     * {@code add(...)} can schedule a new flush.
     */
    private void flush() {
        while (!queue.isEmpty()) {
            try {
                List<T> batch = pollBatch();
                Awaitable result;
                try {
                    result = consumer.accept(batch);
                } catch (Exception consumerFailure) {
                    result = Awaitable.failed(consumerFailure);
                    try {
                        errorHandler.handleError(consumerFailure, batch);
                    } catch (RuntimeException handlerFailure) {
                        // The batch has already been removed from the queue. Publish its failed result before
                        // propagating, otherwise the callers waiting for this batch would block forever.
                        publish(result, batch.size());
                        throw handlerFailure;
                    }
                }
                publish(result, batch.size());
                monitors.forEach(monitor -> monitor.accept(batch));
            } catch (Exception e) {
                log.error("Failed to flush the backlog", e);
                flushing.set(false);
                throw e;
            }
        }
        flushing.set(false);
        // Values may have been added after the last emptiness check but before the flag was reset; their
        // producers saw flushing == true and did not schedule a flush, so do it here.
        if (!queue.isEmpty()) {
            flushIfNotFlushing();
        }
    }

    /** Removes up to {@code maxBatchSize} values from the head of the queue. */
    private List<T> pollBatch() {
        List<T> batch = new ArrayList<>(maxBatchSize);
        T value;
        while (batch.size() < maxBatchSize && (value = queue.poll()) != null) {
            batch.add(value);
        }
        return batch;
    }

    /**
     * Publishes the result of a flushed batch and wakes up all waiting callers.
     *
     * <p>The result and the new flush position are updated together while holding the monitor. A waiter that
     * observes the advanced position therefore always reads the result that belongs to it (or a later one),
     * never the result of an earlier batch or the initial {@code null}.
     */
    private void publish(Awaitable result, int batchSize) {
        synchronized (syncObject) {
            syncObject.set(result);
            flushPosition.addAndGet(batchSize);
            syncObject.notifyAll();
        }
    }

    /**
     * Registers a monitor that is invoked with every batch after the batch has been handed to the consumer.
     *
     * @param monitor the monitor to register
     * @return a registration that removes the monitor again when invoked
     */
    @Override // io.fluxcapacitor.common.Monitored
    public Registration registerMonitor(Consumer<List<T>> monitor) {
        monitors.add(monitor);
        return () -> {
            monitors.remove(monitor);
        };
    }

    /**
     * Shuts down the flush executor and waits up to one second for running tasks to finish. If the wait is
     * interrupted, a warning is logged and the interrupt status of the calling thread is restored.
     */
    public void shutDown() {
        executorService.shutdown();
        try {
            executorService.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn("Shutdown of executor was interrupted", e);
            Thread.currentThread().interrupt();
        }
    }
}
