package org.swisspush.redisques;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;
import io.vertx.redis.op.RangeLimitOptions;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.swisspush.redisques.handler.AddQueueItemHandler;
import org.swisspush.redisques.handler.DeleteLockHandler;
import org.swisspush.redisques.handler.GetAllLocksHandler;
import org.swisspush.redisques.handler.GetLockHandler;
import org.swisspush.redisques.handler.GetQueueItemHandler;
import org.swisspush.redisques.handler.GetQueueItemsCountHandler;
import org.swisspush.redisques.handler.GetQueueItemsHandler;
import org.swisspush.redisques.handler.GetQueuesCountHandler;
import org.swisspush.redisques.handler.GetQueuesHandler;
import org.swisspush.redisques.handler.PutLockHandler;
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.handler.ReplaceQueueItemHandler;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.RedisquesAPI;
import org.swisspush.redisques.util.RedisquesConfiguration;

/* loaded from: input_file:org/swisspush/redisques/RedisQues.class */
public class RedisQues extends AbstractVerticle {
    private MessageConsumer<String> uidMessageConsumer;
    private MessageConsumer<String> conumersMessageConsumer;
    private RedisClient redisClient;
    public static final String TIMESTAMP = "timestamp";
    private int checkInterval;
    private static final int DEFAULT_MAX_QUEUEITEM_COUNT = 49;
    private static final int MAX_AGE_MILLISECONDS = 120000;
    private LuaScriptManager luaScriptManager;
    private String uid = UUID.randomUUID().toString();
    private Map<String, QueueState> myQueues = new HashMap();
    private Logger log = LoggerFactory.getLogger(RedisQues.class);
    private Handler<Void> stoppedHandler = null;
    private String address = "redisques";
    private String redisPrefix = "redisques:";
    private String processorAddress = "redisques-processor";
    private int refreshPeriod = 10;
    private int processorTimeout = 240000;
    private Handler<Message<String>> registrationRequestHandler = message -> {
        String str = (String) message.body();
        this.log.debug("RedisQues Got registration request for queue " + str + " from consumer: " + this.uid);
        this.redisClient.setnx(getConsumersPrefix() + str, this.uid, asyncResult -> {
            long longValue = ((Long) asyncResult.result()).longValue();
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues setxn result: " + longValue + " for queue: " + str);
            }
            if (longValue != 1) {
                this.log.debug("RedisQues Missed registration for queue " + str);
                return;
            }
            this.log.debug("RedisQues Now registered for queue " + str);
            this.myQueues.put(str, QueueState.READY);
            consume(str);
        });
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/swisspush/redisques/RedisQues$QueueState.class */
    public enum QueueState {
        READY,
        CONSUMING
    }

    /* loaded from: input_file:org/swisspush/redisques/RedisQues$SendResult.class */
    private class SendResult {
        public final Boolean success;
        public final Long timeoutId;

        public SendResult(Boolean bool, Long l) {
            this.success = bool;
            this.timeoutId = l;
        }
    }

    private String getQueuesPrefix() {
        return this.redisPrefix + "queues:";
    }

    private String getQueuesKey() {
        return this.redisPrefix + "queues";
    }

    public String getConsumersPrefix() {
        return this.redisPrefix + "consumers:";
    }

    private String getLocksKey() {
        return this.redisPrefix + "locks";
    }

    private String getQueueCheckLastexecKey() {
        return this.redisPrefix + "check:lastexec";
    }

    public void start() {
        EventBus eventBus = this.vertx.eventBus();
        this.log.info("Started with UID " + this.uid);
        RedisquesConfiguration fromJsonObject = RedisquesConfiguration.fromJsonObject(config());
        this.log.info("Starting Redisques module with configuration: " + fromJsonObject);
        this.address = fromJsonObject.getAddress();
        this.redisPrefix = fromJsonObject.getRedisPrefix();
        this.processorAddress = fromJsonObject.getProcessorAddress();
        this.refreshPeriod = fromJsonObject.getRefreshPeriod();
        this.checkInterval = fromJsonObject.getCheckInterval();
        this.processorTimeout = fromJsonObject.getProcessorTimeout();
        this.redisClient = RedisClient.create(this.vertx, new RedisOptions().setHost(fromJsonObject.getRedisHost()).setPort(fromJsonObject.getRedisPort()).setEncoding(fromJsonObject.getRedisEncoding()));
        this.luaScriptManager = new LuaScriptManager(this.redisClient);
        RedisquesHttpRequestHandler.init(this.vertx, fromJsonObject);
        eventBus.localConsumer(this.address, message -> {
            String string = ((JsonObject) message.body()).getString(RedisquesAPI.OPERATION);
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues got operation:" + string);
            }
            RedisquesAPI.QueueOperation fromString = RedisquesAPI.QueueOperation.fromString(string);
            if (fromString == null) {
                unsupportedOperation(string, message);
                return;
            }
            switch (fromString) {
                case enqueue:
                    enqueue(message);
                    return;
                case lockedEnqueue:
                    lockedEnqueue(message);
                    return;
                case getQueueItems:
                    getQueueItems(message);
                    return;
                case addQueueItem:
                    addQueueItem(message);
                    return;
                case deleteQueueItem:
                    deleteQueueItem(message);
                    return;
                case getQueueItem:
                    getQueueItem(message);
                    return;
                case replaceQueueItem:
                    replaceQueueItem(message);
                    return;
                case deleteAllQueueItems:
                    deleteAllQueueItems(message);
                    return;
                case getAllLocks:
                    this.redisClient.hkeys(getLocksKey(), new GetAllLocksHandler(message));
                    return;
                case putLock:
                    putLock(message);
                    return;
                case getLock:
                    this.redisClient.hget(getLocksKey(), ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), new GetLockHandler(message));
                    return;
                case deleteLock:
                    deleteLock(message);
                    return;
                case getQueueItemsCount:
                    this.redisClient.llen(getQueuesPrefix() + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), new GetQueueItemsCountHandler(message));
                    return;
                case getQueuesCount:
                    this.redisClient.zcount(getQueuesKey(), getMaxAgeTimestamp(), Double.MAX_VALUE, new GetQueuesCountHandler(message));
                    return;
                case getQueues:
                    this.redisClient.zrangebyscore(getQueuesKey(), String.valueOf(getMaxAgeTimestamp()), "+inf", RangeLimitOptions.NONE, new GetQueuesHandler(message));
                    return;
                case check:
                    checkQueues();
                    return;
                case reset:
                    resetConsumers();
                    return;
                case stop:
                    gracefulStop(r4 -> {
                        new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK);
                    });
                    return;
                default:
                    unsupportedOperation(string, message);
                    return;
            }
        });
        this.conumersMessageConsumer = eventBus.consumer(this.address + "-consumers", this.registrationRequestHandler);
        this.uidMessageConsumer = eventBus.consumer(this.uid, message2 -> {
            String str = (String) message2.body();
            this.log.debug("RedisQues Got notification for queue " + str);
            consume(str);
        });
        this.vertx.setPeriodic(this.refreshPeriod * 1000, l -> {
            this.myQueues.entrySet().stream().filter(entry -> {
                return entry.getValue() == QueueState.CONSUMING;
            }).forEach(entry2 -> {
                String str = (String) entry2.getKey();
                String str2 = getConsumersPrefix() + str;
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues refresh queues get: " + str2);
                }
                this.redisClient.get(str2, asyncResult -> {
                    if (!this.uid.equals((String) asyncResult.result())) {
                        this.log.debug("RedisQues Removing queue " + str + " from the list");
                        this.myQueues.remove(str);
                    } else {
                        this.log.debug("RedisQues Periodic consumer refresh for active queue " + str);
                        refreshRegistration(str, null);
                        updateTimestamp(str, null);
                    }
                });
            });
        });
        registerQueueCheck(fromJsonObject);
    }

    private void enqueue(Message<JsonObject> message) {
        updateTimestamp(((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), null);
        this.redisClient.rpush(getQueuesPrefix() + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), ((JsonObject) message.body()).getString(RedisquesAPI.MESSAGE), asyncResult -> {
            JsonObject jsonObject = new JsonObject();
            if (asyncResult.succeeded()) {
                this.log.debug("RedisQues Enqueued message into queue " + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME));
                notifyConsumer(((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME));
                jsonObject.put(RedisquesAPI.STATUS, RedisquesAPI.OK);
                jsonObject.put(RedisquesAPI.MESSAGE, "enqueued");
                message.reply(jsonObject);
                return;
            }
            String str = "RedisQues QUEUE_ERROR: Error while enqueueing message into queue " + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
            this.log.error(str);
            jsonObject.put(RedisquesAPI.STATUS, RedisquesAPI.ERROR);
            jsonObject.put(RedisquesAPI.MESSAGE, str);
            message.reply(jsonObject);
        });
    }

    private void lockedEnqueue(Message<JsonObject> message) {
        this.log.debug("RedisQues about to lockedEnqueue");
        JsonObject extractLockInfo = extractLockInfo(((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.REQUESTED_BY));
        if (extractLockInfo != null) {
            this.redisClient.hmset(getLocksKey(), new JsonObject().put(((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), extractLockInfo.encode()), asyncResult -> {
                if (asyncResult.succeeded()) {
                    this.log.debug("RedisQues lockedEnqueue locking successful, now going to enqueue");
                    enqueue(message);
                } else {
                    this.log.warn("RedisQues lockedEnqueue locking failed. Skip enqueue");
                    message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.ERROR));
                }
            });
        } else {
            this.log.warn("RedisQues lockedEnqueue failed because property 'requestedBy' was missing");
            message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.ERROR).put(RedisquesAPI.MESSAGE, "Property 'requestedBy' missing"));
        }
    }

    private void addQueueItem(Message<JsonObject> message) {
        this.redisClient.rpush(getQueuesPrefix() + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.BUFFER), new AddQueueItemHandler(message));
    }

    private void getQueueItems(Message<JsonObject> message) {
        String str = getQueuesPrefix() + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        int maxQueueItemCountIndex = getMaxQueueItemCountIndex(((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.LIMIT));
        this.redisClient.llen(str, asyncResult -> {
            this.redisClient.lrange(str, 0L, maxQueueItemCountIndex, new GetQueueItemsHandler(message, (Long) asyncResult.result()));
        });
    }

    private void getQueueItem(Message<JsonObject> message) {
        this.redisClient.lindex(getQueuesPrefix() + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getInteger(RedisquesAPI.INDEX).intValue(), new GetQueueItemHandler(message));
    }

    private void replaceQueueItem(Message<JsonObject> message) {
        this.redisClient.lset(getQueuesPrefix() + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getInteger(RedisquesAPI.INDEX).intValue(), ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.BUFFER), new ReplaceQueueItemHandler(message));
    }

    private void deleteQueueItem(Message<JsonObject> message) {
        this.redisClient.lset(getQueuesPrefix() + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getInteger(RedisquesAPI.INDEX).intValue(), "TO_DELETE", asyncResult -> {
            if (!asyncResult.succeeded()) {
                message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.ERROR));
            } else {
                this.redisClient.lrem(getQueuesPrefix() + ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), 0L, "TO_DELETE", asyncResult -> {
                    message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK));
                });
            }
        });
    }

    private void deleteAllQueueItems(Message<JsonObject> message) {
        JsonObject jsonObject = ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD);
        boolean booleanValue = jsonObject.getBoolean(RedisquesAPI.UNLOCK, false).booleanValue();
        String string = jsonObject.getString(RedisquesAPI.QUEUENAME);
        this.redisClient.del(getQueuesPrefix() + string, asyncResult -> {
            if (booleanValue) {
                this.redisClient.hdel(getLocksKey(), string, asyncResult -> {
                    replyDeleteAllQueueItems(message, asyncResult);
                });
            } else {
                replyDeleteAllQueueItems(message, asyncResult);
            }
        });
    }

    private void replyDeleteAllQueueItems(Message<JsonObject> message, AsyncResult<Long> asyncResult) {
        if (((Long) asyncResult.result()).longValue() > 0) {
            message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK));
        } else {
            message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.ERROR));
        }
    }

    private void putLock(Message<JsonObject> message) {
        JsonObject extractLockInfo = extractLockInfo(((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.REQUESTED_BY));
        if (extractLockInfo != null) {
            this.redisClient.hmset(getLocksKey(), new JsonObject().put(((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), extractLockInfo.encode()), new PutLockHandler(message));
        } else {
            message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.ERROR).put(RedisquesAPI.MESSAGE, "Property 'requestedBy' missing"));
        }
    }

    private void deleteLock(Message<JsonObject> message) {
        String string = ((JsonObject) message.body()).getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        this.redisClient.exists(getQueuesPrefix() + string, asyncResult -> {
            if (asyncResult.succeeded() && ((Long) asyncResult.result()).longValue() == 1) {
                notifyConsumer(string);
            }
            this.redisClient.hdel(getLocksKey(), string, new DeleteLockHandler(message));
        });
    }

    private void registerQueueCheck(RedisquesConfiguration redisquesConfiguration) {
        this.vertx.setPeriodic(redisquesConfiguration.getCheckIntervalTimerMs(), l -> {
            this.luaScriptManager.handleQueueCheck(getQueueCheckLastexecKey(), this.checkInterval, bool -> {
                if (bool.booleanValue()) {
                    this.log.info("periodic queue check is triggered now");
                    checkQueues();
                }
            });
        });
    }

    private long getMaxAgeTimestamp() {
        return System.currentTimeMillis() - 120000;
    }

    private void unsupportedOperation(String str, Message<JsonObject> message) {
        JsonObject jsonObject = new JsonObject();
        String str2 = "QUEUE_ERROR: Unsupported operation received: " + str;
        this.log.error(str2);
        jsonObject.put(RedisquesAPI.STATUS, RedisquesAPI.ERROR);
        jsonObject.put(RedisquesAPI.MESSAGE, str2);
        message.reply(jsonObject);
    }

    private JsonObject extractLockInfo(String str) {
        if (str == null) {
            return null;
        }
        JsonObject jsonObject = new JsonObject();
        jsonObject.put(RedisquesAPI.REQUESTED_BY, str);
        jsonObject.put(TIMESTAMP, Long.valueOf(System.currentTimeMillis()));
        return jsonObject;
    }

    public void stop() {
        unregisterConsumers(true);
    }

    private void gracefulStop(Handler<Void> handler) {
        this.conumersMessageConsumer.unregister(asyncResult -> {
            this.uidMessageConsumer.unregister(asyncResult -> {
                unregisterConsumers(false);
                this.stoppedHandler = handler;
                if (this.myQueues.keySet().isEmpty()) {
                    handler.handle((Object) null);
                }
            });
        });
    }

    private void unregisterConsumers(boolean z) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues unregister consumers force: " + z);
        }
        this.log.debug("RedisQues Unregistering consumers");
        for (Map.Entry<String, QueueState> entry : this.myQueues.entrySet()) {
            String key = entry.getKey();
            if (z || entry.getValue() == QueueState.READY) {
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues unregister consumers queue: " + key);
                }
                refreshRegistration(key, asyncResult -> {
                    String str = getConsumersPrefix() + key;
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("RedisQues unregister consumers get: " + str);
                    }
                    this.redisClient.get(str, asyncResult -> {
                        String str2 = (String) asyncResult.result();
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("RedisQues unregister consumers get result: " + str2);
                        }
                        if (this.uid.equals(str2)) {
                            this.log.debug("RedisQues remove consumer: " + this.uid);
                            this.myQueues.remove(key);
                        }
                    });
                });
            }
        }
    }

    private void resetConsumers() {
        this.log.debug("RedisQues Resetting consumers");
        String str = getConsumersPrefix() + "*";
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues reset consumers keys: " + str);
        }
        this.redisClient.keys(str, asyncResult -> {
            if (asyncResult.failed()) {
                this.log.error("Unable to get redis keys of consumers");
                return;
            }
            List list = ((JsonArray) asyncResult.result()).getList();
            if (list == null || list.size() < 1) {
                this.log.debug("No consumers found to reset");
            } else {
                this.redisClient.delMany(list, asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        this.log.error("Unable to delete redis keys of consumers");
                    } else {
                        this.log.debug("Successfully reset " + ((Long) asyncResult.result()) + " consumers");
                    }
                });
            }
        });
    }

    private void consume(String str) {
        this.log.debug(" RedisQues Requested to consume queue " + str);
        refreshRegistration(str, asyncResult -> {
            String str2 = getConsumersPrefix() + str;
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues consume get: " + str2);
            }
            this.redisClient.get(str2, asyncResult -> {
                String str3 = (String) asyncResult.result();
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues refresh registration consumer: " + str3);
                }
                if (!this.uid.equals(str3)) {
                    this.log.warn("Registration for queue " + str + " has changed to " + str3);
                    this.myQueues.remove(str);
                    notifyConsumer(str);
                    return;
                }
                QueueState queueState = this.myQueues.get(str);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues consumer: " + str3 + " queue: " + str + " state: " + queueState);
                }
                if (queueState == QueueState.CONSUMING) {
                    this.log.debug("RedisQues Queue " + str + " is already beeing consumed");
                    return;
                }
                this.myQueues.put(str, QueueState.CONSUMING);
                if (queueState == null) {
                    this.log.warn("Received request to consume from a queue I did not know about: " + str);
                }
                this.log.debug("RedisQues Starting to consume queue " + str);
                readQueue(str);
            });
        });
    }

    private Future<Boolean> isQueueLocked(String str) {
        Future<Boolean> future = Future.future();
        this.redisClient.hexists(getLocksKey(), str, asyncResult -> {
            if (!asyncResult.failed()) {
                future.complete(Boolean.valueOf(((Long) asyncResult.result()).longValue() == 1));
            } else {
                this.log.warn("failed to check if queue '" + str + "' is locked. Message: " + asyncResult.cause().getMessage());
                future.complete(Boolean.FALSE);
            }
        });
        return future;
    }

    private void readQueue(String str) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues read queue: " + str);
        }
        String str2 = getQueuesPrefix() + str;
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues read queue lindex: " + str2);
        }
        isQueueLocked(str).setHandler(asyncResult -> {
            if (!((Boolean) asyncResult.result()).booleanValue()) {
                this.redisClient.lindex(str2, 0, asyncResult -> {
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("RedisQues read queue lindex result: " + ((String) asyncResult.result()));
                    }
                    if (asyncResult.result() != null) {
                        processMessageWithTimeout(str, (String) asyncResult.result(), sendResult -> {
                            if (sendResult.success.booleanValue()) {
                                String str3 = getQueuesPrefix() + str;
                                if (this.log.isTraceEnabled()) {
                                    this.log.trace("RedisQues read queue lpop: " + str3);
                                }
                                this.redisClient.lpop(str3, asyncResult -> {
                                    this.log.debug("RedisQues Message removed, queue " + str + " is ready again");
                                    this.myQueues.put(str, QueueState.READY);
                                    this.vertx.cancelTimer(sendResult.timeoutId.longValue());
                                    if (this.stoppedHandler != null) {
                                        unregisterConsumers(false);
                                        if (this.myQueues.isEmpty()) {
                                            this.stoppedHandler.handle((Object) null);
                                        }
                                    }
                                    String str4 = getQueuesPrefix() + str;
                                    if (this.log.isTraceEnabled()) {
                                        this.log.trace("RedisQues read queue: " + str2);
                                    }
                                    this.redisClient.llen(str4, asyncResult -> {
                                        if (((Long) asyncResult.result()).longValue() > 0) {
                                            notifyConsumer(str);
                                        }
                                    });
                                });
                                return;
                            }
                            this.log.debug("RedisQues Processing failed for queue " + str);
                            this.myQueues.put(str, QueueState.READY);
                            this.vertx.cancelTimer(sendResult.timeoutId.longValue());
                            rescheduleSendMessageAfterFailure(str);
                        });
                    } else {
                        this.log.debug("Got a request to consume from empty queue " + str);
                        this.myQueues.put(str, QueueState.READY);
                    }
                });
            } else {
                this.log.debug("Got a request to consume from locked queue " + str);
                this.myQueues.put(str, QueueState.READY);
            }
        });
    }

    private void rescheduleSendMessageAfterFailure(String str) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedsQues reschedule after failure for queue: " + str);
        }
        this.vertx.setTimer(this.refreshPeriod * 1000, l -> {
            notifyConsumer(str);
        });
    }

    private void processMessageWithTimeout(String str, String str2, Handler<SendResult> handler) {
        EventBus eventBus = this.vertx.eventBus();
        JsonObject jsonObject = new JsonObject();
        jsonObject.put("queue", str);
        jsonObject.put(RedisquesAPI.PAYLOAD, str2);
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues process message: " + jsonObject + " for queue: " + str + " send it to processor: " + this.processorAddress);
        }
        long timer = this.vertx.setTimer(this.processorTimeout, l -> {
            this.log.info("RedisQues QUEUE_ERROR: Consumer timeout " + this.uid + " queue: " + str);
            handler.handle(new SendResult(false, l));
        });
        eventBus.send(this.processorAddress, jsonObject, asyncResult -> {
            handler.handle(new SendResult(asyncResult.succeeded() ? Boolean.valueOf(RedisquesAPI.OK.equals(((JsonObject) ((Message) asyncResult.result()).body()).getString(RedisquesAPI.STATUS))) : Boolean.FALSE, Long.valueOf(timer)));
        });
        updateTimestamp(str, null);
    }

    private void notifyConsumer(String str) {
        this.log.debug("RedisQues Notifying consumer of queue " + str);
        EventBus eventBus = this.vertx.eventBus();
        String str2 = getConsumersPrefix() + str;
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues notify consumer get: " + str2);
        }
        this.redisClient.get(str2, asyncResult -> {
            String str3 = (String) asyncResult.result();
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues got consumer: " + str3);
            }
            if (str3 == null) {
                this.log.debug("RedisQues Sending registration request for queue " + str);
                eventBus.send(this.address + "-consumers", str);
            } else {
                this.log.debug("RedisQues Notifying consumer " + str3 + " to consume queue " + str);
                eventBus.send(str3, str);
            }
        });
    }

    private void refreshRegistration(String str, Handler<AsyncResult<Long>> handler) {
        this.log.debug("RedisQues Refreshing registration of queue " + str + ", expire at " + (2 * this.refreshPeriod));
        String str2 = getConsumersPrefix() + str;
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues refresh registration: " + str2);
        }
        if (handler != null) {
            this.redisClient.expire(str2, 2 * this.refreshPeriod, handler);
        } else {
            this.redisClient.expire(str2, 2 * this.refreshPeriod, asyncResult -> {
            });
        }
    }

    private void updateTimestamp(String str, Handler<AsyncResult<Long>> handler) {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues update timestamp for queue: " + str + " to: " + currentTimeMillis);
        }
        if (handler != null) {
            this.redisClient.zadd(getQueuesKey(), currentTimeMillis, str, handler);
        } else {
            this.redisClient.zadd(getQueuesKey(), currentTimeMillis, str, asyncResult -> {
            });
        }
    }

    private void checkQueues() {
        this.log.debug("Checking queues timestamps");
        long currentTimeMillis = System.currentTimeMillis() - ((3 * this.refreshPeriod) * 1000);
        this.redisClient.zrangebyscore(getQueuesKey(), "-inf", String.valueOf(currentTimeMillis), RangeLimitOptions.NONE, asyncResult -> {
            JsonArray jsonArray = (JsonArray) asyncResult.result();
            AtomicInteger atomicInteger = new AtomicInteger(jsonArray.size());
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues update queues: " + atomicInteger);
            }
            Iterator it = jsonArray.iterator();
            while (it.hasNext()) {
                String str = (String) it.next();
                String str2 = getQueuesPrefix() + str;
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues update queue: " + str2);
                }
                this.redisClient.exists(str2, asyncResult -> {
                    if (((Long) asyncResult.result()).longValue() == 1) {
                        this.log.debug("Updating queue timestamp " + str);
                        updateTimestamp(str, asyncResult -> {
                            if (atomicInteger.decrementAndGet() == 0) {
                                removeOldQueues(currentTimeMillis);
                            }
                        });
                        refreshRegistration(str, null);
                        notifyConsumer(str);
                        return;
                    }
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("RedisQues remove old queue: " + str);
                    }
                    if (atomicInteger.decrementAndGet() == 0) {
                        removeOldQueues(currentTimeMillis);
                    }
                });
            }
        });
    }

    private void removeOldQueues(long j) {
        this.log.debug("Cleaning old queues");
        this.redisClient.zremrangebyscore(getQueuesKey(), "-inf", String.valueOf(j), asyncResult -> {
        });
    }

    private int getMaxQueueItemCountIndex(String str) {
        int i = DEFAULT_MAX_QUEUEITEM_COUNT;
        if (str != null) {
            try {
                int parseInt = Integer.parseInt(str) - 1;
                if (parseInt >= 0) {
                    i = parseInt;
                }
                this.log.info("use limit parameter " + parseInt);
            } catch (NumberFormatException e) {
                this.log.warn("Invalid limit parameter '" + str + "' configured for max queue item count. Using default " + DEFAULT_MAX_QUEUEITEM_COUNT);
            }
        }
        return i;
    }
}
