package org.swisspush.redisques;

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.DequeueStatisticCollector;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisquesAPI;

/* loaded from: input_file:org/swisspush/redisques/QueueStatsService.class */
/**
 * Collects per-queue statistics (name, size and dequeue timestamps) and hands
 * them to a caller supplied {@link GetQueueStatsMentor}.
 *
 * <p>The number of requests being processed at the same time is bounded by a
 * {@link Semaphore}: a request that cannot get a permit is rejected through
 * {@link GetQueueStatsMentor#onError(Throwable, Object)} instead of being queued.</p>
 */
public class QueueStatsService {
    /** Class wide logger; assigned in the static initializer. */
    private static final Logger log;
    /** Used to defer error callbacks via {@code runOnContext}. */
    private final Vertx vertx;
    /** Bus the queue names/sizes request is sent over. */
    private final EventBus eventBus;
    /** Event bus address the queue names/sizes request is sent to. */
    private final String redisquesAddress;
    /** Asked for queue statistics of the queues selected for the response. */
    private final QueueStatisticsCollector queueStatisticsCollector;
    /** Source of the dequeue timestamps attached to every reported queue. */
    private final DequeueStatisticCollector dequeueStatisticCollector;
    /** Creates the exceptions reported to the mentor. */
    private final RedisQuesExceptionFactory exceptionFactory;
    /** One permit is held for the whole duration of a request. */
    private final Semaphore incomingRequestQuota;
    /** Decompiled form of the {@code assert} switch: true when assertions are off. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/swisspush/redisques/QueueStatsService$GetQueueStatsMentor.class */
    /**
     * Callback contract a caller of
     * {@link QueueStatsService#getQueueStats(Object, GetQueueStatsMentor)} has to
     * implement. It supplies the request parameters and receives the outcome.
     *
     * @param <CTX> caller defined context object; it is passed back unchanged
     *              to every method of this interface.
     */
    public interface GetQueueStatsMentor<CTX> {
        /**
         * @return true to keep queues whose reported size is zero (or missing)
         *         in the result, false to drop them.
         */
        boolean includeEmptyQueues(CTX ctx);

        /**
         * @return maximum number of queues to report. The queues are sorted by
         *         size (largest first) before the limit is applied. Zero
         *         disables the limit.
         */
        int limit(CTX ctx);

        /**
         * @return value handed to
         *         {@code RedisquesAPI.buildGetQueuesItemsCountOperation(..)};
         *         presumably a pattern restricting the queue names - TODO confirm
         *         against RedisquesAPI.
         */
        String filter(CTX ctx);

        /**
         * Called when the statistics got collected.
         *
         * @param list the collected queues, largest first.
         */
        void onQueueStatistics(List<Queue> list, CTX ctx);

        /**
         * Called when the request could not be served, for example when the
         * service has no request quota left.
         *
         * @param th what went wrong.
         */
        void onError(Throwable th, CTX ctx);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/swisspush/redisques/QueueStatsService$GetQueueStatsRequest.class */
    /**
     * Mutable state of one statistics request. It is handed from one
     * processing step of {@link QueueStatsService} to the next.
     *
     * @param <CTX> type of the caller defined context object.
     */
    public static class GetQueueStatsRequest<CTX> {
        /** Caller context, passed back to every mentor call. */
        private CTX mCtx;
        /** Supplies the request parameters and receives the outcome. */
        private GetQueueStatsMentor<CTX> mentor;
        /** Names of {@link #queues}; only set while the queue statistics lookup is running. */
        private List<String> queueNames;
        /**
         * Result of the queue statistics lookup (an empty array if that lookup
         * failed). NOTE(review): nothing visible in this file reads this field.
         */
        private JsonArray queuesJsonArr;
        /** Queues to report, already filtered, sorted and limited. */
        private List<Queue> queues;

        /** Instances are only created by the enclosing service. */
        private GetQueueStatsRequest() {
        }
    }

    /* loaded from: input_file:org/swisspush/redisques/QueueStatsService$Queue.class */
    /**
     * Statistics of one single queue as reported to a
     * {@link GetQueueStatsMentor}.
     *
     * <p>Name and size are fixed at construction time. The three timestamps
     * stay {@code null} until the enclosing service assigns them.</p>
     */
    public static class Queue {
        static final /* synthetic */ boolean $assertionsDisabled;

        static {
            // Assertion status is taken from the enclosing top-level class.
            $assertionsDisabled = !QueueStatsService.class.desiredAssertionStatus();
        }

        private final String name;
        private final long size;
        private Long lastDequeueAttemptEpochMs;
        private Long lastDequeueSuccessEpochMs;
        private Long nextDequeueDueTimestampEpochMs;

        /**
         * @param queueName name of the queue; must not be null (only verified
         *                  when assertions are enabled).
         * @param queueSize number of items reported for the queue.
         */
        private Queue(String queueName, long queueSize) {
            if (queueName == null && !$assertionsDisabled) {
                throw new AssertionError();
            }
            name = queueName;
            size = queueSize;
        }

        /** @return name of the queue. */
        public String getName() {
            return name;
        }

        /** @return size the queue was reported with. */
        public long getSize() {
            return size;
        }

        /** @return time of the last dequeue attempt, or null if unknown. */
        public Long getLastDequeueAttemptEpochMs() {
            return lastDequeueAttemptEpochMs;
        }

        /** @return time of the last successful dequeue, or null if unknown. */
        public Long getLastDequeueSuccessEpochMs() {
            return lastDequeueSuccessEpochMs;
        }

        /** @return time the next dequeue is due, or null if unknown. */
        public Long getNextDequeueDueTimestampEpochMs() {
            return nextDequeueDueTimestampEpochMs;
        }
    }

    /**
     * Wires the service up with its collaborators. Nothing is validated or
     * started here; the arguments are only stored.
     *
     * @param vertx                     used to defer error callbacks.
     * @param eventBus                  bus the queue names/sizes request is sent over.
     * @param str                       event bus address of that request.
     * @param queueStatisticsCollector  source of the queue statistics.
     * @param dequeueStatisticCollector source of the dequeue timestamps.
     * @param redisQuesExceptionFactory creates the exceptions reported to callers.
     * @param semaphore                 quota bounding the concurrently served requests.
     */
    public QueueStatsService(Vertx vertx, EventBus eventBus, String str, QueueStatisticsCollector queueStatisticsCollector, DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory redisQuesExceptionFactory, Semaphore semaphore) {
        // Request throttling and error reporting.
        this.incomingRequestQuota = semaphore;
        this.exceptionFactory = redisQuesExceptionFactory;

        // Data sources.
        this.dequeueStatisticCollector = dequeueStatisticCollector;
        this.queueStatisticsCollector = queueStatisticsCollector;

        // Messaging infrastructure.
        this.redisquesAddress = str;
        this.eventBus = eventBus;
        this.vertx = vertx;
    }

    /**
     * Collects the queue statistics and reports them to the passed mentor.
     *
     * <p>Processing happens in three chained steps: fetch queue names and
     * sizes, look up the queue statistics for the selected queues, attach the
     * dequeue timestamps. Exactly one of
     * {@link GetQueueStatsMentor#onQueueStatistics(List, Object)} or
     * {@link GetQueueStatsMentor#onError(Throwable, Object)} gets called per
     * request.</p>
     *
     * <p>If no permit of the request quota is available, the request is
     * rejected through {@code onError} with a reply exception created with
     * code 429.</p>
     *
     * @param ctx                 caller defined context, handed back to every mentor call.
     * @param getQueueStatsMentor supplies the request parameters and receives the outcome.
     * @param <CTX>               type of the caller defined context.
     */
    public <CTX> void getQueueStats(CTX ctx, GetQueueStatsMentor<CTX> getQueueStatsMentor) {
        if (!this.incomingRequestQuota.tryAcquire()) {
            ReplyException tooBusy = this.exceptionFactory.newReplyException(429, "Server too busy to handle yet-another-queue-stats-request now", null);
            // Deferred, so the mentor is never called before this method returned.
            this.vertx.runOnContext(unused -> getQueueStatsMentor.onError(tooBusy, ctx));
            return;
        }
        AtomicBoolean isCompleted = new AtomicBoolean();
        // Single exit point of a request: gives the permit back and notifies
        // the mentor. Guarded, so the permit cannot be released twice.
        BiConsumer<Throwable, List<Queue>> onDone = (failure, queues) -> {
            if (!isCompleted.compareAndSet(false, true)) {
                if (log.isInfoEnabled()) {
                    log.info("", new RuntimeException("onDone MUST be called ONCE only", failure));
                }
                return;
            }
            this.incomingRequestQuota.release();
            if (failure != null) {
                getQueueStatsMentor.onError(failure, ctx);
            } else {
                getQueueStatsMentor.onQueueStatistics(queues, ctx);
            }
        };
        try {
            GetQueueStatsRequest<CTX> request = new GetQueueStatsRequest<>();
            request.mCtx = ctx;
            request.mentor = getQueueStatsMentor;
            fetchQueueNamesAndSize(request, (namesFailure, sizedRequest) -> {
                if (namesFailure != null) {
                    onDone.accept(namesFailure, null);
                    return;
                }
                // This continuation runs outside of the surrounding try block.
                // Anything thrown here has to be routed to onDone, or the
                // permit acquired above would never be given back.
                try {
                    List<String> names = new ArrayList<>(sizedRequest.queues.size());
                    for (Queue queue : sizedRequest.queues) {
                        names.add(queue.name);
                    }
                    sizedRequest.queueNames = names;
                    fetchRetryDetails(sizedRequest, (retryFailure, detailedRequest) -> {
                        if (retryFailure != null) {
                            onDone.accept(retryFailure, null);
                            return;
                        }
                        try {
                            attachDequeueStats(detailedRequest, (statsFailure, completeRequest) -> {
                                if (statsFailure != null) {
                                    onDone.accept(statsFailure, null);
                                } else {
                                    onDone.accept(null, completeRequest.queues);
                                }
                            });
                        } catch (RuntimeException ex) {
                            onDone.accept(ex, null);
                        }
                    });
                } catch (RuntimeException ex) {
                    onDone.accept(ex, null);
                }
            });
        } catch (Exception e) {
            if (isCompleted.compareAndSet(false, true)) {
                this.incomingRequestQuota.release();
                // Deferred for the same reason as the "too busy" rejection above.
                this.vertx.runOnContext(unused -> getQueueStatsMentor.onError(e, ctx));
            } else if (log.isInfoEnabled()) {
                log.info("onDone MUST be called ONCE only", e);
            }
        }
    }

    /**
     * Step one of a statistics request: asks redisques (via event bus) for all
     * queues matching the mentor's filter together with their sizes, and stores
     * the selected queues in {@code getQueueStatsRequest.queues}.
     *
     * <p>The selection drops empty queues unless the mentor wants them, sorts
     * the remaining queues largest first and cuts the list down to the
     * mentor's limit. A limit of zero or less means "no limit". A queue
     * reported without a size is treated as empty.</p>
     *
     * @param getQueueStatsRequest request state; its {@code mentor} and {@code mCtx} must be set.
     * @param biConsumer           called exactly once by this step, either with an
     *                             error and null, or with null and the updated request.
     */
    private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> getQueueStatsRequest, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> biConsumer) {
        String filter = getQueueStatsRequest.mentor.filter(getQueueStatsRequest.mCtx);
        this.eventBus.request(this.redisquesAddress, RedisquesAPI.buildGetQueuesItemsCountOperation(filter), asyncResult -> {
            if (asyncResult.failed()) {
                Exception requestFailure = this.exceptionFactory.newException("eventBus.request()", asyncResult.cause());
                if (!$assertionsDisabled && requestFailure == null) {
                    throw new AssertionError();
                }
                biConsumer.accept(requestFailure, null);
                return;
            }
            JsonObject body = (JsonObject) asyncResult.result().body();
            String status = body.getString(RedisquesAPI.STATUS);
            if (!RedisquesAPI.OK.equals(status)) {
                Exception statusFailure = this.exceptionFactory.newException("Unexpected status " + status);
                if (!$assertionsDisabled && statusFailure == null) {
                    throw new AssertionError();
                }
                biConsumer.accept(statusFailure, null);
                return;
            }
            JsonArray queuesJson = body.getJsonArray(RedisquesAPI.QUEUES);
            if (queuesJson == null || queuesJson.isEmpty()) {
                log.debug("result was {}, we return an empty result.", queuesJson == null ? "null" : "empty");
                getQueueStatsRequest.queues = Collections.emptyList();
                biConsumer.accept(null, getQueueStatsRequest);
                return;
            }
            boolean includeEmptyQueues = getQueueStatsRequest.mentor.includeEmptyQueues(getQueueStatsRequest.mCtx);
            List<Queue> queues = new ArrayList<>(queuesJson.size());
            for (Object entry : queuesJson) {
                JsonObject queueJson = (JsonObject) entry;
                String name = queueJson.getString(RedisquesAPI.MONITOR_QUEUE_NAME);
                Long reportedSize = queueJson.getLong(RedisquesAPI.MONITOR_QUEUE_SIZE);
                // A missing size counts as "empty". Unboxing it blindly would
                // throw as soon as empty queues are requested.
                long size = reportedSize == null ? 0L : reportedSize;
                if (size == 0L && !includeEmptyQueues) {
                    continue;
                }
                queues.add(new Queue(name, size));
            }
            queues.sort(this::compareLargestFirst);
            // Only the largest queues are wanted when a limit is given. A
            // negative limit is handled like zero, as subList() rejects it.
            int limit = getQueueStatsRequest.mentor.limit(getQueueStatsRequest.mCtx);
            if (limit > 0 && queues.size() > limit) {
                queues = queues.subList(0, limit);
            }
            getQueueStatsRequest.queues = queues;
            biConsumer.accept(null, getQueueStatsRequest);
        });
    }

    /**
     * Step two of a statistics request: asks the queue statistics collector
     * about the queues named in {@code getQueueStatsRequest.queueNames} and
     * stores the answer in {@code getQueueStatsRequest.queuesJsonArr}.
     *
     * <p>This step is best effort. A failed lookup or a status other than OK
     * is logged as a warning and replaced by an empty array, so the callback
     * never receives an error from here.</p>
     *
     * @param getQueueStatsRequest request state; {@code queueNames} must be set
     *                             and is cleared once the lookup completed.
     * @param biConsumer           called once with null and the updated request.
     */
    private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> getQueueStatsRequest, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> biConsumer) {
        final long startedAtMs = System.currentTimeMillis();
        List<String> namesToLookUp = getQueueStatsRequest.queueNames;
        if (namesToLookUp == null && !$assertionsDisabled) {
            throw new AssertionError();
        }
        this.queueStatisticsCollector.getQueueStatistics(namesToLookUp).onComplete(outcome -> {
            // The names are not needed any longer.
            getQueueStatsRequest.queueNames = null;
            long elapsedMs = System.currentTimeMillis() - startedAtMs;
            log.debug("queueStatisticsCollector.getQueueStatistics() took {}ms", Long.valueOf(elapsedMs));

            JsonArray retryDetails;
            if (outcome.failed()) {
                log.warn("queueStatisticsCollector.getQueueStatistics() failed. Fallback to empty result.", outcome.cause());
                retryDetails = new JsonArray();
            } else {
                JsonObject body = (JsonObject) outcome.result();
                String status = body.getString(RedisquesAPI.STATUS);
                if (RedisquesAPI.OK.equals(status)) {
                    retryDetails = body.getJsonArray(RedisquesAPI.QUEUES);
                } else {
                    log.warn("queueStatisticsCollector.getQueueStatistics() responded '" + status + "'. Fallback to empty result.", outcome.cause());
                    retryDetails = new JsonArray();
                }
            }
            // Every outcome ends the same way: store what we got and go on.
            getQueueStatsRequest.queuesJsonArr = retryDetails;
            biConsumer.accept(null, getQueueStatsRequest);
        });
    }

    /**
     * Step three of a statistics request: copies the dequeue timestamps known
     * to the dequeue statistic collector onto the matching entries of
     * {@code getQueueStatsRequest.queues}.
     *
     * <p>This step is best effort. Queues without a statistic keep their
     * timestamps unset, and a failing collector is only logged as a warning.
     * The callback therefore never receives an error from here.</p>
     *
     * @param getQueueStatsRequest request state; {@code queues} must be set.
     * @param biConsumer           called once with null and the updated request.
     */
    private <CTX> void attachDequeueStats(GetQueueStatsRequest<CTX> getQueueStatsRequest, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> biConsumer) {
        this.dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(map -> {
            // A missing map is handled like an empty one. Throwing in here
            // would skip the callback below and stall the whole request.
            if (map != null) {
                for (Queue queue : getQueueStatsRequest.queues) {
                    DequeueStatistic dequeueStatistic = (DequeueStatistic) map.get(queue.name);
                    if (dequeueStatistic == null) {
                        continue; // Nothing known about this queue.
                    }
                    queue.lastDequeueAttemptEpochMs = dequeueStatistic.getLastDequeueAttemptTimestamp();
                    queue.lastDequeueSuccessEpochMs = dequeueStatistic.getLastDequeueSuccessTimestamp();
                    queue.nextDequeueDueTimestampEpochMs = dequeueStatistic.getNextDequeueDueTimestamp();
                }
            }
            biConsumer.accept(null, getQueueStatsRequest);
        }).onFailure(th -> {
            log.warn("dequeueStatisticCollector.getAllDequeueStatistics() failed. Fallback to empty result.", th);
            biConsumer.accept(null, getQueueStatsRequest);
        });
    }

    /**
     * Comparator ordering queues by size, descending.
     *
     * @return a negative value if {@code queue} is larger than {@code queue2},
     *         a positive value if it is smaller, zero if both have the same size.
     */
    private int compareLargestFirst(Queue queue, Queue queue2) {
        long firstSize = queue.size;
        long secondSize = queue2.size;
        // Operands are swapped on purpose to get the largest queue first.
        return Long.compare(secondSize, firstSize);
    }

    static {
        // Logger used by all instances of this service.
        log = LoggerFactory.getLogger(QueueStatsService.class);
        // Decompiled form of the assert switch: true while assertions are off.
        $assertionsDisabled = !QueueStatsService.class.desiredAssertionStatus();
    }
}
