package org.swisspush.redisques;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.impl.RedisClient;
import io.vertx.redis.client.impl.types.MultiType;
import io.vertx.redis.client.impl.types.SimpleStringType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.handler.AddQueueItemHandler;
import org.swisspush.redisques.handler.DeleteLockHandler;
import org.swisspush.redisques.handler.GetAllLocksHandler;
import org.swisspush.redisques.handler.GetLockHandler;
import org.swisspush.redisques.handler.GetQueueItemHandler;
import org.swisspush.redisques.handler.GetQueueItemsCountHandler;
import org.swisspush.redisques.handler.GetQueueItemsHandler;
import org.swisspush.redisques.handler.GetQueuesCountHandler;
import org.swisspush.redisques.handler.GetQueuesHandler;
import org.swisspush.redisques.handler.GetQueuesItemsCountHandler;
import org.swisspush.redisques.handler.GetQueuesSpeedHandler;
import org.swisspush.redisques.handler.GetQueuesStatisticsHandler;
import org.swisspush.redisques.handler.PutLockHandler;
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.handler.ReplaceQueueItemHandler;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.MessageUtil;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisQuesTimer;
import org.swisspush.redisques.util.RedisUtils;
import org.swisspush.redisques.util.RedisquesAPI;
import org.swisspush.redisques.util.RedisquesConfiguration;
import org.swisspush.redisques.util.Result;

/* loaded from: input_file:org/swisspush/redisques/RedisQues.class */
public class RedisQues extends AbstractVerticle {
    // Event bus consumer listening on this instance's own uid address (registered in initialize()).
    private MessageConsumer<String> uidMessageConsumer;
    // Statistics helper, created in initialize() once the Redis API is available.
    private QueueStatisticsCollector queueStatisticsCollector;
    // Event bus consumer listening on "<address>-consumers" for registration requests.
    private MessageConsumer<String> consumersMessageConsumer;
    // NOTE(review): never assigned in the visible part of this class — confirm whether it is still needed.
    private RedisClient redisClient;
    // Redis command API; assigned in start() after the connection has been established.
    private RedisAPI redisAPI;
    // Redis key names. All of them are derived from redisPrefix in start():
    //   queuesKey             = redisPrefix + "queues"
    //   queuesPrefix          = redisPrefix + "queues:"
    //   consumersPrefix       = redisPrefix + "consumers:"
    //   locksKey              = redisPrefix + "locks"
    //   queueCheckLastexecKey = redisPrefix + "check:lastexec"
    private String redisPrefix;
    private String queuesKey;
    private String queuesPrefix;
    private String consumersPrefix;
    private String locksKey;
    private String queueCheckLastexecKey;
    public static final String TIMESTAMP = "timestamp";
    // Refresh period taken from the configuration; multiplied by 1000 when used as a periodic timer delay.
    private int refreshPeriod;
    // Always 2 * refreshPeriod (see start()); passed as the EX value when registering as consumer.
    private int consumerLockTime;
    private int checkInterval;
    private long processorDelayMax;
    private RedisQuesTimer timer;
    // Redis connection settings, copied from the configuration in start().
    private String redisHost;
    private int redisPort;
    private String redisAuth;
    private String redisEncoding;
    private int redisMaxPoolSize;
    private int redisMaxWaitSize;
    // HTTP request handler settings, copied from the configuration in start().
    private boolean httpRequestHandlerEnabled;
    private String httpRequestHandlerPrefix;
    private int httpRequestHandlerPort;
    private String httpRequestHandlerUserHeader;
    // Per-queue configurations, copied from the configuration in start().
    private List<QueueConfiguration> queueConfigurations;
    // NOTE(review): presumably the default upper index used when listing queue items — confirm against getMaxQueueItemCountIndex().
    private static final int DEFAULT_MAX_QUEUEITEM_COUNT = 49;
    // NOTE(review): presumably the age limit behind getMaxAgeTimestamp() — confirm.
    private static final int MAX_AGE_MILLISECONDS = 120000;
    // Set containing the single entry "processorDelayMax".
    private static final Set<String> ALLOWED_CONFIGURATION_VALUES = (Set) Stream.of("processorDelayMax").collect(Collectors.toSet());
    // Created in initialize() from the Redis API.
    private LuaScriptManager luaScriptManager;
    // Random identifier of this instance; also used as event bus address and as consumer registration value.
    private final String uid = UUID.randomUUID().toString();
    // Queues this instance is registered for, keyed by queue name.
    private final Map<String, QueueState> myQueues = new HashMap();
    private final Logger log = LoggerFactory.getLogger(RedisQues.class);
    private Handler<Void> stoppedHandler = null;
    // The following four values are defaults; start() overwrites them with the configured values.
    private String address = "redisques";
    private String configurationUpdatedAddress = "redisques-configuration-updated";
    private String processorAddress = "redisques-processor";
    private int processorTimeout = 240000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/swisspush/redisques/RedisQues$QueueState.class */
    /** State of a queue this instance is registered for, as stored in {@code myQueues}. */
    public enum QueueState {
        /** State stored right after this instance successfully registered as consumer of the queue. */
        READY,
        /** Only queues in this state are considered by the periodic registration refresh. */
        CONSUMING
    }

    /**
     * Sends a Redis {@code SET} command with an {@code EX} expiry and, optionally, the {@code NX} flag.
     *
     * @param str     the key to set
     * @param str2    the value to store
     * @param z       when {@code true} the {@code NX} option is appended
     * @param i       the value passed along with the {@code EX} option
     * @param handler receives the outcome of the command
     */
    private void redisSetWithOptions(String str, String str2, boolean z, int i, Handler<AsyncResult<Response>> handler) {
        final JsonArray options = new JsonArray().add("EX").add(Integer.valueOf(i));
        if (z) {
            options.add("NX");
        }
        final String[] arguments = (String[]) RedisUtils.toPayload(str, str2, options).toArray(new String[0]);
        redisAPI.send(Command.SET, arguments).onComplete(handler);
    }

    /**
     * Handles a registration request for a queue: tries to store this instance's uid under the
     * queue's consumer key (SET with NX and expiry). When Redis answers "OK" the queue is marked
     * as {@link QueueState#READY} and consumption is started.
     *
     * @param message event bus message whose body is the queue name
     */
    private void handleRegistrationRequest(Message<String> message) {
        final String queueName = (String) message.body();
        if (queueName == null) {
            // Only a warning: processing deliberately continues with the null name.
            log.warn("Got message without queue name while handleRegistrationRequest.");
        }
        if (log.isDebugEnabled()) {
            log.debug("RedisQues Got registration request for queue {} from consumer: {}", queueName, uid);
        }
        final String consumerKey = consumersPrefix + queueName;
        redisSetWithOptions(consumerKey, uid, true, consumerLockTime, setReply -> {
            if (setReply.succeeded()) {
                String value = null;
                if (setReply.result() != null) {
                    value = setReply.result().toString();
                }
                if (log.isTraceEnabled()) {
                    log.trace("RedisQues setxn result: " + value + " for queue: " + queueName);
                }
                if ("OK".equals(value)) {
                    if (log.isDebugEnabled()) {
                        log.debug("RedisQues Now registered for queue " + queueName);
                    }
                    myQueues.put(queueName, QueueState.READY);
                    consume(queueName);
                } else {
                    // Someone else holds the registration already.
                    log.debug("RedisQues Missed registration for queue " + queueName);
                }
            } else {
                log.error("redisSetWithOptions failed", setReply.cause());
            }
        });
    }

    /**
     * Reads the verticle configuration into the instance fields, connects to Redis and, once the
     * connection is available, registers all consumers and timers via {@code initialize}.
     *
     * @param promise completed when initialization is done, failed when the Redis connection fails
     */
    public void start(Promise<Void> promise) {
        vertx.eventBus();
        log.info("Started with UID " + uid);
        final RedisquesConfiguration configuration = RedisquesConfiguration.fromJsonObject(config());
        log.info("Starting Redisques module with configuration: " + configuration);

        // Event bus addresses.
        address = configuration.getAddress();
        configurationUpdatedAddress = configuration.getConfigurationUpdatedAddress();

        // Redis key layout, everything hangs below the configured prefix.
        redisPrefix = configuration.getRedisPrefix();
        queuesKey = redisPrefix + "queues";
        queuesPrefix = redisPrefix + "queues:";
        consumersPrefix = redisPrefix + "consumers:";
        locksKey = redisPrefix + "locks";
        queueCheckLastexecKey = redisPrefix + "check:lastexec";

        // Processing and timing parameters.
        processorAddress = configuration.getProcessorAddress();
        refreshPeriod = configuration.getRefreshPeriod();
        consumerLockTime = 2 * refreshPeriod;
        checkInterval = configuration.getCheckInterval();
        processorTimeout = configuration.getProcessorTimeout();
        processorDelayMax = configuration.getProcessorDelayMax();
        timer = new RedisQuesTimer(vertx);

        // Redis connection settings.
        redisHost = configuration.getRedisHost();
        redisPort = configuration.getRedisPort();
        redisAuth = configuration.getRedisAuth();
        redisEncoding = configuration.getRedisEncoding();
        redisMaxPoolSize = configuration.getMaxPoolSize();
        redisMaxWaitSize = configuration.getMaxWaitSize();

        // HTTP request handler settings.
        httpRequestHandlerEnabled = configuration.getHttpRequestHandlerEnabled();
        httpRequestHandlerPrefix = configuration.getHttpRequestHandlerPrefix();
        httpRequestHandlerPort = configuration.getHttpRequestHandlerPort().intValue();
        httpRequestHandlerUserHeader = configuration.getHttpRequestHandlerUserHeader();

        queueConfigurations = configuration.getQueueConfigurations();

        final Future<RedisAPI> connecting =
                setupRedisAPI(redisHost, Integer.valueOf(redisPort), redisAuth, redisMaxPoolSize, redisMaxWaitSize);
        connecting.onComplete(connected -> {
            if (connected.succeeded()) {
                redisAPI = (RedisAPI) connected.result();
                initialize(configuration);
                promise.complete();
            } else {
                promise.fail(connected.cause());
            }
        });
    }

    /**
     * Creates the helper objects that need a Redis connection and registers the event bus
     * consumers (configuration updates, operations, consumer registration, per-instance
     * notifications) as well as the periodic refresh and queue check.
     *
     * @param redisquesConfiguration the configuration this verticle was started with
     */
    private void initialize(RedisquesConfiguration redisquesConfiguration) {
        luaScriptManager = new LuaScriptManager(redisAPI);
        queueStatisticsCollector = new QueueStatisticsCollector(
                redisAPI, luaScriptManager, queuesPrefix, vertx, redisquesConfiguration.getQueueSpeedIntervalSec());
        RedisquesHttpRequestHandler.init(vertx, redisquesConfiguration);

        // Configuration updates broadcast by other instances.
        vertx.eventBus().consumer(configurationUpdatedAddress, update -> {
            log.info("Received configurations update");
            setConfigurationValues((JsonObject) update.body(), false);
        });

        // Main API address.
        vertx.eventBus().consumer(address, operationsHandler());

        // Requests asking for a consumer to register for a queue.
        consumersMessageConsumer = vertx.eventBus().consumer(address + "-consumers", this::handleRegistrationRequest);

        // Notifications addressed to this very instance.
        uidMessageConsumer = vertx.eventBus().consumer(uid, notification -> {
            final String queueName = (String) notification.body();
            if (queueName == null) {
                // Only a warning: consume() is still invoked with the null name.
                log.warn("Got event bus msg with empty body! uid={}  address={}  replyAddress={}",
                        uid, notification.address(), notification.replyAddress());
            }
            log.debug("RedisQues got notification for queue '{}'", queueName);
            consume(queueName);
        });

        registerActiveQueueRegistrationRefresh();
        registerQueueCheck(redisquesConfiguration);
    }

    /**
     * Creates a Redis client for the given connection settings and connects it.
     *
     * @param str  Redis host
     * @param num  Redis port
     * @param str2 Redis password; {@code null} is replaced by the empty string
     * @param i    maximum pool size
     * @param i2   maximum number of waiting requests for the pool
     * @return a future completed with the {@link RedisAPI} of the established connection, or failed
     *         with the connection error
     */
    private Future<RedisAPI> setupRedisAPI(String str, Integer num, String str2, int i, int i2) {
        final Promise<RedisAPI> apiPromise = Promise.promise();

        String password = str2;
        if (password == null) {
            password = "";
        }
        final RedisOptions options = new RedisOptions()
                .setConnectionString("redis://" + str + ":" + num)
                .setPassword(password)
                .setMaxPoolSize(i)
                .setMaxPoolWaiting(i2);

        Redis.createClient(vertx, options).connect(connected -> {
            if (connected.failed()) {
                apiPromise.fail(connected.cause());
                return;
            }
            apiPromise.complete(RedisAPI.api((RedisConnection) connected.result()));
        });
        return apiPromise.future();
    }

    /**
     * Installs a periodic timer (every {@code refreshPeriod * 1000} ms) that walks over all queues
     * in state {@link QueueState#CONSUMING}. For each of them the consumer key is read from Redis:
     * if it still holds this instance's uid, registration and timestamp are refreshed; otherwise
     * the queue is dropped from {@code myQueues} and its failure statistics are reset.
     */
    private void registerActiveQueueRegistrationRefresh() {
        vertx.setPeriodic(refreshPeriod * 1000, timerId -> {
            for (Map.Entry<String, QueueState> registration : myQueues.entrySet()) {
                if (registration.getValue() != QueueState.CONSUMING) {
                    continue;
                }
                final String queueName = registration.getKey();
                final String consumerKey = consumersPrefix + queueName;
                if (log.isTraceEnabled()) {
                    log.trace("RedisQues refresh queues get: " + consumerKey);
                }
                redisAPI.get(consumerKey, getReply -> {
                    if (getReply.failed()) {
                        // Not fatal: the comparison below simply sees an empty owner.
                        log.warn("Failed to get queue consumer for queue '{}'. But we'll continue anyway :)", queueName, getReply.cause());
                    }
                    final String owner = Objects.toString(getReply.result(), "");
                    if (!uid.equals(owner)) {
                        log.debug("RedisQues Removing queue " + queueName + " from the list");
                        myQueues.remove(queueName);
                        queueStatisticsCollector.resetQueueFailureStatistics(queueName);
                        return;
                    }
                    log.debug("RedisQues Periodic consumer refresh for active queue " + queueName);
                    refreshRegistration(queueName, null);
                    updateTimestamp(queueName, null);
                });
            }
        });
    }

    /**
     * Builds the handler for the main API address. The handler reads the operation name from the
     * message body and dispatches to the method implementing it. Unknown operation names are
     * answered through {@code unsupportedOperation}.
     *
     * <p>The {@code stop} operation now actually sends the OK reply once the graceful stop has
     * finished; before, the reply object was built but never sent.
     *
     * @return the dispatching event bus handler
     */
    private Handler<Message<JsonObject>> operationsHandler() {
        return message -> {
            JsonObject jsonObject = (JsonObject) message.body();
            if (null == jsonObject) {
                // Kept as is: the warning announces the NullPointerException raised right below.
                this.log.warn("Got msg with empty body from event bus. We'll run directly in a NullPointerException now. address={}  replyAddress={} ", message.address(), message.replyAddress());
            }
            String string = jsonObject.getString(RedisquesAPI.OPERATION);
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues got operation:" + string);
            }
            RedisquesAPI.QueueOperation fromString = RedisquesAPI.QueueOperation.fromString(string);
            if (fromString == null) {
                unsupportedOperation(string, message);
                return;
            }
            switch (fromString) {
                case enqueue:
                    enqueue(message);
                    return;
                case lockedEnqueue:
                    lockedEnqueue(message);
                    return;
                case getQueueItems:
                    getQueueItems(message);
                    return;
                case addQueueItem:
                    addQueueItem(message);
                    return;
                case deleteQueueItem:
                    deleteQueueItem(message);
                    return;
                case getQueueItem:
                    getQueueItem(message);
                    return;
                case replaceQueueItem:
                    replaceQueueItem(message);
                    return;
                case deleteAllQueueItems:
                    deleteAllQueueItems(message);
                    return;
                case bulkDeleteQueues:
                    bulkDeleteQueues(message);
                    return;
                case getAllLocks:
                    getAllLocks(message);
                    return;
                case putLock:
                    putLock(message);
                    return;
                case bulkPutLocks:
                    bulkPutLocks(message);
                    return;
                case getLock:
                    // Looks up the lock entry of the requested queue in the locks hash.
                    this.redisAPI.hget(this.locksKey, jsonObject.getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME), new GetLockHandler(message));
                    return;
                case deleteLock:
                    deleteLock(message);
                    return;
                case bulkDeleteLocks:
                    bulkDeleteLocks(message);
                    return;
                case deleteAllLocks:
                    deleteAllLocks(message);
                    return;
                case getQueueItemsCount:
                    getQueueItemsCount(message);
                    return;
                case getQueuesItemsCount:
                    getQueuesItemsCount(message);
                    return;
                case getQueuesCount:
                    getQueuesCount(message);
                    return;
                case getQueues:
                    getQueues(message, false);
                    return;
                case check:
                    checkQueues();
                    return;
                case reset:
                    resetConsumers();
                    return;
                case stop:
                    // Send the OK status back to the requester once stopping has completed.
                    gracefulStop(stopped -> message.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK)));
                    return;
                case getConfiguration:
                    getConfiguration(message);
                    return;
                case setConfiguration:
                    setConfiguration(message);
                    return;
                case getQueuesStatistics:
                    getQueuesStatistics(message);
                    return;
                case getQueuesSpeed:
                    getQueuesSpeed(message);
                    return;
                default:
                    unsupportedOperation(string, message);
                    return;
            }
        };
    }

    /**
     * Appends the message to the end of the requested queue, notifies the consumer and replies to
     * the sender. When the queue has a configuration with a positive enqueue delay factor, the
     * reply is delayed by {@code (queueLength - 1) * factor} milliseconds, capped by the configured
     * maximum delay if that is positive. The applied delay is reported to the statistics collector
     * as back pressure time.
     *
     * @param message the enqueue request; payload carries the queue name, the body the message
     */
    private void enqueue(Message<JsonObject> message) {
        final String queueName = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        updateTimestamp(queueName, null);
        final String queueKey = this.queuesPrefix + queueName;
        final String item = message.body().getString(RedisquesAPI.MESSAGE);
        this.redisAPI.rpush(Arrays.asList(queueKey, item), pushReply -> {
            final JsonObject reply = new JsonObject();
            if (!pushReply.succeeded()) {
                String errorText = "RedisQues QUEUE_ERROR: Error while enqueueing message into queue " + queueName;
                this.log.error(errorText, pushReply.cause());
                reply.put(RedisquesAPI.STATUS, RedisquesAPI.ERROR);
                reply.put(RedisquesAPI.MESSAGE, errorText);
                message.reply(reply);
                return;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("RedisQues Enqueued message into queue " + queueName);
            }
            // RPUSH answers with the length of the list after the push.
            final long queueLength = pushReply.result().toLong().longValue();
            notifyConsumer(queueName);
            reply.put(RedisquesAPI.STATUS, RedisquesAPI.OK);
            reply.put(RedisquesAPI.MESSAGE, "enqueued");

            long delayMillis = 0;
            final QueueConfiguration queueConfiguration = findQueueConfiguration(queueName);
            if (queueConfiguration != null) {
                final float delayFactorMillis = queueConfiguration.getEnqueueDelayFactorMillis();
                if (delayFactorMillis > 0.0f) {
                    // The float product must be narrowed explicitly; an implicit float -> long
                    // assignment is not valid Java.
                    delayMillis = (long) (((float) (queueLength - 1)) * delayFactorMillis);
                    final int maxDelayMillis = queueConfiguration.getEnqueueMaxDelayMillis();
                    if (maxDelayMillis > 0 && delayMillis > maxDelayMillis) {
                        delayMillis = maxDelayMillis;
                    }
                }
            }
            if (delayMillis > 0) {
                this.vertx.setTimer(delayMillis, timerId -> message.reply(reply));
            } else {
                message.reply(reply);
            }
            this.queueStatisticsCollector.setQueueBackPressureTime(queueName, delayMillis);
        });
    }

    /**
     * Stores a lock for the requested queue and, if that succeeds, enqueues the message.
     * Replies with an error when the payload has no usable 'requestedBy' value or when writing
     * the lock fails.
     *
     * @param message the lockedEnqueue request
     */
    private void lockedEnqueue(Message<JsonObject> message) {
        log.debug("RedisQues about to lockedEnqueue");
        final String requestedBy = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.REQUESTED_BY);
        final JsonObject lockInfo = extractLockInfo(requestedBy);
        if (lockInfo == null) {
            log.warn("RedisQues lockedEnqueue failed because property 'requestedBy' was missing");
            message.reply(createErrorReply().put(RedisquesAPI.MESSAGE, "Property 'requestedBy' missing"));
            return;
        }
        final String queueName = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        redisAPI.hmset(Arrays.asList(locksKey, queueName, lockInfo.encode()), lockReply -> {
            if (!lockReply.succeeded()) {
                log.warn("RedisQues lockedEnqueue locking failed. Skip enqueue");
                message.reply(createErrorReply());
                return;
            }
            log.debug("RedisQues lockedEnqueue locking successful, now going to enqueue");
            enqueue(message);
        });
    }

    /**
     * Appends the payload's buffer value to the end of the requested queue. The reply is produced
     * by {@link AddQueueItemHandler}.
     *
     * @param message the addQueueItem request
     */
    private void addQueueItem(Message<JsonObject> message) {
        final String queueKey = queuesPrefix + message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        final String item = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.BUFFER);
        final List<String> pushArguments = Arrays.asList(queueKey, item);
        redisAPI.rpush(pushArguments, new AddQueueItemHandler(message));
    }

    /**
     * Reads the items of the requested queue, from index 0 up to the index derived from the
     * payload's limit. The total queue length is determined first and handed to
     * {@link GetQueueItemsHandler} together with the request.
     *
     * <p>If the length cannot be determined, a warning is logged and no reply is sent (unchanged
     * behaviour). The result is only dereferenced after the call is known to have succeeded, so a
     * failed {@code llen} no longer raises a NullPointerException inside the handler.
     *
     * @param message the getQueueItems request
     */
    private void getQueueItems(Message<JsonObject> message) {
        final String queueKey = this.queuesPrefix + message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        final int maxQueueItemCountIndex = getMaxQueueItemCountIndex(message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.LIMIT));
        this.redisAPI.llen(queueKey, countReply -> {
            // result() is null for a failed call, so check the outcome before touching it.
            final Response countResponse = countReply.succeeded() ? countReply.result() : null;
            final Long queueItemCount = countResponse == null ? null : countResponse.toLong();
            if (queueItemCount == null) {
                this.log.warn("Operation getQueueItems failed. But I'll not notify my caller :)", countReply.cause());
                return;
            }
            this.redisAPI.lrange(queueKey, "0", String.valueOf(maxQueueItemCountIndex), new GetQueueItemsHandler(message, queueItemCount));
        });
    }

    /**
     * Convenience overload: extracts the filter pattern from the message and delegates to the
     * three-argument variant.
     *
     * @param message the request
     * @param z       forwarded unchanged to the three-argument variant
     */
    private void getQueues(Message<JsonObject> message, boolean z) {
        final Result<Optional<Pattern>, String> filterPattern = MessageUtil.extractFilterPattern(message);
        getQueues(message, z, filterPattern);
    }

    /**
     * Queries the queues sorted set for all entries with a score between the max-age timestamp and
     * {@code +inf}; the reply is produced by {@link GetQueuesHandler}. An erroneous filter result
     * is answered with a BAD_INPUT error reply.
     *
     * @param message the request
     * @param z       forwarded unchanged to {@link GetQueuesHandler}
     * @param result  outcome of the filter pattern extraction
     */
    private void getQueues(Message<JsonObject> message, boolean z, Result<Optional<Pattern>, String> result) {
        if (result.isErr()) {
            final JsonObject errorReply = createErrorReply();
            errorReply.put(RedisquesAPI.ERROR_TYPE, RedisquesAPI.BAD_INPUT).put(RedisquesAPI.MESSAGE, result.getErr());
            message.reply(errorReply);
            return;
        }
        final List<String> rangeArguments = Arrays.asList(queuesKey, String.valueOf(getMaxAgeTimestamp()), "+inf");
        redisAPI.zrangebyscore(rangeArguments, new GetQueuesHandler(message, result.getOk(), z));
    }

    /**
     * Counts the queues. With a filter pattern present the work is delegated to
     * {@code getQueues(message, true, filter)}; without one the sorted set is counted directly
     * between the max-age timestamp and {@code Double.MAX_VALUE}. An invalid filter yields a
     * BAD_INPUT error reply.
     *
     * @param message the getQueuesCount request
     */
    private void getQueuesCount(Message<JsonObject> message) {
        final Result<Optional<Pattern>, String> filter = MessageUtil.extractFilterPattern(message);
        if (filter.isErr()) {
            message.reply(createErrorReply().put(RedisquesAPI.ERROR_TYPE, RedisquesAPI.BAD_INPUT).put(RedisquesAPI.MESSAGE, filter.getErr()));
            return;
        }
        if (!filter.getOk().isPresent()) {
            final String minScore = String.valueOf(getMaxAgeTimestamp());
            final String maxScore = String.valueOf(Double.MAX_VALUE);
            redisAPI.zcount(queuesKey, minScore, maxScore, new GetQueuesCountHandler(message));
            return;
        }
        getQueues(message, true, filter);
    }

    /**
     * Reads the item at the payload's index from the requested queue. The reply is produced by
     * {@link GetQueueItemHandler}.
     *
     * @param message the getQueueItem request
     */
    private void getQueueItem(Message<JsonObject> message) {
        final String queueKey = queuesPrefix + message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        final int itemIndex = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getInteger(RedisquesAPI.INDEX).intValue();
        redisAPI.lindex(queueKey, String.valueOf(itemIndex), new GetQueueItemHandler(message));
    }

    /**
     * Overwrites the item at the payload's index in the requested queue with the payload's buffer
     * value. The reply is produced by {@link ReplaceQueueItemHandler}.
     *
     * @param message the replaceQueueItem request
     */
    private void replaceQueueItem(Message<JsonObject> message) {
        final String queueKey = queuesPrefix + message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        final String itemIndex = String.valueOf(message.body().getJsonObject(RedisquesAPI.PAYLOAD).getInteger(RedisquesAPI.INDEX).intValue());
        final String replacement = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.BUFFER);
        redisAPI.lset(queueKey, itemIndex, replacement, new ReplaceQueueItemHandler(message));
    }

    /**
     * Deletes the item at the payload's index from the requested queue. This is done in two
     * steps: the item is first overwritten with the marker value "TO_DELETE", then all list
     * entries equal to that marker are removed.
     *
     * <p>A failing first step yields an error reply. A failing second step is only logged and the
     * request is still answered with OK.
     *
     * <p>The two nested callbacks use distinct parameter names; a lambda parameter must not
     * redeclare a variable of the enclosing lambda.
     *
     * @param message the deleteQueueItem request
     */
    private void deleteQueueItem(Message<JsonObject> message) {
        final String queueKey = this.queuesPrefix + message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        final String itemIndex = String.valueOf(message.body().getJsonObject(RedisquesAPI.PAYLOAD).getInteger(RedisquesAPI.INDEX).intValue());
        this.redisAPI.lset(queueKey, itemIndex, "TO_DELETE", markReply -> {
            if (markReply.succeeded()) {
                final String removeKey = this.queuesPrefix + message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
                this.redisAPI.lrem(removeKey, "0", "TO_DELETE", removeReply -> {
                    if (removeReply.failed()) {
                        this.log.warn("Redis 'lrem' command failed. But will continue anyway.", removeReply.cause());
                    }
                    message.reply(createOkReply());
                });
            } else {
                this.log.error("Failed to 'lset' while deleteQueueItem.", markReply.cause());
                message.reply(createErrorReply());
            }
        });
    }

    /**
     * Deletes the whole queue named in the payload, resets its failure statistics and, when the
     * payload's unlock flag is set, also removes the queue's lock.
     *
     * <p>The reply always reflects the outcome of the queue deletion. A failing unlock is only
     * logged ("Will continue anyway") and does not change the reply.
     *
     * <p>The two nested callbacks use distinct parameter names; a lambda parameter must not
     * redeclare a variable of the enclosing lambda, and the distinct names make explicit which
     * result is reported.
     *
     * @param message the deleteAllQueueItems request
     */
    private void deleteAllQueueItems(Message<JsonObject> message) {
        final JsonObject payload = message.body().getJsonObject(RedisquesAPI.PAYLOAD);
        final boolean unlock = payload.getBoolean(RedisquesAPI.UNLOCK, false).booleanValue();
        final String queueName = payload.getString(RedisquesAPI.QUEUENAME);
        this.redisAPI.del(Collections.singletonList(buildQueueKey(queueName)), deleteReply -> {
            if (deleteReply.failed()) {
                // Not aborting here, so that statistics are reset and the unlock below still happens.
                this.log.warn("Failed to deleteAllQueueItems. But we'll continue anyway", deleteReply.cause());
            }
            this.queueStatisticsCollector.resetQueueFailureStatistics(queueName);
            if (unlock) {
                this.redisAPI.hdel(Arrays.asList(this.locksKey, queueName), unlockReply -> {
                    if (unlockReply.failed()) {
                        this.log.warn("Failed to unlock queue '{}'. Will continue anyway", queueName, unlockReply.cause());
                    }
                    handleDeleteQueueReply(message, deleteReply);
                });
            } else {
                handleDeleteQueueReply(message, deleteReply);
            }
        });
    }

    /**
     * Records the outcome of a queue message delivery in the statistics collector and determines
     * the retry interval.
     *
     * @param str the queue name
     * @param z   {@code true} when the delivery succeeded
     * @return 0 on success; on failure the configured retry interval for the current failure count
     *         (the last configured interval once the count exceeds the number of intervals), or
     *         {@code refreshPeriod} when the queue has no retry intervals configured
     */
    int updateQueueFailureCountAndGetRetryInterval(String str, boolean z) {
        if (z) {
            queueStatisticsCollector.queueMessageSuccess(str);
            return 0;
        }

        final long failureCount = queueStatisticsCollector.queueMessageFailed(str);
        final QueueConfiguration queueConfiguration = findQueueConfiguration(str);
        if (queueConfiguration == null) {
            return refreshPeriod;
        }
        final int[] intervals = queueConfiguration.getRetryIntervals();
        if (intervals == null || intervals.length == 0) {
            return refreshPeriod;
        }

        // The n-th failure selects the n-th interval; later failures stick to the last one.
        final int intervalIndex;
        if (failureCount > (long) intervals.length) {
            intervalIndex = intervals.length - 1;
        } else {
            intervalIndex = (int) (failureCount - 1);
        }
        final int retryInterval = intervals[intervalIndex];
        queueStatisticsCollector.setQueueSlowDownTime(str, retryInterval);
        return retryInterval;
    }

    /**
     * Builds the Redis key of a queue by prepending the queues prefix to the queue name.
     *
     * @param str the queue name
     * @return the queue's Redis key
     */
    private String buildQueueKey(String str) {
        final String queueKey = queuesPrefix + str;
        return queueKey;
    }

    /**
     * Maps every queue name of the given array to its Redis key, preserving the order.
     *
     * @param jsonArray the queue names; may be {@code null}
     * @return the corresponding Redis keys, or {@code null} when the input is {@code null}
     */
    private List<String> buildQueueKeys(JsonArray jsonArray) {
        if (jsonArray == null) {
            return null;
        }
        final int count = jsonArray.size();
        final List<String> queueKeys = new ArrayList<>(count);
        int index = 0;
        while (index < count) {
            final String queueName = jsonArray.getString(index);
            queueKeys.add(buildQueueKey(queueName));
            index++;
        }
        return queueKeys;
    }

    /**
     * Deletes all queues listed in the payload and resets their statistics.
     * <ul>
     *   <li>no queues array: error reply</li>
     *   <li>empty array: OK reply with value 0</li>
     *   <li>array with non-string entries: BAD_INPUT error reply</li>
     *   <li>otherwise: OK reply carrying the numeric result of the delete command, or an error
     *       reply when the command failed</li>
     * </ul>
     *
     * @param message the bulkDeleteQueues request
     */
    private void bulkDeleteQueues(Message<JsonObject> message) {
        final JsonArray queues = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getJsonArray(RedisquesAPI.QUEUES);
        if (queues == null) {
            message.reply(createErrorReply().put(RedisquesAPI.MESSAGE, "No queues to delete provided"));
            return;
        }
        if (queues.isEmpty()) {
            message.reply(createOkReply().put(RedisquesAPI.VALUE, 0));
            return;
        }
        if (!jsonArrayContainsStringsOnly(queues)) {
            message.reply(createErrorReply().put(RedisquesAPI.ERROR_TYPE, RedisquesAPI.BAD_INPUT).put(RedisquesAPI.MESSAGE, "Queues must be string values"));
            return;
        }
        redisAPI.del(buildQueueKeys(queues), deleteReply -> {
            // Statistics are reset regardless of the delete outcome.
            queueStatisticsCollector.resetQueueStatistics(queues);
            if (!deleteReply.succeeded()) {
                log.error("Failed to bulkDeleteQueues", deleteReply.cause());
                message.reply(createErrorReply());
                return;
            }
            message.reply(createOkReply().put(RedisquesAPI.VALUE, deleteReply.result().toLong()));
        });
    }

    /**
     * Answers a delete request based on the given Redis result: an OK reply carrying the numeric
     * result on success, otherwise the failure is logged and an error reply is sent.
     *
     * @param message     the request to answer
     * @param asyncResult the Redis result to report
     */
    private void handleDeleteQueueReply(Message<JsonObject> message, AsyncResult<Response> asyncResult) {
        if (!asyncResult.succeeded()) {
            log.error("Failed to replyResultGreaterThanZero", asyncResult.cause());
            message.reply(createErrorReply());
            return;
        }
        final Long deletedCount = asyncResult.result().toLong();
        message.reply(createOkReply().put(RedisquesAPI.VALUE, deletedCount));
    }

    /**
     * Lists the fields of the locks hash; the reply is produced by {@link GetAllLocksHandler} using
     * the filter pattern from the request. An invalid filter yields a BAD_INPUT error reply.
     *
     * @param message the getAllLocks request
     */
    private void getAllLocks(Message<JsonObject> message) {
        final Result<Optional<Pattern>, String> filter = MessageUtil.extractFilterPattern(message);
        if (!filter.isOk()) {
            message.reply(createErrorReply().put(RedisquesAPI.ERROR_TYPE, RedisquesAPI.BAD_INPUT).put(RedisquesAPI.MESSAGE, filter.getErr()));
            return;
        }
        redisAPI.hkeys(locksKey, new GetAllLocksHandler(message, filter.getOk()));
    }

    /**
     * Stores a lock for the queue named in the payload. The reply is produced by
     * {@link PutLockHandler}. Replies with an error when no lock info can be derived from the
     * payload's 'requestedBy' value, and with BAD_INPUT when the lock name is not a string.
     *
     * @param message the putLock request
     */
    private void putLock(Message<JsonObject> message) {
        final String requestedBy = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.REQUESTED_BY);
        final JsonObject lockInfo = extractLockInfo(requestedBy);
        if (lockInfo == null) {
            message.reply(createErrorReply().put(RedisquesAPI.MESSAGE, "Property 'requestedBy' missing"));
            return;
        }
        final String lockName = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        final JsonArray lockNames = new JsonArray().add(lockName);
        if (!jsonArrayContainsStringsOnly(lockNames)) {
            message.reply(createErrorReply().put(RedisquesAPI.ERROR_TYPE, RedisquesAPI.BAD_INPUT).put(RedisquesAPI.MESSAGE, "Lock must be a string value"));
            return;
        }
        redisAPI.hmset(buildLocksItems(locksKey, lockNames, lockInfo), new PutLockHandler(message));
    }

    /**
     * Stores the same lock info for every lock name listed in the payload. The reply is produced
     * by {@link PutLockHandler}. Error replies are sent for a missing or empty locks array, for a
     * missing 'requestedBy' value and (as BAD_INPUT) for non-string lock names.
     *
     * @param message the bulkPutLocks request
     */
    private void bulkPutLocks(Message<JsonObject> message) {
        final JsonArray lockNames = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getJsonArray(RedisquesAPI.LOCKS);
        if (lockNames == null || lockNames.isEmpty()) {
            message.reply(createErrorReply().put(RedisquesAPI.MESSAGE, "No locks to put provided"));
            return;
        }
        final String requestedBy = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.REQUESTED_BY);
        final JsonObject lockInfo = extractLockInfo(requestedBy);
        if (lockInfo == null) {
            message.reply(createErrorReply().put(RedisquesAPI.MESSAGE, "Property 'requestedBy' missing"));
            return;
        }
        if (!jsonArrayContainsStringsOnly(lockNames)) {
            message.reply(createErrorReply().put(RedisquesAPI.ERROR_TYPE, RedisquesAPI.BAD_INPUT).put(RedisquesAPI.MESSAGE, "Locks must be string values"));
            return;
        }
        redisAPI.hmset(buildLocksItems(locksKey, lockNames, lockInfo), new PutLockHandler(message));
    }

    /**
     * Builds the argument list for a hash multi-set: the hash key followed by one
     * (lock name, encoded lock info) pair per entry of the array.
     *
     * @param str        the hash key
     * @param jsonArray  the lock names
     * @param jsonObject the lock info; its encoded form is used as value for every lock
     * @return the flat argument list
     */
    private List<String> buildLocksItems(String str, JsonArray jsonArray, JsonObject jsonObject) {
        final List<String> arguments = new ArrayList<>();
        arguments.add(str);
        final String encodedLockInfo = jsonObject.encode();
        int index = 0;
        while (index < jsonArray.size()) {
            arguments.add(jsonArray.getString(index));
            arguments.add(encodedLockInfo);
            index++;
        }
        return arguments;
    }

    /**
     * Removes the lock of the queue named in the payload. Before doing so, the consumer is
     * notified if the queue key exists in Redis. The reply is produced by
     * {@link DeleteLockHandler}.
     *
     * @param message the deleteLock request
     */
    private void deleteLock(Message<JsonObject> message) {
        final String queueName = message.body().getJsonObject(RedisquesAPI.PAYLOAD).getString(RedisquesAPI.QUEUENAME);
        final List<String> existsArguments = Collections.singletonList(queuesPrefix + queueName);
        redisAPI.exists(existsArguments, existsReply -> {
            final boolean queueExists = existsReply.succeeded()
                    && existsReply.result() != null
                    && existsReply.result().toInteger().intValue() == 1;
            if (queueExists) {
                notifyConsumer(queueName);
            }
            // The lock is removed whatever the existence check returned.
            redisAPI.hdel(Arrays.asList(locksKey, queueName), new DeleteLockHandler(message));
        });
    }

    /**
     * Handles the bulk "delete locks" request: wraps every entry of the payload's locks array
     * into a Redis response value and passes the collection to {@code deleteLocks}.
     *
     * <p>An error reply is sent when the payload has no locks array.
     *
     * @param message event bus request whose payload holds the lock names
     */
    private void bulkDeleteLocks(Message<JsonObject> message) {
        final JsonArray lockNames = message.body()
                .getJsonObject(RedisquesAPI.PAYLOAD)
                .getJsonArray(RedisquesAPI.LOCKS);
        if (lockNames != null) {
            final int lockCount = lockNames.size();
            final MultiType locks = MultiType.create(lockCount, false);
            for (int index = 0; index < lockCount; index++) {
                final Response lock = SimpleStringType.create(lockNames.getString(index));
                locks.add(lock);
            }
            deleteLocks(message, locks);
        } else {
            message.reply(createErrorReply().put(RedisquesAPI.MESSAGE, "No locks to delete provided"));
        }
    }

    /**
     * Handles the "delete all locks" request: reads all field names of the locks hash with
     * {@code hkeys} and passes them to {@code deleteLocks}.
     *
     * <p>If the lookup fails, a warning is logged and an error reply with the failure message
     * is sent.
     *
     * @param message event bus request to reply to
     */
    private void deleteAllLocks(Message<JsonObject> message) {
        this.redisAPI.hkeys(this.locksKey, keysResult -> {
            if (!keysResult.succeeded()) {
                final String reason = keysResult.cause().getMessage();
                this.log.warn("failed to delete all locks. Message: " + reason);
                message.reply(createErrorReply().put(RedisquesAPI.MESSAGE, reason));
                return;
            }
            deleteLocks(message, keysResult.result());
        });
    }

    /**
     * Removes the given locks from the locks hash and replies with the number of deleted
     * entries.
     *
     * <p>A {@code null} or empty lock collection results in an OK reply with value 0 without
     * contacting Redis. A failing {@code hdel} is logged and answered with an error reply.
     *
     * @param message  event bus request to reply to
     * @param response the lock names to delete, one element per lock
     */
    private void deleteLocks(Message<JsonObject> message, Response response) {
        final boolean nothingToDelete = response == null || response.size() == 0;
        if (nothingToDelete) {
            message.reply(createOkReply().put(RedisquesAPI.VALUE, 0));
            return;
        }

        final List<String> hdelArgs = new ArrayList<>();
        hdelArgs.add(this.locksKey);
        for (Response lock : response) {
            hdelArgs.add(lock.toString());
        }

        this.redisAPI.hdel(hdelArgs, hdelResult -> {
            if (!hdelResult.succeeded()) {
                final String reason = hdelResult.cause().getMessage();
                this.log.warn("failed to delete locks. Message: " + reason);
                message.reply(createErrorReply().put(RedisquesAPI.MESSAGE, reason));
                return;
            }
            this.log.info("Successfully deleted " + hdelResult.result() + " locks");
            message.reply(createOkReply().put(RedisquesAPI.VALUE, hdelResult.result().toLong()));
        });
    }

    /**
     * Tells whether every element of the array is a {@link String}.
     *
     * @param jsonArray the array to inspect
     * @return {@code true} if all elements are strings (also for an empty array),
     *         {@code false} as soon as one element is not
     */
    private boolean jsonArrayContainsStringsOnly(JsonArray jsonArray) {
        for (Object element : jsonArray) {
            final boolean isString = element instanceof String;
            if (!isString) {
                return false;
            }
        }
        return true;
    }

    /**
     * Replies with an OK message whose value is a JSON object holding the current
     * configuration fields of this instance.
     *
     * @param message event bus request to reply to
     */
    private void getConfiguration(Message<JsonObject> message) {
        final JsonObject configuration = new JsonObject()
                .put(RedisquesConfiguration.PROP_ADDRESS, this.address)
                .put(RedisquesConfiguration.PROP_REDIS_PREFIX, this.redisPrefix)
                .put(RedisquesConfiguration.PROP_PROCESSOR_ADDRESS, this.processorAddress)
                .put(RedisquesConfiguration.PROP_REFRESH_PERIOD, Integer.valueOf(this.refreshPeriod))
                .put(RedisquesConfiguration.PROP_REDIS_HOST, this.redisHost)
                .put(RedisquesConfiguration.PROP_REDIS_PORT, Integer.valueOf(this.redisPort))
                .put(RedisquesConfiguration.PROP_REDIS_AUTH, this.redisAuth)
                .put(RedisquesConfiguration.PROP_REDIS_ENCODING, this.redisEncoding)
                .put(RedisquesConfiguration.PROP_CHECK_INTERVAL, Integer.valueOf(this.checkInterval))
                .put(RedisquesConfiguration.PROP_PROCESSOR_TIMEOUT, Integer.valueOf(this.processorTimeout))
                .put("processorDelayMax", Long.valueOf(this.processorDelayMax))
                .put(RedisquesConfiguration.PROP_HTTP_REQUEST_HANDLER_ENABLED, Boolean.valueOf(this.httpRequestHandlerEnabled))
                .put(RedisquesConfiguration.PROP_HTTP_REQUEST_HANDLER_PREFIX, this.httpRequestHandlerPrefix)
                .put(RedisquesConfiguration.PROP_HTTP_REQUEST_HANDLER_PORT, Integer.valueOf(this.httpRequestHandlerPort))
                .put(RedisquesConfiguration.PROP_HTTP_REQUEST_HANDLER_USER_HEADER, this.httpRequestHandlerUserHeader);
        final JsonObject reply = createOkReply().put(RedisquesAPI.VALUE, configuration);
        message.reply(reply);
    }

    /**
     * Handles the "set configuration" request.
     *
     * <p>The payload is first passed to {@code setConfigurationValues} with the flag set to
     * {@code true}, in which case that method checks the values without applying them. On
     * success the payload is published on the configuration-updated address and the result
     * is sent back; on failure an error reply with the failure message is sent.
     *
     * @param message event bus request whose payload holds the configuration values
     */
    private void setConfiguration(Message<JsonObject> message) {
        final JsonObject configurationValues = message.body().getJsonObject(RedisquesAPI.PAYLOAD);
        setConfigurationValues(configurationValues, true).onComplete(validation -> {
            if (validation.succeeded()) {
                this.log.debug("About to publish the configuration updates to event bus address '" + this.configurationUpdatedAddress + "'");
                this.vertx.eventBus().publish(this.configurationUpdatedAddress, configurationValues);
                message.reply(validation.result());
            } else {
                final String reason = validation.cause().getMessage();
                message.reply(createErrorReply().put(RedisquesAPI.MESSAGE, reason));
            }
        });
    }

    /**
     * Validates the given configuration values and, unless {@code z} is set, applies them.
     *
     * <p>The returned future fails when the object is {@code null}, when it contains field
     * names that are not allowed, or when {@code processorDelayMax} is missing or not a
     * number. Otherwise it completes with an OK reply.
     *
     * @param jsonObject the configuration values, may be {@code null}
     * @param z          when {@code true} the values are only checked and not applied
     * @return a future holding the OK reply or the validation failure
     */
    private Future<JsonObject> setConfigurationValues(JsonObject jsonObject, boolean z) {
        final Promise<JsonObject> outcome = Promise.promise();

        if (jsonObject == null) {
            outcome.fail("Configuration values missing");
            return outcome.future();
        }

        final List<String> rejectedNames = findNotAllowedConfigurationValues(jsonObject.fieldNames());
        if (!rejectedNames.isEmpty()) {
            outcome.fail("Not supported configuration values received: " + rejectedNames.toString());
            return outcome.future();
        }

        try {
            final Long delayMax = jsonObject.getLong("processorDelayMax");
            if (delayMax == null) {
                outcome.fail("Value for configuration property 'processorDelayMax' is missing");
            } else {
                if (!z) {
                    this.processorDelayMax = delayMax.longValue();
                    this.log.info("Updated configuration value of property 'processorDelayMax' to " + delayMax);
                }
                outcome.complete(createOkReply());
            }
        } catch (ClassCastException e) {
            outcome.fail("Value for configuration property 'processorDelayMax' is not a number");
        }
        return outcome.future();
    }

    /**
     * Collects the names that are not contained in {@code ALLOWED_CONFIGURATION_VALUES}.
     *
     * @param set the configuration property names to check, may be {@code null}
     * @return the names that are not allowed; empty when all are allowed or {@code set} is
     *         {@code null}
     */
    private List<String> findNotAllowedConfigurationValues(Set<String> set) {
        if (set == null) {
            return Collections.emptyList();
        }
        return set.stream()
                .filter(name -> !ALLOWED_CONFIGURATION_VALUES.contains(name))
                .collect(Collectors.toList());
    }

    /**
     * Registers a periodic timer that asks the Lua script manager whether a queue check is
     * due and, if the answer is positive, runs {@code checkQueues}.
     *
     * @param redisquesConfiguration supplies the timer interval in milliseconds
     */
    private void registerQueueCheck(RedisquesConfiguration redisquesConfiguration) {
        final long intervalMs = redisquesConfiguration.getCheckIntervalTimerMs();
        this.vertx.setPeriodic(intervalMs, timerId ->
                this.luaScriptManager.handleQueueCheck(this.queueCheckLastexecKey, this.checkInterval, checkDue -> {
                    if (!checkDue.booleanValue()) {
                        return;
                    }
                    this.log.info("periodic queue check is triggered now");
                    checkQueues();
                }));
    }

    /**
     * Returns the timestamp that lies 120000 milliseconds (two minutes) before now.
     *
     * @return current time in milliseconds minus 120000
     */
    private long getMaxAgeTimestamp() {
        final long maxAgeMs = 120000L;
        final long now = System.currentTimeMillis();
        return now - maxAgeMs;
    }

    /**
     * Logs an error for an operation that is not supported and replies with an error status
     * carrying the same text.
     *
     * @param str     name of the operation that was received
     * @param message event bus request to reply to
     */
    private void unsupportedOperation(String str, Message<JsonObject> message) {
        final String errorText = "QUEUE_ERROR: Unsupported operation received: " + str;
        this.log.error(errorText);
        final JsonObject reply = new JsonObject()
                .put(RedisquesAPI.STATUS, RedisquesAPI.ERROR)
                .put(RedisquesAPI.MESSAGE, errorText);
        message.reply(reply);
    }

    /**
     * Creates the lock info object stored alongside a lock: the requesting user and the
     * current time in milliseconds.
     *
     * @param str the user requesting the lock, may be {@code null}
     * @return the lock info, or {@code null} when no user is given
     */
    private JsonObject extractLockInfo(String str) {
        if (str != null) {
            return new JsonObject()
                    .put(RedisquesAPI.REQUESTED_BY, str)
                    .put("timestamp", Long.valueOf(System.currentTimeMillis()));
        }
        return null;
    }

    /**
     * Stops this instance by unregistering its consumers with the force flag set, i.e.
     * without regard to the state of the individual queues.
     */
    public void stop() {
        final boolean force = true;
        unregisterConsumers(force);
    }

    /**
     * Stops consuming gracefully: unregisters the two event bus consumers one after the
     * other, then unregisters the queue consumers without force and remembers the handler
     * in {@code stoppedHandler}.
     *
     * <p>The handler is invoked right away when no queues are registered at that point.
     *
     * @param handler called with {@code null} once stopping is considered done
     */
    private void gracefulStop(Handler<Void> handler) {
        this.consumersMessageConsumer.unregister(consumersUnregistered ->
                this.uidMessageConsumer.unregister(uidUnregistered -> {
                    unregisterConsumers(false);
                    this.stoppedHandler = handler;
                    final boolean noQueuesLeft = this.myQueues.isEmpty();
                    if (noQueuesLeft) {
                        handler.handle(null);
                    }
                }));
    }

    /**
     * Gives up the queues registered to this instance.
     *
     * <p>For every known queue that is in state {@code READY} (or for every queue when
     * {@code z} is set) the registration is refreshed and the registered consumer is read
     * from Redis. The queue is removed from {@code myQueues} only if that consumer is this
     * instance's uid. A failed lookup is logged and treated like an empty consumer value.
     *
     * @param z when {@code true}, queues are processed regardless of their state
     */
    private void unregisterConsumers(boolean z) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues unregister consumers force: " + z);
        }
        this.log.debug("RedisQues Unregistering consumers");

        for (Map.Entry<String, QueueState> queueEntry : this.myQueues.entrySet()) {
            final String queueName = queueEntry.getKey();
            final boolean eligible = z || queueEntry.getValue() == QueueState.READY;
            if (!eligible) {
                continue;
            }
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues unregister consumers queue: " + queueName);
            }
            refreshRegistration(queueName, refreshResult -> {
                final String consumerKey = this.consumersPrefix + queueName;
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues unregister consumers get: " + consumerKey);
                }
                this.redisAPI.get(consumerKey, consumerResult -> {
                    if (consumerResult.failed()) {
                        this.log.warn("Failed to retrieve consumer '{}'.", consumerKey, consumerResult.cause());
                    }
                    final String consumer = Objects.toString(consumerResult.result(), "");
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("RedisQues unregister consumers get result: " + consumer);
                    }
                    final boolean ownedByMe = this.uid.equals(consumer);
                    if (ownedByMe) {
                        this.log.debug("RedisQues remove consumer: " + this.uid);
                        this.myQueues.remove(queueName);
                    }
                });
            });
        }
    }

    /**
     * Deletes all consumer registrations: looks up every Redis key starting with the
     * consumers prefix and deletes the keys found.
     *
     * <p>Failures of either Redis call are logged; nothing is deleted when no key matches.
     */
    private void resetConsumers() {
        this.log.debug("RedisQues Resetting consumers");
        final String keysPattern = this.consumersPrefix + "*";
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues reset consumers keys: " + keysPattern);
        }
        this.redisAPI.keys(keysPattern, keysResult -> {
            final Response foundKeys = keysResult.result();
            if (keysResult.failed() || foundKeys == null) {
                this.log.error("Unable to get redis keys of consumers", keysResult.cause());
                return;
            }
            if (foundKeys.size() == 0) {
                this.log.debug("No consumers found to reset");
                return;
            }
            final List<String> consumerKeys = new ArrayList<>(foundKeys.size());
            for (Response foundKey : foundKeys) {
                consumerKeys.add(foundKey.toString());
            }
            this.redisAPI.del(consumerKeys, delResult -> {
                if (delResult.succeeded()) {
                    this.log.debug("Successfully reset " + delResult.result().toLong() + " consumers");
                } else {
                    this.log.error("Unable to delete redis keys of consumers");
                }
            });
        });
    }

    /**
     * Starts consuming the given queue if this instance is its registered consumer.
     *
     * <p>The registration is refreshed first (a failure is only logged), then the registered
     * consumer is read from Redis:
     * <ul>
     *   <li>lookup failed: the error is logged and nothing else happens;</li>
     *   <li>another consumer is registered: the queue is dropped from {@code myQueues} and
     *       the consumer is notified;</li>
     *   <li>the queue is already in state {@code CONSUMING}: nothing is done;</li>
     *   <li>otherwise the state becomes {@code CONSUMING} and the queue is read.</li>
     * </ul>
     *
     * @param str name of the queue to consume
     */
    private void consume(String str) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("RedisQues Requested to consume queue " + str);
        }
        refreshRegistration(str, refreshResult -> {
            if (refreshResult.failed()) {
                this.log.warn("Failed to refresh registration for queue '{}'.", str, refreshResult.cause());
            }
            final String consumerKey = this.consumersPrefix + str;
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues consume get: " + consumerKey);
            }
            this.redisAPI.get(consumerKey, consumerResult -> {
                if (consumerResult.failed()) {
                    this.log.error("Unable to get consumer for queue " + str, consumerResult.cause());
                    return;
                }
                final String consumer = Objects.toString(consumerResult.result(), "");
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues refresh registration consumer: " + consumer);
                }

                // Somebody else owns the queue now: forget it and hand over.
                if (!this.uid.equals(consumer)) {
                    this.log.warn("Registration for queue " + str + " has changed to " + consumer);
                    this.myQueues.remove(str);
                    notifyConsumer(str);
                    return;
                }

                final QueueState currentState = this.myQueues.get(str);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues consumer: " + consumer + " queue: " + str + " state: " + currentState);
                }
                if (currentState == QueueState.CONSUMING) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("RedisQues Queue " + str + " is already being consumed");
                    }
                    return;
                }

                this.myQueues.put(str, QueueState.CONSUMING);
                if (currentState == null) {
                    this.log.warn("Received request to consume from a queue I did not know about: " + str);
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("RedisQues Starting to consume queue " + str);
                }
                readQueue(str);
            });
        });
    }

    /**
     * Checks whether a lock entry exists for the given queue in the locks hash.
     *
     * <p>The returned future always completes successfully: a failed or empty Redis answer
     * is reported as "not locked" (a failure is additionally logged).
     *
     * @param str name of the queue
     * @return a future holding {@code true} only when {@code hexists} answered 1
     */
    private Future<Boolean> isQueueLocked(String str) {
        final Promise<Boolean> lockState = Promise.promise();
        this.redisAPI.hexists(this.locksKey, str, lookup -> {
            if (lookup.failed()) {
                this.log.warn("Failed to check if queue '{}' is locked. Assume no.", str, lookup.cause());
                lockState.complete(Boolean.FALSE);
                return;
            }
            final Response answer = lookup.result();
            if (answer == null) {
                lockState.complete(Boolean.FALSE);
                return;
            }
            final boolean locked = answer.toInteger().intValue() == 1;
            lockState.complete(Boolean.valueOf(locked));
        });
        return lockState.future();
    }

    /**
     * Reads the next item of a queue and hands it to the processor, unless the queue is
     * locked or empty.
     *
     * <p>Flow, as implemented below:
     * <ol>
     *   <li>Ask {@code isQueueLocked}; a locked queue is only marked {@code READY}.</li>
     *   <li>Peek the head of the Redis list with {@code lindex} at index 0; when nothing is
     *       returned the queue is marked {@code READY}.</li>
     *   <li>Pass the item to {@code processMessageWithTimeout}. On success the item is
     *       removed with {@code lpop}, the queue is marked {@code READY} and, if
     *       {@code llen} reports remaining items, the consumer is notified again. On
     *       failure a re-send is scheduled via {@code rescheduleSendMessageAfterFailure}.</li>
     * </ol>
     *
     * @param str name of the queue to read (the queues prefix is added here)
     */
    private void readQueue(String str) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues read queue: " + str);
        }
        String str2 = this.queuesPrefix + str;
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues read queue lindex: " + str2);
        }
        isQueueLocked(str).onComplete(asyncResult -> {
            // NOTE(review): a failed result is only logged; the next statement still unboxes
            // result(). isQueueLocked in this file always completes its promise, so this
            // branch looks unreachable — confirm before relying on it.
            if (asyncResult.failed()) {
                this.log.error("Failed to check if queue '{}' is locked", str, asyncResult.cause());
            }
            if (!((Boolean) asyncResult.result()).booleanValue()) {
                // Not locked: peek (do not remove) the first item of the list.
                this.redisAPI.lindex(str2, "0", asyncResult -> {
                    // A failed peek is only logged; execution continues with whatever result() returns.
                    if (asyncResult.failed()) {
                        this.log.error("Failed to peek queue '{}'", str, asyncResult.cause());
                    }
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("RedisQues read queue lindex result: " + asyncResult.result());
                    }
                    if (asyncResult.result() != null) {
                        processMessageWithTimeout(str, ((Response) asyncResult.result()).toString(), bool -> {
                            // Evaluated for both outcomes; the returned retry interval is only
                            // used by the failure branch.
                            int updateQueueFailureCountAndGetRetryInterval = updateQueueFailureCountAndGetRetryInterval(str, bool.booleanValue());
                            if (bool.booleanValue()) {
                                if (this.log.isTraceEnabled()) {
                                    this.log.trace("RedisQues read queue lpop: " + str2);
                                }
                                // Processing succeeded: remove the item that was peeked above.
                                this.redisAPI.lpop(Collections.singletonList(str2), asyncResult -> {
                                    // A failed lpop is only logged; the queue is still marked READY below.
                                    if (asyncResult.failed()) {
                                        this.log.error("Failed to pop from queue '{}'", str, asyncResult.cause());
                                    }
                                    this.log.debug("RedisQues Message removed, queue " + str + " is ready again");
                                    this.myQueues.put(str, QueueState.READY);
                                    // stoppedHandler is assigned by gracefulStop; when it is set,
                                    // try to release the queues and report once none are left.
                                    if (this.stoppedHandler != null) {
                                        unregisterConsumers(false);
                                        if (this.myQueues.isEmpty()) {
                                            this.stoppedHandler.handle((Object) null);
                                        }
                                    }
                                    if (this.log.isTraceEnabled()) {
                                        this.log.trace("RedisQues read queue: " + str2);
                                    }
                                    // Re-notify the consumer only when llen succeeded and
                                    // reports at least one remaining item.
                                    this.redisAPI.llen(str2, asyncResult -> {
                                        if (!asyncResult.succeeded() || asyncResult.result() == null || ((Response) asyncResult.result()).toInteger().intValue() <= 0) {
                                            return;
                                        }
                                        notifyConsumer(str);
                                    });
                                });
                            } else {
                                // Processing failed: the item stays in the list and a re-send is scheduled.
                                if (this.log.isDebugEnabled()) {
                                    this.log.debug("RedisQues Processing failed for queue " + str);
                                    this.log.debug("RedisQues will re-send the message to queue '" + str + "' in " + updateQueueFailureCountAndGetRetryInterval + " seconds");
                                }
                                rescheduleSendMessageAfterFailure(str, updateQueueFailureCountAndGetRetryInterval);
                            }
                        });
                        return;
                    }
                    // Nothing at the head of the list: the queue is empty (or the peek failed).
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Got a request to consume from empty queue " + str);
                    }
                    this.myQueues.put(str, QueueState.READY);
                });
                return;
            }
            // Locked queue: do not read, just make it available for a later consume request.
            if (this.log.isDebugEnabled()) {
                this.log.debug("Got a request to consume from locked queue " + str);
            }
            this.myQueues.put(str, QueueState.READY);
        });
    }

    /**
     * Schedules a new attempt for a queue whose last item could not be processed.
     *
     * <p>After the given number of seconds the consumer is notified again and the queue is
     * put back into state {@code READY}.
     *
     * <p>The delay is computed with {@code long} arithmetic so that large intervals cannot
     * overflow, and it is raised to at least 1 ms because Vert.x refuses to schedule timers
     * with a smaller delay; without that guard a zero or negative interval would throw here
     * and the queue would never be made ready again.
     *
     * @param str name of the queue to retry
     * @param i   retry interval in seconds
     */
    private void rescheduleSendMessageAfterFailure(String str, int i) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedsQues reschedule after failure for queue: " + str);
        }
        final long delayMs = Math.max(1L, i * 1000L);
        this.vertx.setTimer(delayMs, timerId -> {
            if (this.log.isDebugEnabled()) {
                this.log.debug("RedisQues re-notify the consumer of queue '" + str + "' at " + new Date(System.currentTimeMillis()));
            }
            notifyConsumer(str);
            // Reset the state so that the following consume request is not ignored.
            this.myQueues.put(str, QueueState.READY);
        });
    }

    /**
     * Sends a queue item to the processor address and reports whether it was processed
     * successfully.
     *
     * <p>The request is sent after an optional random delay of at most
     * {@code processorDelayMax} milliseconds and uses {@code processorTimeout} as send
     * timeout. Processing counts as successful only when the processor replies with a JSON
     * object whose status is OK. After the request has been sent, the queue's timestamp is
     * updated.
     *
     * <p>The handler is invoked on every path. A failed delay and a reply whose body is not
     * a JSON object are both reported as {@code false}; otherwise the caller would never
     * hear back and the queue would stay in state {@code CONSUMING}.
     *
     * @param str     name of the queue the item belongs to
     * @param str2    the queue item (payload) to process
     * @param handler receives {@code true} when the processor confirmed the item,
     *                {@code false} otherwise
     */
    private void processMessageWithTimeout(String str, String str2, Handler<Boolean> handler) {
        if (this.processorDelayMax > 0) {
            this.log.info("About to process message for queue " + str + " with a maximum delay of " + this.processorDelayMax + "ms");
        }
        this.timer.executeDelayedMax(this.processorDelayMax).onComplete(delayed -> {
            if (delayed.failed()) {
                this.log.error("Delayed execution has failed.", delayed.cause());
                // Report the failure so the caller can schedule a retry.
                handler.handle(Boolean.FALSE);
                return;
            }
            final JsonObject request = new JsonObject()
                    .put("queue", str)
                    .put(RedisquesAPI.PAYLOAD, str2);
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues process message: " + request + " for queue: " + str + " send it to processor: " + this.processorAddress);
            }
            final DeliveryOptions options = new DeliveryOptions().setSendTimeout(this.processorTimeout);
            this.vertx.eventBus().request(this.processorAddress, request, options, reply -> {
                final boolean success;
                if (reply.succeeded()) {
                    // Guard the cast: an unexpected body type must count as a failure
                    // instead of throwing before the handler is called.
                    final Object body = reply.result().body();
                    success = body instanceof JsonObject
                            && RedisquesAPI.OK.equals(((JsonObject) body).getString(RedisquesAPI.STATUS));
                } else {
                    this.log.info("RedisQues QUEUE_ERROR: Consumer failed " + this.uid + " queue: " + str + " (" + reply.cause().getMessage() + ")");
                    success = false;
                }
                handler.handle(Boolean.valueOf(success));
            });
            updateTimestamp(str, null);
        });
    }

    /**
     * Notifies the consumer registered for a queue, or requests a registration when none
     * is found.
     *
     * <p>The registered consumer is read from Redis. If a value is present, the queue name
     * is sent to that value as event bus address; otherwise (also after a failed lookup,
     * which is logged) the queue name is sent to the {@code <address>-consumers} address.
     *
     * @param str name of the queue
     */
    private void notifyConsumer(String str) {
        this.log.debug("RedisQues Notifying consumer of queue " + str);
        final EventBus bus = this.vertx.eventBus();
        final String consumerKey = this.consumersPrefix + str;
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues notify consumer get: " + consumerKey);
        }
        this.redisAPI.get(consumerKey, lookup -> {
            if (lookup.failed()) {
                this.log.warn("Failed to get consumer for queue '{}'", str, lookup.cause());
            }
            final String consumer = Objects.toString(lookup.result(), null);
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues got consumer: " + consumer);
            }
            if (consumer == null) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("RedisQues Sending registration request for queue " + str);
                }
                bus.send(this.address + "-consumers", str);
                return;
            }
            this.log.debug("RedisQues Notifying consumer " + consumer + " to consume queue " + str);
            bus.send(consumer, str);
        });
    }

    /**
     * Renews the expiry of a queue's consumer registration key using {@code consumerLockTime}
     * as the new time to live.
     *
     * @param str     name of the queue
     * @param handler receives the result of the {@code expire} call; may be {@code null}, in
     *                which case the call is issued without a handler
     */
    private void refreshRegistration(String str, Handler<AsyncResult<Response>> handler) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("RedisQues Refreshing registration of queue " + str + ", expire in " + this.consumerLockTime + " s");
        }
        final String consumerKey = this.consumersPrefix + str;
        final List<String> expireArgs = List.of(consumerKey, String.valueOf(this.consumerLockTime));
        if (handler != null) {
            this.redisAPI.expire(expireArgs, handler);
        } else {
            this.redisAPI.expire(expireArgs);
        }
    }

    /**
     * Stores the current time in milliseconds as the score of the queue in the sorted set
     * {@code queuesKey}.
     *
     * @param str     name of the queue
     * @param handler receives the result of the {@code zadd} call; may be {@code null}, in
     *                which case the call is issued without a handler
     */
    private void updateTimestamp(String str, Handler<AsyncResult<Response>> handler) {
        final long now = System.currentTimeMillis();
        if (this.log.isTraceEnabled()) {
            this.log.trace("RedisQues update timestamp for queue: " + str + " to: " + now);
        }
        final List<String> zaddArgs = Arrays.asList(this.queuesKey, String.valueOf(now), str);
        if (handler != null) {
            this.redisAPI.zadd(zaddArgs, handler);
        } else {
            this.redisAPI.zadd(zaddArgs);
        }
    }

    /**
     * Revisits the queues whose timestamp in the sorted set {@code queuesKey} is older than
     * three refresh periods.
     *
     * <p>For every such queue the existence of its Redis list is checked:
     * <ul>
     *   <li>list exists: the timestamp is updated, the registration refreshed and the
     *       consumer notified;</li>
     *   <li>list does not exist: the queue's failure statistics are reset.</li>
     * </ul>
     * A counter initialised with the number of queues is decremented once per handled queue;
     * when it reaches zero, {@code removeOldQueues} is called with the same time limit. A
     * queue whose existence check fails does not decrement the counter, so in that case
     * {@code removeOldQueues} is not called during this run.
     */
    private void checkQueues() {
        this.log.debug("Checking queues timestamps");
        // NOTE(review): refreshPeriod is presumably an int holding seconds, so the product is
        // computed in int arithmetic before being widened — confirm the value range.
        long currentTimeMillis = System.currentTimeMillis() - ((3 * this.refreshPeriod) * 1000);
        this.redisAPI.zrangebyscore(Arrays.asList(this.queuesKey, "-inf", String.valueOf(currentTimeMillis)), asyncResult -> {
            Response response = (Response) asyncResult.result();
            if (asyncResult.failed() || response == null) {
                this.log.error("RedisQues is unable to get list of queues", asyncResult.cause());
                return;
            }
            // Number of queues still to be handled; shared by all callbacks below.
            AtomicInteger atomicInteger = new AtomicInteger(response.size());
            if (this.log.isTraceEnabled()) {
                this.log.trace("RedisQues update queues: " + atomicInteger);
            }
            Iterator it = response.iterator();
            while (it.hasNext()) {
                String response2 = ((Response) it.next()).toString();
                String str = this.queuesPrefix + response2;
                if (this.log.isTraceEnabled()) {
                    this.log.trace("RedisQues update queue: " + str);
                }
                this.redisAPI.exists(Collections.singletonList(str), asyncResult -> {
                    // Returning here leaves the counter untouched, see the method comment.
                    if (asyncResult.failed() || asyncResult.result() == null) {
                        this.log.error("RedisQues is unable to check existence of queue " + response2, asyncResult.cause());
                        return;
                    }
                    if (((Response) asyncResult.result()).toLong().longValue() == 1) {
                        this.log.debug("Updating queue timestamp for queue '{}'", response2);
                        // The counter is decremented only after the timestamp update has
                        // completed (successfully or not).
                        updateTimestamp(response2, asyncResult -> {
                            if (asyncResult.failed()) {
                                this.log.warn("Failed to update timestamps for queue '{}'", response2, asyncResult.cause());
                            }
                            if (atomicInteger.decrementAndGet() == 0) {
                                removeOldQueues(currentTimeMillis);
                            }
                        });
                        refreshRegistration(response2, null);
                        notifyConsumer(response2);
                        return;
                    }
                    // The list does not exist: the entry is left for removeOldQueues.
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("RedisQues remove old queue: " + response2);
                    }
                    if (atomicInteger.decrementAndGet() == 0) {
                        removeOldQueues(currentTimeMillis);
                    }
                    this.queueStatisticsCollector.resetQueueFailureStatistics(response2);
                });
            }
        });
    }

    /**
     * Creates a new reply object whose status field is set to OK.
     *
     * @return a fresh, mutable reply that callers may extend with further fields
     */
    private static JsonObject createOkReply() {
        final JsonObject reply = new JsonObject();
        reply.put(RedisquesAPI.STATUS, RedisquesAPI.OK);
        return reply;
    }

    /**
     * Creates a new reply object whose status field is set to ERROR.
     *
     * @return a fresh, mutable reply that callers may extend with further fields
     */
    private static JsonObject createErrorReply() {
        final JsonObject reply = new JsonObject();
        reply.put(RedisquesAPI.STATUS, RedisquesAPI.ERROR);
        return reply;
    }

    /**
     * Removes all entries from the sorted set {@code queuesKey} whose score is less than or
     * equal to the given limit.
     *
     * <p>The removal is best effort: a failure does not affect the caller, but it is logged
     * so that it does not go unnoticed.
     *
     * @param j upper score bound (inclusive) of the entries to remove
     */
    private void removeOldQueues(long j) {
        this.log.debug("Cleaning old queues");
        this.redisAPI.zremrangebyscore(this.queuesKey, "-inf", String.valueOf(j), removal -> {
            if (removal.failed()) {
                this.log.warn("Failed to remove old queues", removal.cause());
            }
        });
    }

    /**
     * Converts a textual item limit into the zero-based index of the last item to fetch.
     *
     * <p>Only positive limits are honoured. The check is made on the parsed value before
     * subtracting one, so that {@code Integer.MIN_VALUE} cannot wrap around into a huge
     * positive index.
     *
     * @param str the requested limit (number of items); may be {@code null}
     * @return {@code limit - 1} for a positive numeric limit, otherwise
     *         {@code DEFAULT_MAX_QUEUEITEM_COUNT}
     */
    private int getMaxQueueItemCountIndex(String str) {
        if (str == null) {
            return DEFAULT_MAX_QUEUEITEM_COUNT;
        }
        try {
            final int limit = Integer.parseInt(str);
            if (limit > 0) {
                final int maxIndex = limit - 1;
                this.log.info("use limit parameter " + maxIndex);
                return maxIndex;
            }
            this.log.info("ignore non-positive limit parameter " + limit + ". Using default " + DEFAULT_MAX_QUEUEITEM_COUNT);
        } catch (NumberFormatException e) {
            // Not a number: fall back to the default, the warning carries the rejected input.
            this.log.warn("Invalid limit parameter '" + str + "' configured for max queue item count. Using default " + DEFAULT_MAX_QUEUEITEM_COUNT);
        }
        return DEFAULT_MAX_QUEUEITEM_COUNT;
    }

    /**
     * Finds the first queue configuration whose compiled pattern matches the whole queue
     * name.
     *
     * @param str name of the queue
     * @return the first matching configuration, or {@code null} when none matches
     */
    private QueueConfiguration findQueueConfiguration(String str) {
        QueueConfiguration match = null;
        for (QueueConfiguration candidate : this.queueConfigurations) {
            final boolean matches = candidate.compiledPattern().matcher(str).matches();
            if (matches) {
                match = candidate;
                break;
            }
        }
        return match;
    }

    /**
     * Handles the "queue items count" request: asks Redis for the length of the queue's
     * list and lets a {@code GetQueueItemsCountHandler} produce the reply.
     *
     * @param message event bus request whose payload names the queue
     */
    private void getQueueItemsCount(Message<JsonObject> message) {
        final JsonObject payload = message.body().getJsonObject(RedisquesAPI.PAYLOAD);
        final String queueKey = this.queuesPrefix + payload.getString(RedisquesAPI.QUEUENAME);
        this.redisAPI.llen(queueKey, new GetQueueItemsCountHandler(message));
    }

    /**
     * Handles the "queues items count" request using the filter pattern extracted from the
     * message.
     *
     * @param message event bus request, optionally carrying a filter pattern
     */
    private void getQueuesItemsCount(Message<JsonObject> message) {
        final Result<Optional<Pattern>, String> filter = MessageUtil.extractFilterPattern(message);
        getQueuesItemsCount(message, filter);
    }

    /**
     * Replies with the item counts of the queues whose timestamp is not older than
     * {@code getMaxAgeTimestamp()}.
     *
     * <p>An erroneous filter result is answered with a BAD_INPUT error reply; otherwise the
     * queue names are read from the sorted set and a {@code GetQueuesItemsCountHandler}
     * produces the reply.
     *
     * @param message event bus request to reply to
     * @param result  the extracted filter pattern or the error describing why it is invalid
     */
    private void getQueuesItemsCount(Message<JsonObject> message, Result<Optional<Pattern>, String> result) {
        if (result.isErr()) {
            message.reply(createErrorReply()
                    .put(RedisquesAPI.ERROR_TYPE, RedisquesAPI.BAD_INPUT)
                    .put(RedisquesAPI.MESSAGE, result.getErr()));
            return;
        }
        final List<String> scoreRange = List.of(this.queuesKey, String.valueOf(getMaxAgeTimestamp()), "+inf");
        this.redisAPI.zrangebyscore(scoreRange,
                new GetQueuesItemsCountHandler(message, result.getOk(), this.luaScriptManager, this.queuesPrefix));
    }

    /**
     * Handles the "queues statistics" request using the filter pattern extracted from the
     * message.
     *
     * @param message event bus request, optionally carrying a filter pattern
     */
    private void getQueuesStatistics(Message<JsonObject> message) {
        final Result<Optional<Pattern>, String> filter = MessageUtil.extractFilterPattern(message);
        getQueuesStatistics(message, filter);
    }

    /**
     * Replies with the statistics of the queues whose timestamp is not older than
     * {@code getMaxAgeTimestamp()}.
     *
     * <p>An erroneous filter result is answered with a BAD_INPUT error reply; otherwise the
     * queue names are read from the sorted set and a {@code GetQueuesStatisticsHandler}
     * produces the reply.
     *
     * @param message event bus request to reply to
     * @param result  the extracted filter pattern or the error describing why it is invalid
     */
    private void getQueuesStatistics(Message<JsonObject> message, Result<Optional<Pattern>, String> result) {
        if (result.isErr()) {
            message.reply(createErrorReply()
                    .put(RedisquesAPI.ERROR_TYPE, RedisquesAPI.BAD_INPUT)
                    .put(RedisquesAPI.MESSAGE, result.getErr()));
            return;
        }
        final List<String> scoreRange = List.of(this.queuesKey, String.valueOf(getMaxAgeTimestamp()), "+inf");
        this.redisAPI.zrangebyscore(scoreRange,
                new GetQueuesStatisticsHandler(message, result.getOk(), this.queueStatisticsCollector));
    }

    /**
     * Handles the "queues speed" request using the filter pattern extracted from the
     * message.
     *
     * @param message event bus request, optionally carrying a filter pattern
     */
    private void getQueuesSpeed(Message<JsonObject> message) {
        final Result<Optional<Pattern>, String> filter = MessageUtil.extractFilterPattern(message);
        getQueuesSpeed(message, filter);
    }

    /**
     * Replies with the speed of the queues whose timestamp is not older than
     * {@code getMaxAgeTimestamp()}.
     *
     * <p>An erroneous filter result is answered with a BAD_INPUT error reply; otherwise the
     * queue names are read from the sorted set and a {@code GetQueuesSpeedHandler} produces
     * the reply.
     *
     * @param message event bus request to reply to
     * @param result  the extracted filter pattern or the error describing why it is invalid
     */
    private void getQueuesSpeed(Message<JsonObject> message, Result<Optional<Pattern>, String> result) {
        if (result.isErr()) {
            message.reply(createErrorReply()
                    .put(RedisquesAPI.ERROR_TYPE, RedisquesAPI.BAD_INPUT)
                    .put(RedisquesAPI.MESSAGE, result.getErr()));
            return;
        }
        final List<String> scoreRange = List.of(this.queuesKey, String.valueOf(getMaxAgeTimestamp()), "+inf");
        this.redisAPI.zrangebyscore(scoreRange,
                new GetQueuesSpeedHandler(message, result.getOk(), this.queueStatisticsCollector));
    }
}
