package org.swisspush.redisques;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;
import io.vertx.redis.op.RangeLimitOptions;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.swisspush.redisques.handler.AddItemHandler;
import org.swisspush.redisques.handler.DeleteAllQueueItems;
import org.swisspush.redisques.handler.DeleteLockHandler;
import org.swisspush.redisques.handler.GetAllLocksHandler;
import org.swisspush.redisques.handler.GetItemHandler;
import org.swisspush.redisques.handler.GetListRangeHandler;
import org.swisspush.redisques.handler.GetLockHandler;
import org.swisspush.redisques.handler.PutLockHandler;
import org.swisspush.redisques.handler.ReplaceItemHandler;

/* loaded from: input_file:org/swisspush/redisques/RedisQues.class */
public class RedisQues extends AbstractVerticle {
    private MessageConsumer<String> uidMessageConsumer;
    private MessageConsumer<String> conumersMessageConsumer;
    private RedisClient redisClient;
    public static final String STATUS = "status";
    public static final String MESSAGE = "message";
    public static final String VALUE = "value";
    public static final String INFO = "info";
    public static final String OK = "ok";
    public static final String ERROR = "error";
    public static final String PAYLOAD = "payload";
    public static final String QUEUE_NAME = "queuename";
    public static final String REQUESTED_BY = "requestedBy";
    public static final String TIMESTAMP = "timestamp";
    public static final String LIMIT = "limit";
    public static final String INDEX = "index";
    public static final String BUFFER = "buffer";
    public static final String OPERATION = "operation";
    private static final int DEFAULT_MAX_QUEUEITEM_COUNT = 49;
    private String uid = UUID.randomUUID().toString();
    private Map<String, QueueState> myQueues = new HashMap();
    private Logger log = LoggerFactory.getLogger(RedisQues.class);
    private Handler<Void> stoppedHandler = null;
    private String address = "redisques";
    private String redisPrefix = "redisques:";
    private String queuesPrefix = this.redisPrefix + "queues:";
    private String consumersPrefix = "consumers:";
    private String redisques_locks = "redisques:locks";
    private String processorAddress = "redisques-processor";
    private int refreshPeriod = 10;
    private int processorTimeout = 240000;
    private Handler<Message<String>> registrationRequestHandler = message -> {
        this.vertx.eventBus();
        String str = (String) message.body();
        this.log.debug("RedisQues Got registration request for queue " + str + " from consumer: " + this.uid);
        this.redisClient.setnx(this.redisPrefix + this.consumersPrefix + str, this.uid, asyncResult -> {
            long longValue = ((Long) asyncResult.result()).longValue();
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues setxn result: " + longValue + " for queue: " + str);
            }
            if (longValue != 1) {
                this.log.debug("RedisQues Missed registration for queue " + str);
                return;
            }
            this.log.debug("RedisQues Now registered for queue " + str);
            this.myQueues.put(str, QueueState.READY);
            consume(str);
        });
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/swisspush/redisques/RedisQues$QueueState.class */
    public enum QueueState {
        READY,
        CONSUMING
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/swisspush/redisques/RedisQues$SendResult.class */
    public class SendResult {
        public final Boolean success;
        public final Long timeoutId;

        public SendResult(Boolean bool, Long l) {
            this.success = bool;
            this.timeoutId = l;
        }
    }

    public void start() {
        EventBus eventBus = this.vertx.eventBus();
        this.log.info("Started with UID " + this.uid);
        JsonObject config = config();
        this.address = config.getString("address") != null ? config.getString("address") : this.address;
        this.redisPrefix = config.getString("redis-prefix") != null ? config.getString("redis-prefix") : this.redisPrefix;
        this.processorAddress = config.getString("processor-address") != null ? config.getString("processor-address") : this.processorAddress;
        this.refreshPeriod = config.getInteger("refresh-period") != null ? config.getInteger("refresh-period").intValue() : this.refreshPeriod;
        this.redisClient = RedisClient.create(this.vertx, new RedisOptions().setHost(config.getString("redisHost", "localhost")).setPort(config.getInteger("redisPort", 6379).intValue()).setEncoding(config.getString("redisEncoding", "UTF-8")));
        eventBus.localConsumer(this.address, new Handler<Message<JsonObject>>() { // from class: org.swisspush.redisques.RedisQues.1
            public void handle(Message<JsonObject> message) {
                String string = ((JsonObject) message.body()).getString(RedisQues.OPERATION);
                if (RedisQues.this.log.isTraceEnabled()) {
                    RedisQues.this.log.trace("RedisQues got operation:" + string);
                }
                boolean z = -1;
                switch (string.hashCode()) {
                    case -1594257912:
                        if (string.equals("enqueue")) {
                            z = false;
                            break;
                        }
                        break;
                    case -1246249847:
                        if (string.equals("getListRange")) {
                            z = 4;
                            break;
                        }
                        break;
                    case -1148899500:
                        if (string.equals("addItem")) {
                            z = 5;
                            break;
                        }
                        break;
                    case -436498179:
                        if (string.equals("getAllLocks")) {
                            z = 10;
                            break;
                        }
                        break;
                    case -219689766:
                        if (string.equals("putLock")) {
                            z = 11;
                            break;
                        }
                        break;
                    case -75439223:
                        if (string.equals("getItem")) {
                            z = 7;
                            break;
                        }
                        break;
                    case -75354719:
                        if (string.equals("getLock")) {
                            z = 12;
                            break;
                        }
                        break;
                    case 3540994:
                        if (string.equals("stop")) {
                            z = 3;
                            break;
                        }
                        break;
                    case 94627080:
                        if (string.equals("check")) {
                            z = true;
                            break;
                        }
                        break;
                    case 108404047:
                        if (string.equals("reset")) {
                            z = 2;
                            break;
                        }
                        break;
                    case 429860839:
                        if (string.equals("replaceItem")) {
                            z = 8;
                            break;
                        }
                        break;
                    case 1159256517:
                        if (string.equals("deleteAllQueueItems")) {
                            z = 9;
                            break;
                        }
                        break;
                    case 1764271966:
                        if (string.equals("deleteItem")) {
                            z = 6;
                            break;
                        }
                        break;
                    case 1764356470:
                        if (string.equals("deleteLock")) {
                            z = 13;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        RedisQues.this.updateTimestamp(((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), null);
                        RedisQues.this.redisClient.rpush(RedisQues.this.queuesPrefix + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), ((JsonObject) message.body()).getString(RedisQues.MESSAGE), asyncResult -> {
                            JsonObject jsonObject = new JsonObject();
                            if (asyncResult.succeeded()) {
                                RedisQues.this.log.debug("RedisQues Enqueued message into queue " + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME));
                                RedisQues.this.notifyConsumer(((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME));
                                jsonObject.put(RedisQues.STATUS, RedisQues.OK);
                                jsonObject.put(RedisQues.MESSAGE, "enqueued");
                                message.reply(jsonObject);
                                return;
                            }
                            String str = "RedisQues QUEUE_ERROR: Error while enqueing message into queue " + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME);
                            RedisQues.this.log.error(str);
                            jsonObject.put(RedisQues.STATUS, RedisQues.ERROR);
                            jsonObject.put(RedisQues.MESSAGE, str);
                            message.reply(jsonObject);
                        });
                        return;
                    case true:
                        RedisQues.this.checkQueues();
                        return;
                    case true:
                        RedisQues.this.resetConsumers();
                        return;
                    case true:
                        RedisQues.this.gracefulStop(r4 -> {
                            new JsonObject().put(RedisQues.STATUS, RedisQues.OK);
                        });
                        return;
                    case true:
                        String str = RedisQues.this.queuesPrefix + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME);
                        int maxQueueItemCountIndex = RedisQues.this.getMaxQueueItemCountIndex(((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.LIMIT));
                        RedisQues.this.redisClient.llen(str, asyncResult2 -> {
                            RedisQues.this.redisClient.lrange(str, 0L, maxQueueItemCountIndex, new GetListRangeHandler(message, (Long) asyncResult2.result()));
                        });
                        return;
                    case true:
                        RedisQues.this.redisClient.rpush(RedisQues.this.queuesPrefix + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.BUFFER), new AddItemHandler(message));
                        return;
                    case true:
                        RedisQues.this.redisClient.lset(RedisQues.this.queuesPrefix + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getInteger(RedisQues.INDEX).intValue(), "TO_DELETE", asyncResult3 -> {
                            if (!asyncResult3.succeeded()) {
                                message.reply(new JsonObject().put(RedisQues.STATUS, RedisQues.ERROR));
                            } else {
                                RedisQues.this.redisClient.lrem(RedisQues.this.queuesPrefix + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), 0L, "TO_DELETE", asyncResult3 -> {
                                    message.reply(new JsonObject().put(RedisQues.STATUS, RedisQues.OK));
                                });
                            }
                        });
                        return;
                    case true:
                        RedisQues.this.redisClient.lindex(RedisQues.this.queuesPrefix + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getInteger(RedisQues.INDEX).intValue(), new GetItemHandler(message));
                        return;
                    case true:
                        RedisQues.this.redisClient.lset(RedisQues.this.queuesPrefix + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getInteger(RedisQues.INDEX).intValue(), ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.BUFFER), new ReplaceItemHandler(message));
                        return;
                    case true:
                        RedisQues.this.redisClient.del(RedisQues.this.queuesPrefix + ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), new DeleteAllQueueItems(message));
                        return;
                    case true:
                        RedisQues.this.redisClient.hkeys(RedisQues.this.redisques_locks, new GetAllLocksHandler(message));
                        return;
                    case true:
                        JsonObject extractLockInfo = RedisQues.this.extractLockInfo(((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.REQUESTED_BY));
                        if (extractLockInfo != null) {
                            RedisQues.this.redisClient.hmset(RedisQues.this.redisques_locks, new JsonObject().put(((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), extractLockInfo.encode()), new PutLockHandler(message));
                            return;
                        } else {
                            message.reply(new JsonObject().put(RedisQues.STATUS, RedisQues.ERROR).put(RedisQues.MESSAGE, "Property 'requestedBy' missing"));
                            return;
                        }
                    case true:
                        RedisQues.this.redisClient.hget(RedisQues.this.redisques_locks, ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), new GetLockHandler(message));
                        return;
                    case true:
                        RedisQues.this.redisClient.hdel(RedisQues.this.redisques_locks, ((JsonObject) message.body()).getJsonObject(RedisQues.PAYLOAD).getString(RedisQues.QUEUE_NAME), new DeleteLockHandler(message));
                        return;
                    default:
                        return;
                }
            }
        });
        this.conumersMessageConsumer = eventBus.consumer(this.address + "-consumers", this.registrationRequestHandler);
        this.uidMessageConsumer = eventBus.consumer(this.uid, new Handler<Message<String>>() { // from class: org.swisspush.redisques.RedisQues.2
            public void handle(Message<String> message) {
                String str = (String) message.body();
                RedisQues.this.log.debug("RedisQues Got notification for queue " + str);
                RedisQues.this.consume(str);
            }
        });
        this.vertx.setPeriodic(this.refreshPeriod * 1000, l -> {
            this.myQueues.entrySet().stream().filter(entry -> {
                return entry.getValue() == QueueState.CONSUMING;
            }).forEach(entry2 -> {
                String str = (String) entry2.getKey();
                String str2 = this.redisPrefix + this.consumersPrefix + str;
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues refresh queues get: " + str2);
                }
                this.redisClient.get(str2, asyncResult -> {
                    if (!this.uid.equals((String) asyncResult.result())) {
                        this.log.debug("RedisQues Removing queue " + str + " from the list");
                        this.myQueues.remove(str);
                    } else {
                        this.log.debug("RedisQues Periodic consumer refresh for active queue " + str);
                        refreshRegistration(str, null);
                        updateTimestamp(str, null);
                    }
                });
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public JsonObject extractLockInfo(String str) {
        if (str == null) {
            return null;
        }
        JsonObject jsonObject = new JsonObject();
        jsonObject.put(REQUESTED_BY, str);
        jsonObject.put(TIMESTAMP, Long.valueOf(System.currentTimeMillis()));
        return jsonObject;
    }

    public void stop() {
        unregisterConsumers(true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void gracefulStop(Handler<Void> handler) {
        this.conumersMessageConsumer.unregister(asyncResult -> {
            this.uidMessageConsumer.unregister(asyncResult -> {
                unregisterConsumers(false);
                this.stoppedHandler = handler;
                if (this.myQueues.keySet().isEmpty()) {
                    handler.handle((Object) null);
                }
            });
        });
    }

    private void unregisterConsumers(boolean z) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues unregister consumers force: " + z);
        }
        this.log.debug("RedisQues Unregistering consumers");
        for (Map.Entry<String, QueueState> entry : this.myQueues.entrySet()) {
            String key = entry.getKey();
            if (z || entry.getValue() == QueueState.READY) {
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues unregister consumers queue: " + key);
                }
                refreshRegistration(key, asyncResult -> {
                    String str = this.redisPrefix + this.consumersPrefix + key;
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("RedisQues unregister consumers get: " + str);
                    }
                    this.redisClient.get(str, asyncResult -> {
                        String str2 = (String) asyncResult.result();
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("RedisQues unregister consumers get result: " + str2);
                        }
                        if (this.uid.equals(str2)) {
                            this.log.debug("RedisQues remove consumer: " + this.uid);
                            this.myQueues.remove(key);
                        }
                    });
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resetConsumers() {
        this.log.debug("RedisQues Resetting consumers");
        String str = this.redisPrefix + this.consumersPrefix + "*";
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues reset consumers keys: " + str);
        }
        this.redisClient.keys(str, asyncResult -> {
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void consume(String str) {
        this.log.debug(" RedisQues Requested to consume queue " + str);
        refreshRegistration(str, asyncResult -> {
            String str2 = this.redisPrefix + this.consumersPrefix + str;
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues consume get: " + str2);
            }
            this.redisClient.get(str2, asyncResult -> {
                String str3 = (String) asyncResult.result();
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues refresh registration consumer: " + str3);
                }
                if (!this.uid.equals(str3)) {
                    this.log.warn("Registration for queue " + str + " has changed to " + str3);
                    this.myQueues.remove(str);
                    notifyConsumer(str);
                    return;
                }
                QueueState queueState = this.myQueues.get(str);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues consumer: " + str3 + " queue: " + str + " state: " + queueState);
                }
                if (queueState == QueueState.CONSUMING) {
                    this.log.debug("RedisQues Queue " + str + " is already beeing consumed");
                    return;
                }
                this.myQueues.put(str, QueueState.CONSUMING);
                if (queueState == null) {
                    this.log.warn("Received request to consume from a queue I did not know about: " + str);
                }
                this.log.debug("RedisQues Starting to consume queue " + str);
                readQueue(str);
            });
        });
    }

    private void readQueue(String str) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues read queue: " + str);
        }
        String str2 = this.queuesPrefix + str;
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues read queue lindex: " + str2);
        }
        this.redisClient.lindex(str2, 0, asyncResult -> {
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues read queue lindex result: " + ((String) asyncResult.result()));
            }
            if (asyncResult.result() != null) {
                processMessageWithTimeout(str, (String) asyncResult.result(), sendResult -> {
                    if (!sendResult.success.booleanValue()) {
                        this.log.debug("RedisQues Processing failed for queue " + str);
                        this.myQueues.put(str, QueueState.READY);
                        this.vertx.cancelTimer(sendResult.timeoutId.longValue());
                    } else {
                        String str3 = this.queuesPrefix + str;
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("RedisQues read queue lpop: " + str3);
                        }
                        this.redisClient.lpop(str3, asyncResult -> {
                            this.log.debug("RedisQues Message removed, queue " + str + " is ready again");
                            this.myQueues.put(str, QueueState.READY);
                            this.vertx.cancelTimer(sendResult.timeoutId.longValue());
                            if (this.stoppedHandler != null) {
                                unregisterConsumers(false);
                                if (this.myQueues.isEmpty()) {
                                    this.stoppedHandler.handle((Object) null);
                                }
                            }
                            String str4 = this.queuesPrefix + str;
                            if (this.log.isTraceEnabled()) {
                                this.log.trace("RedisQues read queue: " + str2);
                            }
                            this.redisClient.llen(str4, asyncResult -> {
                                if (((Long) asyncResult.result()).longValue() > 0) {
                                    notifyConsumer(str);
                                }
                            });
                        });
                    }
                });
            } else {
                this.log.debug("Got a request to consume from empty queue " + str);
                this.myQueues.put(str, QueueState.READY);
            }
        });
    }

    private void processMessageWithTimeout(String str, String str2, final Handler<SendResult> handler) {
        EventBus eventBus = this.vertx.eventBus();
        JsonObject jsonObject = new JsonObject();
        jsonObject.put("queue", str);
        jsonObject.put(PAYLOAD, str2);
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues process message: " + jsonObject + " for queue: " + str + " send it to processor: " + this.processorAddress);
        }
        final long timer = this.vertx.setTimer(this.processorTimeout, l -> {
            this.log.debug("RedisQues QUEUE_ERROR: Consumer timeout " + this.uid + " queue: " + str);
            handler.handle(new SendResult(false, l));
        });
        eventBus.send(this.processorAddress, jsonObject, new Handler<AsyncResult<Message<JsonObject>>>() { // from class: org.swisspush.redisques.RedisQues.3
            public void handle(AsyncResult<Message<JsonObject>> asyncResult) {
                handler.handle(new SendResult(Boolean.valueOf(((JsonObject) ((Message) asyncResult.result()).body()).getString(RedisQues.STATUS).equals(RedisQues.OK)), Long.valueOf(timer)));
            }
        });
        updateTimestamp(str, null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyConsumer(String str) {
        this.log.debug("RedisQues Notifying consumer of queue " + str);
        EventBus eventBus = this.vertx.eventBus();
        String str2 = this.redisPrefix + this.consumersPrefix + str;
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues notify consumer get: " + str2);
        }
        this.redisClient.get(str2, asyncResult -> {
            String str3 = (String) asyncResult.result();
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues got consumer: " + str3);
            }
            if (str3 == null) {
                this.log.debug("RedisQues Sending registration request for queue " + str);
                eventBus.send(this.address + "-consumers", str);
            } else {
                this.log.debug("RedisQues Notifying consumer " + str3 + " to consume queue " + str);
                eventBus.send(str3, str);
            }
        });
    }

    private void refreshRegistration(String str, Handler<AsyncResult<Long>> handler) {
        this.log.debug("RedisQues Refreshing registration of queue " + str + ", expire at " + (2 * this.refreshPeriod));
        String str2 = this.redisPrefix + this.consumersPrefix + str;
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues refresh registration: " + str2);
        }
        if (handler != null) {
            this.redisClient.expire(str2, 2 * this.refreshPeriod, handler);
        } else {
            this.redisClient.expire(str2, 2 * this.refreshPeriod, asyncResult -> {
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateTimestamp(String str, Handler<AsyncResult<Long>> handler) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues update timestamp for queue: " + str + " to: " + System.currentTimeMillis());
        }
        if (handler != null) {
            this.redisClient.zadd(this.redisPrefix + "queues", System.currentTimeMillis(), str, handler);
        } else {
            this.redisClient.zadd(this.redisPrefix + "queues", System.currentTimeMillis(), str, asyncResult -> {
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkQueues() {
        this.log.debug("Checking queues timestamps");
        long currentTimeMillis = System.currentTimeMillis() - ((3 * this.refreshPeriod) * 1000);
        this.redisClient.zrangebyscore(this.redisPrefix + "queues", "-inf", String.valueOf(currentTimeMillis), RangeLimitOptions.NONE, asyncResult -> {
            JsonArray jsonArray = (JsonArray) asyncResult.result();
            AtomicInteger atomicInteger = new AtomicInteger(jsonArray.size());
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues update queues: " + atomicInteger);
            }
            Iterator it = jsonArray.iterator();
            while (it.hasNext()) {
                String str = (String) it.next();
                String str2 = this.queuesPrefix + str;
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues update queue: " + str2);
                }
                this.redisClient.exists(str2, asyncResult -> {
                    if (((Long) asyncResult.result()).longValue() == 1) {
                        this.log.debug("Updating queue timestamp " + str);
                        updateTimestamp(str, asyncResult -> {
                            if (atomicInteger.decrementAndGet() == 0) {
                                removeOldQueues(currentTimeMillis);
                            }
                        });
                        refreshRegistration(str, null);
                        notifyConsumer(str);
                        return;
                    }
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("RedisQues remove old queue: " + str);
                    }
                    if (atomicInteger.decrementAndGet() == 0) {
                        removeOldQueues(currentTimeMillis);
                    }
                });
            }
        });
    }

    private void removeOldQueues(long j) {
        this.log.debug("Cleaning old queues");
        this.redisClient.zremrangebyscore(this.redisPrefix + "queues", "-inf", String.valueOf(j), asyncResult -> {
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int getMaxQueueItemCountIndex(String str) {
        int i = DEFAULT_MAX_QUEUEITEM_COUNT;
        if (str != null) {
            try {
                int parseInt = Integer.parseInt(str) - 1;
                if (parseInt >= 0) {
                    i = parseInt;
                }
                this.log.info("use limit parameter " + parseInt);
            } catch (NumberFormatException e) {
                this.log.warn("Invalid limit parameter '" + str + "' configured for max queue item count. Using default " + DEFAULT_MAX_QUEUEITEM_COUNT);
            }
        }
        return i;
    }
}
