package org.swisspush.redisques.util;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.impl.types.NumberType;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/swisspush/redisques/util/QueueStatisticsCollector.class */
/**
 * Keeps per-queue statistics (failure count, backpressure time, slowdown time,
 * dequeue statistic and message speed) in memory, mirrors the failure related
 * values into the Redis hash {@code redisques:stats} and assembles statistic
 * reports for a list of queues.
 *
 * <p>NOTE(review): the per-queue maps are plain {@link HashMap}s; presumably all
 * mutating calls happen on a single Vert.x context — confirm before calling this
 * class from several threads.
 */
public class QueueStatisticsCollector {
    private static final Logger log = LoggerFactory.getLogger(QueueStatisticsCollector.class);

    /** Redis hash that holds one JSON encoded statistics entry per queue name. */
    private static final String STATSKEY = "redisques:stats";
    private static final String QUEUE_FAILURES = "failures";
    private static final String QUEUE_BACKPRESSURE = "backpressureTime";
    private static final String QUEUE_SLOWDOWNTIME = "slowdownTime";
    private static final String QUEUE_DEQUEUE_STATISTIC = "dequeueStatistic";

    private final Map<String, AtomicLong> queueFailureCount = new HashMap<>();
    private final Map<String, Long> queueBackpressureTime = new HashMap<>();
    private final Map<String, Long> queueSlowDownTime = new HashMap<>();
    private final Map<String, DequeueStatistic> dequeueStatistics = new HashMap<>();

    /** Messages counted per queue since the last run of the speed scheduler. */
    private final ConcurrentMap<String, AtomicLong> queueMessageSpeedCtr = new ConcurrentHashMap<>();

    /** Snapshot published by the speed scheduler; replaced as a whole, never mutated. */
    private volatile Map<String, Long> queueMessageSpeed = new HashMap<>();

    private final RedisProvider redisProvider;
    private final String queuePrefix;
    private final Vertx vertx;

    /**
     * Statistic values of one queue, convertible to the JSON shape used in the
     * statistics reply.
     */
    public static class QueueStatistic {
        private final String queueName;
        private long size;
        private long failures;
        private long backpressureTime;
        private long slowdownTime;
        private long speed;
        private JsonObject dequeueStatistic;

        QueueStatistic(String queueName) {
            this.queueName = queueName;
        }

        /** Maps {@code null} and negative values to 0, returns every other value unchanged. */
        private static long nonNegative(Long value) {
            if (value == null || value.longValue() < 0) {
                return 0L;
            }
            return value.longValue();
        }

        void setSize(Long size) {
            this.size = nonNegative(size);
        }

        void setFailures(Long failures) {
            this.failures = nonNegative(failures);
        }

        void setBackpressureTime(Long backpressureTime) {
            this.backpressureTime = nonNegative(backpressureTime);
        }

        void setSlowdownTime(Long slowdownTime) {
            this.slowdownTime = nonNegative(slowdownTime);
        }

        void setMessageSpeed(Long speed) {
            this.speed = nonNegative(speed);
        }

        /**
         * Stores the JSON mapping of the given statistic; a {@code null} or empty
         * statistic is replaced by the mapping of a fresh {@link DequeueStatistic}.
         */
        void setDequeueStatistic(DequeueStatistic dequeueStatistic) {
            DequeueStatistic effective = (dequeueStatistic == null || dequeueStatistic.isEmpty())
                    ? new DequeueStatistic()
                    : dequeueStatistic;
            this.dequeueStatistic = JsonObject.mapFrom(effective);
        }

        /**
         * @return a new JSON object holding all values of this statistic; the dequeue
         *         statistic entry is {@code null} when it was never set
         */
        JsonObject getAsJsonObject() {
            return new JsonObject()
                    .put(RedisquesAPI.MONITOR_QUEUE_NAME, this.queueName)
                    .put(RedisquesAPI.MONITOR_QUEUE_SIZE, Long.valueOf(this.size))
                    .put(QUEUE_FAILURES, Long.valueOf(this.failures))
                    .put(QUEUE_BACKPRESSURE, Long.valueOf(this.backpressureTime))
                    .put(QUEUE_SLOWDOWNTIME, Long.valueOf(this.slowdownTime))
                    .put(RedisquesAPI.STATISTIC_QUEUE_SPEED, Long.valueOf(this.speed))
                    .put(QUEUE_DEQUEUE_STATISTIC, this.dequeueStatistic);
        }
    }

    /**
     * Mutable state handed from one step of {@link #getQueueStatistics(List)} to the
     * next. Each step fills the fields the following steps read.
     */
    public static class RequestCtx {
        private Message<JsonObject> event;
        private List<String> queueNames;
        private Redis conn;
        private RedisAPI redisAPI;
        private List<NumberType> queueLengthList;
        private HashMap<String, QueueStatistic> statistics;
        private Response redisFailStats;

        private RequestCtx() {
        }
    }

    /**
     * @param redisProvider    source of the Redis connection and API used by this collector
     * @param queuePrefix      prefix prepended to a queue name to build its Redis list key
     * @param vertx            Vert.x instance used to schedule the periodic speed evaluation
     * @param speedIntervalSec period of the speed evaluation in seconds; a value
     *                         {@code <= 0} disables the evaluation
     */
    public QueueStatisticsCollector(RedisProvider redisProvider, String queuePrefix, Vertx vertx, int speedIntervalSec) {
        this.redisProvider = redisProvider;
        this.queuePrefix = queuePrefix;
        this.vertx = vertx;
        speedStatisticsScheduler(speedIntervalSec);
    }

    /**
     * Registers a periodic timer that moves the counters collected since the last
     * run into a fresh speed snapshot and clears the counters.
     */
    private void speedStatisticsScheduler(int speedIntervalSec) {
        if (speedIntervalSec <= 0) {
            log.debug("No speed statistics required");
            return;
        }
        // Multiply as long so that large intervals cannot overflow the int range.
        this.vertx.setPeriodic(speedIntervalSec * 1000L, timerId -> {
            log.debug("Schedule statistics queue speed collection");
            Map<String, Long> snapshot = new HashMap<>();
            Iterator<Map.Entry<String, AtomicLong>> counters = this.queueMessageSpeedCtr.entrySet().iterator();
            while (counters.hasNext()) {
                Map.Entry<String, AtomicLong> counter = counters.next();
                long count = counter.getValue().get();
                if (count > 0) {
                    snapshot.put(counter.getKey(), Long.valueOf(count));
                }
                // Every counter is dropped, so the next period starts counting from scratch.
                counters.remove();
            }
            this.queueMessageSpeed = snapshot;
        });
    }

    /**
     * Removes failure count, slowdown time and backpressure time of the given queue.
     * Redis is only updated when a failure count greater than zero was removed.
     */
    public void resetQueueFailureStatistics(String queueName) {
        AtomicLong removedFailures = this.queueFailureCount.remove(queueName);
        this.queueSlowDownTime.remove(queueName);
        this.queueBackpressureTime.remove(queueName);
        if (removedFailures != null && removedFailures.get() > 0) {
            updateStatisticsInRedis(queueName);
        }
    }

    /**
     * Calls {@link #resetQueueFailureStatistics(String)} for every entry of the given
     * array; a {@code null} or empty array is ignored.
     */
    public void resetQueueStatistics(JsonArray queues) {
        if (queues == null || queues.isEmpty()) {
            return;
        }
        int count = queues.size();
        for (int i = 0; i < count; i++) {
            resetQueueFailureStatistics(queues.getString(i));
        }
    }

    /**
     * Counts one successful message for the speed evaluation and resets the failure
     * statistics of the queue.
     */
    public void queueMessageSuccess(String queueName) {
        this.queueMessageSpeedCtr.computeIfAbsent(queueName, key -> new AtomicLong()).incrementAndGet();
        resetQueueFailureStatistics(queueName);
    }

    /** @return the speed of the queue from the latest snapshot, 0 when there is none */
    private long getQueueSpeed(String queueName) {
        Long speed = this.queueMessageSpeed.get(queueName);
        return speed != null ? speed.longValue() : 0L;
    }

    /**
     * Increments the failure count of the queue and writes the statistics to Redis.
     *
     * @return the failure count after the increment
     */
    public long queueMessageFailed(String queueName) {
        long failures = this.queueFailureCount.computeIfAbsent(queueName, key -> new AtomicLong()).incrementAndGet();
        updateStatisticsInRedis(queueName);
        return failures;
    }

    /** @return the current failure count of the queue, 0 when none is recorded */
    public long getQueueFailureCount(String queueName) {
        AtomicLong failures = this.queueFailureCount.get(queueName);
        return failures != null ? failures.longValue() : 0L;
    }

    /**
     * Stores a backpressure time greater than zero for the queue; any other value
     * removes the stored one. Redis is updated whenever the stored state changed.
     */
    public void setQueueBackPressureTime(String queueName, long time) {
        if (time > 0) {
            this.queueBackpressureTime.put(queueName, Long.valueOf(time));
            updateStatisticsInRedis(queueName);
        } else if (this.queueBackpressureTime.remove(queueName) != null) {
            updateStatisticsInRedis(queueName);
        }
    }

    private long getQueueBackPressureTime(String queueName) {
        return this.queueBackpressureTime.getOrDefault(queueName, 0L).longValue();
    }

    /**
     * Stores a slowdown time greater than zero for the queue; any other value removes
     * the stored one. Redis is updated whenever the stored state changed.
     */
    public void setQueueSlowDownTime(String queueName, long time) {
        if (time > 0) {
            this.queueSlowDownTime.put(queueName, Long.valueOf(time));
            updateStatisticsInRedis(queueName);
        } else if (this.queueSlowDownTime.remove(queueName) != null) {
            updateStatisticsInRedis(queueName);
        }
    }

    private long getQueueSlowDownTime(String queueName) {
        return this.queueSlowDownTime.getOrDefault(queueName, 0L).longValue();
    }

    /**
     * Stores the dequeue statistic of the queue and writes the statistics to Redis.
     * A {@code null} or empty statistic is ignored.
     */
    public void setDequeueStatistic(String queueName, DequeueStatistic dequeueStatistic) {
        if (dequeueStatistic == null || dequeueStatistic.isEmpty()) {
            return;
        }
        this.dequeueStatistics.put(queueName, dequeueStatistic);
        updateStatisticsInRedis(queueName);
    }

    private DequeueStatistic getDequeueStatistic(String queueName) {
        return this.dequeueStatistics.getOrDefault(queueName, new DequeueStatistic());
    }

    /**
     * Mirrors the in-memory statistics of the queue into the Redis hash: the hash
     * field is deleted when there is nothing to report, otherwise it is overwritten
     * with the JSON encoded values. Failures are only logged.
     */
    private void updateStatisticsInRedis(String queueName) {
        long failures = getQueueFailureCount(queueName);
        long slowDownTime = getQueueSlowDownTime(queueName);
        long backpressureTime = getQueueBackPressureTime(queueName);
        DequeueStatistic dequeueStatistic = getDequeueStatistic(queueName);

        boolean nothingToReport = failures <= 0
                && slowDownTime <= 0
                && backpressureTime <= 0
                && dequeueStatistic.isEmpty();
        if (nothingToReport) {
            this.redisProvider.redis().onSuccess(redisAPI -> {
                redisAPI.hdel(List.of(STATSKEY, queueName), deleteResult -> {
                    if (deleteResult.failed()) {
                        log.warn("TODO error handling", new Exception(deleteResult.cause()));
                    }
                });
            }).onFailure(cause -> {
                log.error("Redis: Error in updateStatisticsInRedis", cause);
            });
            return;
        }

        JsonObject entry = new JsonObject();
        entry.put(RedisquesAPI.QUEUENAME, queueName);
        entry.put(QUEUE_FAILURES, Long.valueOf(failures));
        entry.put(QUEUE_SLOWDOWNTIME, Long.valueOf(slowDownTime));
        entry.put(QUEUE_BACKPRESSURE, Long.valueOf(backpressureTime));
        entry.put(QUEUE_DEQUEUE_STATISTIC, JsonObject.mapFrom(dequeueStatistic));
        this.redisProvider.redis().onSuccess(redisAPI -> {
            redisAPI.hset(List.of(STATSKEY, queueName, entry.toString()), writeResult -> {
                if (writeResult.failed()) {
                    log.warn("TODO error handling", new Exception(writeResult.cause()));
                }
            });
        }).onFailure(cause -> {
            log.error("Redis: Error in updateStatisticsInRedis", cause);
        });
    }

    /**
     * Builds the statistics report for the given queues by running
     * {@code step1} to {@code step6} one after the other.
     *
     * @param queues names of the queues to report on
     * @return a future holding a JSON object with the status and one entry per
     *         requested queue; for a {@code null} or empty list the queue array is
     *         empty. The future fails when one of the steps fails.
     */
    public Future<JsonObject> getQueueStatistics(List<String> queues) {
        if (queues == null || queues.isEmpty()) {
            log.debug("Queue statistics evaluation with empty queues, returning empty result");
            return Future.succeededFuture(new JsonObject()
                    .put(RedisquesAPI.STATUS, RedisquesAPI.OK)
                    .put(RedisquesAPI.QUEUES, new JsonArray()));
        }
        RequestCtx ctx = new RequestCtx();
        ctx.queueNames = queues;
        // Each step only communicates through ctx, so the intermediate results are ignored.
        return step1(ctx)
                .compose(ignored -> step2(ctx))
                .compose(ignored -> step3(ctx))
                .compose(ignored -> step4(ctx))
                .compose(ignored -> step5(ctx))
                .compose(ignored -> step6(ctx));
    }

    /** Obtains the Redis connection and stores it in the context. */
    Future<JsonObject> step1(RequestCtx ctx) {
        Promise<JsonObject> promise = Promise.promise();
        this.redisProvider.connection().onFailure(cause -> {
            promise.fail(new Exception("Redis: Failed to get queue length.", cause));
        }).onSuccess(conn -> {
            assert conn != null;
            ctx.conn = conn;
            promise.complete();
        });
        return promise.future();
    }

    /**
     * Sends one {@code LLEN} request per queue and stores the responses, in the order
     * of the queue names, in the context.
     */
    Future<JsonObject> step2(RequestCtx ctx) {
        assert ctx.conn != null;
        Promise<JsonObject> promise = Promise.promise();
        @SuppressWarnings("rawtypes") // the list is handed to CompositeFuture.all, as in the original code
        List<Future> lengthRequests = ctx.queueNames.stream()
                .map(queueName -> ctx.conn.send(Request.cmd(Command.LLEN, this.queuePrefix + queueName)))
                .collect(Collectors.toList());
        CompositeFuture.all(lengthRequests).onFailure(cause -> {
            // Keep the message callers already see, but do not lose the root cause.
            promise.fail(new Exception("Unexpected queue length result", cause));
        }).onSuccess(allLengths -> {
            List<NumberType> lengths = allLengths.list();
            if (lengths == null) {
                promise.fail("Unexpected queue length result: null");
            } else if (lengths.size() != ctx.queueNames.size()) {
                promise.fail("Unexpected queue length result with unequal size " + ctx.queueNames.size() + " : " + lengths.size());
            } else {
                ctx.queueLengthList = lengths;
                promise.complete();
            }
        });
        return promise.future();
    }

    /**
     * Creates one {@link QueueStatistic} per queue name, populated with the queue
     * length from the context and the locally known message speed.
     */
    Future<JsonObject> step3(RequestCtx ctx) {
        assert ctx.queueLengthList != null;
        int queueCount = ctx.queueNames.size();
        ctx.statistics = new HashMap<>(queueCount);
        for (int i = 0; i < queueCount; i++) {
            QueueStatistic statistic = new QueueStatistic(ctx.queueNames.get(i));
            statistic.setSize(ctx.queueLengthList.get(i).toLong());
            statistic.setMessageSpeed(Long.valueOf(getQueueSpeed(statistic.queueName)));
            ctx.statistics.put(statistic.queueName, statistic);
        }
        return Future.succeededFuture();
    }

    /** Obtains the Redis API and stores it in the context. */
    Future<JsonObject> step4(RequestCtx ctx) {
        Promise<JsonObject> promise = Promise.promise();
        this.redisProvider.redis().onFailure(cause -> {
            promise.fail(new Exception("Redis: Error in getQueueStatistics", cause));
        }).onSuccess(redisAPI -> {
            assert redisAPI != null;
            ctx.redisAPI = redisAPI;
            promise.complete();
        });
        return promise.future();
    }

    /** Reads all values of the statistics hash from Redis and stores them in the context. */
    Future<JsonObject> step5(RequestCtx ctx) {
        assert ctx.redisAPI != null;
        assert ctx.statistics != null;
        Promise<JsonObject> promise = Promise.promise();
        ctx.redisAPI.hvals(STATSKEY, statsResult -> {
            if (statsResult == null || statsResult.failed()) {
                promise.fail(new RuntimeException("statistics queue evaluation failed",
                        statsResult == null ? null : statsResult.cause()));
                return;
            }
            ctx.redisFailStats = statsResult.result();
            assert ctx.redisFailStats != null;
            promise.complete();
        });
        return promise.future();
    }

    /**
     * Merges the statistics entries read from Redis into the statistics of the
     * requested queues and builds the final reply. Entries of queues that were not
     * requested are skipped.
     */
    Future<JsonObject> step6(RequestCtx ctx) {
        assert ctx.redisFailStats != null;
        for (Response rawEntry : ctx.redisFailStats) {
            JsonObject entry = new JsonObject(rawEntry.toString());
            QueueStatistic statistic = ctx.statistics.get(entry.getString(RedisquesAPI.QUEUENAME));
            if (statistic == null) {
                continue;
            }
            statistic.setFailures(entry.getLong(QUEUE_FAILURES, 0L));
            statistic.setBackpressureTime(entry.getLong(QUEUE_BACKPRESSURE, 0L));
            statistic.setSlowdownTime(entry.getLong(QUEUE_SLOWDOWNTIME, 0L));
            // A key that is present but holds JSON null is treated like a missing key.
            JsonObject dequeueJson = entry.getJsonObject(QUEUE_DEQUEUE_STATISTIC);
            if (dequeueJson != null) {
                statistic.setDequeueStatistic(dequeueJson.mapTo(DequeueStatistic.class));
            }
        }

        JsonArray queues = new JsonArray();
        for (String queueName : ctx.queueNames) {
            QueueStatistic statistic = ctx.statistics.get(queueName);
            if (statistic != null) {
                queues.add(statistic.getAsJsonObject());
            }
        }
        return Future.succeededFuture(new JsonObject()
                .put(RedisquesAPI.STATUS, RedisquesAPI.OK)
                .put(RedisquesAPI.QUEUES, queues));
    }

    /**
     * Replies to the message with the sum of the speeds of the given queues; a
     * {@code null} or empty list is answered with a speed of 0.
     */
    public void getQueuesSpeed(Message<JsonObject> message, List<String> queues) {
        if (queues == null || queues.isEmpty()) {
            log.debug("No matching filtered queues given");
            message.reply(new JsonObject()
                    .put(RedisquesAPI.STATUS, RedisquesAPI.OK)
                    .put(RedisquesAPI.STATISTIC_QUEUE_SPEED, 0L));
            return;
        }
        long totalSpeed = 0;
        for (String queueName : queues) {
            totalSpeed += getQueueSpeed(queueName);
        }
        message.reply(new JsonObject()
                .put(RedisquesAPI.STATUS, RedisquesAPI.OK)
                .put(RedisquesAPI.STATISTIC_QUEUE_SPEED, Long.valueOf(totalSpeed)));
    }
}
