package org.swisspush.redisques;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.action.QueueAction;
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.util.DefaultMemoryUsageProvider;
import org.swisspush.redisques.util.DefaultRedisProvider;
import org.swisspush.redisques.util.DefaultRedisquesConfigurationProvider;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.MemoryUsageProvider;
import org.swisspush.redisques.util.QueueActionFactory;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
import org.swisspush.redisques.util.RedisQuesTimer;
import org.swisspush.redisques.util.RedisUtils;
import org.swisspush.redisques.util.RedisquesAPI;
import org.swisspush.redisques.util.RedisquesConfiguration;
import org.swisspush.redisques.util.RedisquesConfigurationProvider;

/* loaded from: input_file:org/swisspush/redisques/RedisQues.class */
public class RedisQues extends AbstractVerticle {
    // Event bus consumer registered on the address equal to this instance's uid (see initialize()).
    private MessageConsumer<String> uidMessageConsumer;
    // Created in initialize(); receives success/failure and dequeue statistics from this verticle.
    private QueueStatisticsCollector queueStatisticsCollector;
    // Event bus consumer on "<address>-consumers"; dispatches to handleRegistrationRequest.
    private MessageConsumer<String> consumersMessageConsumer;
    // Source of Redis clients/connections; may be injected, otherwise defaulted in start().
    private RedisProvider redisProvider;
    // Redis key names and prefixes, all derived from the configured Redis prefix in start().
    private String queuesKey;
    private String queuesPrefix;
    private String consumersPrefix;
    private String locksKey;
    private String queueCheckLastexecKey;
    // Set in start() to twice the configured refresh period; used as the expiry of consumer registrations.
    private int consumerLockTime;
    private RedisQuesTimer timer;
    // May be injected; otherwise defaulted in initialize().
    private MemoryUsageProvider memoryUsageProvider;
    private QueueActionFactory queueActionFactory;
    // May be injected; otherwise defaulted in start().
    private RedisquesConfigurationProvider configurationProvider;
    // Random identifier of this instance; stored in Redis as the consumer of a queue and used as event bus address.
    private final String uid = UUID.randomUUID().toString();
    // Queues this instance knows about, keyed by queue name, with their local state.
    private final Map<String, QueueState> myQueues = new HashMap();
    private final Logger log = LoggerFactory.getLogger(RedisQues.class);
    // Non-null once a graceful stop has progressed far enough to wait for the remaining queues to drain.
    private Handler<Void> stoppedHandler = null;
    // Operation -> action lookup, populated in initialize().
    private Map<RedisquesAPI.QueueOperation, QueueAction> queueActions = new HashMap();
    // Per-queue dequeue statistics, periodically handed to the statistics collector (see start()).
    private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap();

    /* loaded from: input_file:org/swisspush/redisques/RedisQues$FailedAsyncResult.class */
    private class FailedAsyncResult<Response> implements AsyncResult<Response> {
        private final Throwable cause;

        private FailedAsyncResult(Throwable th) {
            this.cause = th;
        }

        public Response result() {
            return null;
        }

        public Throwable cause() {
            return this.cause;
        }

        public boolean succeeded() {
            return false;
        }

        public boolean failed() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/swisspush/redisques/RedisQues$QueueState.class */
    /** Local state of a queue tracked in {@code myQueues}. */
    public enum QueueState {
        // Assigned after a successful registration and again after a message was removed from the queue.
        READY,
        // Assigned by consume() right before the queue is read.
        CONSUMING
    }

    /* loaded from: input_file:org/swisspush/redisques/RedisQues$RedisQuesBuilder.class */
    /**
     * Fluent builder for {@link RedisQues}. Every collaborator is optional; values that
     * are never set are passed to the constructor as {@code null}.
     */
    public static class RedisQuesBuilder {
        private MemoryUsageProvider memoryUsageProvider;
        private RedisquesConfigurationProvider configurationProvider;
        private RedisProvider redisProvider;

        // Instances are obtained through RedisQues.builder().
        private RedisQuesBuilder() {
        }

        /**
         * @param memoryUsageProvider provider to hand to the verticle
         * @return this builder
         */
        public RedisQuesBuilder withMemoryUsageProvider(MemoryUsageProvider memoryUsageProvider) {
            this.memoryUsageProvider = memoryUsageProvider;
            return this;
        }

        /**
         * @param redisquesConfigurationProvider configuration provider to hand to the verticle
         * @return this builder
         */
        public RedisQuesBuilder withRedisquesRedisquesConfigurationProvider(RedisquesConfigurationProvider redisquesConfigurationProvider) {
            this.configurationProvider = redisquesConfigurationProvider;
            return this;
        }

        /**
         * @param redisProvider Redis provider to hand to the verticle
         * @return this builder
         */
        public RedisQuesBuilder withRedisProvider(RedisProvider redisProvider) {
            this.redisProvider = redisProvider;
            return this;
        }

        /** @return a new verticle instance configured with the values collected so far */
        public RedisQues build() {
            return new RedisQues(this.memoryUsageProvider, this.configurationProvider, this.redisProvider);
        }
    }

    /**
     * Creates a verticle without injected collaborators; the configuration provider and
     * Redis provider are defaulted in {@code start}, the memory usage provider in
     * {@code initialize}.
     */
    public RedisQues() {
    }

    /**
     * Creates a verticle with the given collaborators. Any argument may be {@code null},
     * in which case a default implementation is created during startup.
     *
     * @param memoryUsageProvider            memory usage provider, or {@code null}
     * @param redisquesConfigurationProvider configuration provider, or {@code null}
     * @param redisProvider                  Redis provider, or {@code null}
     */
    public RedisQues(MemoryUsageProvider memoryUsageProvider, RedisquesConfigurationProvider redisquesConfigurationProvider, RedisProvider redisProvider) {
        this.memoryUsageProvider = memoryUsageProvider;
        this.configurationProvider = redisquesConfigurationProvider;
        this.redisProvider = redisProvider;
    }

    /** @return a fresh {@link RedisQuesBuilder} with no collaborators set */
    public static RedisQuesBuilder builder() {
        return new RedisQuesBuilder();
    }

    /**
     * Sends a Redis {@code SET} whose payload is built by {@code RedisUtils.toPayload}
     * from the key, the value and an option array holding {@code EX <i>} plus,
     * optionally, {@code NX}.
     *
     * @param str     Redis key
     * @param str2    value to store
     * @param z       when {@code true} the {@code NX} option is appended
     * @param i       value passed with the {@code EX} option
     * @param handler receives the command outcome; if no Redis client could be obtained
     *                it receives a failed result carrying that error
     */
    private void redisSetWithOptions(String str, String str2, boolean z, int i, Handler<AsyncResult<Response>> handler) {
        final JsonArray options = new JsonArray().add("EX").add(Integer.valueOf(i));
        if (z) {
            options.add("NX");
        }
        this.redisProvider.redis()
                .onSuccess(redisAPI -> {
                    final String[] args = (String[]) RedisUtils.toPayload(str, str2, options).toArray(new String[0]);
                    redisAPI.send(Command.SET, args).onComplete(handler);
                })
                .onFailure(cause -> handler.handle(new FailedAsyncResult(cause)));
    }

    /**
     * Handles a registration request for the queue named in the message body by trying
     * to store this instance's uid under the queue's consumer key with the {@code NX}
     * option. Only when Redis answers {@code OK} is the queue recorded as
     * {@link QueueState#READY} and consumption started.
     *
     * @param message event bus message whose body is the queue name; a {@code null}
     *                body is only logged, processing continues regardless
     */
    private void handleRegistrationRequest(Message<String> message) {
        final String queueName = (String) message.body();
        if (queueName == null) {
            this.log.warn("Got message without queue name while handleRegistrationRequest.");
        }
        this.log.debug("RedisQues Got registration request for queue {} from consumer: {}", queueName, this.uid);

        final String consumerKey = this.consumersPrefix + queueName;
        redisSetWithOptions(consumerKey, this.uid, true, this.consumerLockTime, setResult -> {
            if (!setResult.succeeded()) {
                this.log.error("redisSetWithOptions failed", setResult.cause());
                return;
            }
            final Response reply = setResult.result();
            final String replyText = reply == null ? null : reply.toString();
            this.log.trace("RedisQues setxn result: {} for queue: {}", replyText, queueName);
            if ("OK".equals(replyText)) {
                // The key was written by us, so this instance is the consumer now.
                this.log.debug("RedisQues Now registered for queue {}", queueName);
                this.myQueues.put(queueName, QueueState.READY);
                consume(queueName);
            } else {
                this.log.debug("RedisQues Missed registration for queue {}", queueName);
            }
        });
    }

    /**
     * Starts the verticle: resolves the configuration, schedules the optional dequeue
     * statistic report, derives the Redis key names, and finally waits for a Redis
     * client before running {@code initialize()}.
     *
     * @param promise completed after {@code initialize()} returned, or failed with an
     *                exception wrapping the cause when no Redis client could be obtained
     */
    public void start(Promise<Void> promise) {
        this.log.info("Started with UID {}", this.uid);

        if (this.configurationProvider == null) {
            this.configurationProvider = new DefaultRedisquesConfigurationProvider(this.vertx, config());
        }
        final RedisquesConfiguration modConfig = this.configurationProvider.configuration();
        this.log.info("Starting Redisques module with configuration: {}", this.configurationProvider.configuration());

        // A non-positive interval disables the periodic statistic report.
        final int reportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec();
        if (reportIntervalSec > 0) {
            this.vertx.setPeriodic(1000 * reportIntervalSec, timerId ->
                    this.dequeueStatistic.forEach((queueName, statistic) ->
                            this.queueStatisticsCollector.setDequeueStatistic(queueName, statistic)));
        }

        final String redisPrefix = modConfig.getRedisPrefix();
        this.queuesKey = redisPrefix + "queues";
        this.queuesPrefix = redisPrefix + "queues:";
        this.consumersPrefix = redisPrefix + "consumers:";
        this.locksKey = redisPrefix + "locks";
        this.queueCheckLastexecKey = redisPrefix + "check:lastexec";
        this.consumerLockTime = 2 * modConfig.getRefreshPeriod();
        this.timer = new RedisQuesTimer(this.vertx);

        if (this.redisProvider == null) {
            this.redisProvider = new DefaultRedisProvider(this.vertx, this.configurationProvider);
        }
        this.redisProvider.redis().onComplete(connectResult -> {
            if (connectResult.succeeded()) {
                initialize();
                promise.complete();
            } else {
                promise.fail(new Exception(connectResult.cause()));
            }
        });
    }

    /**
     * Wires up everything that needs a working Redis provider: the statistics collector,
     * the HTTP request handler, the queue actions, the three event bus consumers and the
     * two periodic jobs (registration refresh and queue check).
     */
    private void initialize() {
        final RedisquesConfiguration configuration = this.configurationProvider.configuration();

        this.queueStatisticsCollector = new QueueStatisticsCollector(
                this.redisProvider, this.queuesPrefix, this.vertx, configuration.getQueueSpeedIntervalSec());
        RedisquesHttpRequestHandler.init(this.vertx, configuration, this.queueStatisticsCollector);

        if (this.memoryUsageProvider == null) {
            this.memoryUsageProvider = new DefaultMemoryUsageProvider(
                    this.redisProvider, this.vertx,
                    this.configurationProvider.configuration().getMemoryUsageCheckIntervalSec());
        }

        this.queueActionFactory = new QueueActionFactory(
                this.redisProvider, this.vertx, this.log, this.queuesKey, this.queuesPrefix,
                this.consumersPrefix, this.locksKey, this.queueStatisticsCollector,
                this.memoryUsageProvider, this.configurationProvider);

        // Operations served through a QueueAction, registered in this exact order.
        final List<RedisquesAPI.QueueOperation> actionOperations = Arrays.asList(
                RedisquesAPI.QueueOperation.addQueueItem,
                RedisquesAPI.QueueOperation.deleteQueueItem,
                RedisquesAPI.QueueOperation.deleteAllQueueItems,
                RedisquesAPI.QueueOperation.bulkDeleteQueues,
                RedisquesAPI.QueueOperation.replaceQueueItem,
                RedisquesAPI.QueueOperation.getQueueItem,
                RedisquesAPI.QueueOperation.getQueueItems,
                RedisquesAPI.QueueOperation.getQueues,
                RedisquesAPI.QueueOperation.getQueuesCount,
                RedisquesAPI.QueueOperation.getQueueItemsCount,
                RedisquesAPI.QueueOperation.getQueuesItemsCount,
                RedisquesAPI.QueueOperation.enqueue,
                RedisquesAPI.QueueOperation.lockedEnqueue,
                RedisquesAPI.QueueOperation.getLock,
                RedisquesAPI.QueueOperation.putLock,
                RedisquesAPI.QueueOperation.bulkPutLocks,
                RedisquesAPI.QueueOperation.getAllLocks,
                RedisquesAPI.QueueOperation.deleteLock,
                RedisquesAPI.QueueOperation.bulkDeleteLocks,
                RedisquesAPI.QueueOperation.deleteAllLocks,
                RedisquesAPI.QueueOperation.getQueuesSpeed,
                RedisquesAPI.QueueOperation.getQueuesStatistics,
                RedisquesAPI.QueueOperation.setConfiguration,
                RedisquesAPI.QueueOperation.getConfiguration);
        for (RedisquesAPI.QueueOperation operation : actionOperations) {
            this.queueActions.put(operation, this.queueActionFactory.buildQueueAction(operation));
        }

        final String address = configuration.getAddress();
        final EventBus eventBus = this.vertx.eventBus();
        eventBus.consumer(address, operationsHandler());
        this.consumersMessageConsumer = this.vertx.eventBus().consumer(address + "-consumers", this::handleRegistrationRequest);
        this.uidMessageConsumer = this.vertx.eventBus().consumer(this.uid, notification -> {
            final String queueName = (String) notification.body();
            if (queueName == null) {
                // Only logged; consume() is still invoked with the null name below.
                this.log.warn("Got event bus msg with empty body! uid={}  address={}  replyAddress={}",
                        this.uid, notification.address(), notification.replyAddress());
            }
            this.log.debug("RedisQues got notification for queue '{}'", queueName);
            consume(queueName);
        });

        registerActiveQueueRegistrationRefresh();
        registerQueueCheck();
    }

    /**
     * Schedules a periodic job, running every refresh period, that looks at each queue
     * currently in state {@link QueueState#CONSUMING}. If Redis still lists this
     * instance as the queue's consumer, the registration is refreshed and the queue's
     * timestamp updated; otherwise the queue is dropped from {@code myQueues} and its
     * failure statistics are reset.
     */
    private void registerActiveQueueRegistrationRefresh() {
        final int periodMs = this.configurationProvider.configuration().getRefreshPeriod() * 1000;
        this.vertx.setPeriodic(periodMs, timerId -> this.myQueues.entrySet().stream()
                .filter(entry -> entry.getValue() == QueueState.CONSUMING)
                .forEach(consumingEntry -> {
                    final String queueName = consumingEntry.getKey();
                    final String consumerKey = this.consumersPrefix + queueName;
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("RedisQues refresh queues get: {}", consumerKey);
                    }
                    this.redisProvider.redis()
                            .onSuccess(redisAPI -> redisAPI.get(consumerKey, getResult -> {
                                if (getResult.failed()) {
                                    this.log.warn("Failed to get queue consumer for queue '{}'. But we'll continue anyway :)", queueName, getResult.cause());
                                }
                                // A missing or failed lookup yields "", which never matches our uid.
                                final String registeredConsumer = Objects.toString(getResult.result(), "");
                                if (!this.uid.equals(registeredConsumer)) {
                                    this.log.debug("RedisQues Removing queue {} from the list", queueName);
                                    this.myQueues.remove(queueName);
                                    this.queueStatisticsCollector.resetQueueFailureStatistics(queueName);
                                    return;
                                }
                                this.log.debug("RedisQues Periodic consumer refresh for active queue {}", queueName);
                                refreshRegistration(queueName, refreshResult -> {
                                    if (refreshResult.failed()) {
                                        this.log.warn("TODO error handling", new Exception(refreshResult.cause()));
                                    }
                                    updateTimestamp(queueName, null);
                                });
                            }))
                            .onFailure(cause ->
                                    this.log.error("Redis: Failed to registerActiveQueueRegistrationRefresh", cause));
                }));
    }

    /**
     * Builds the handler for messages arriving on the module's main event bus address.
     * The {@code check}, {@code reset} and {@code stop} operations are handled by this
     * class; every other known operation is delegated to its registered
     * {@link QueueAction}, and unknown operation names are answered with an error reply.
     *
     * @return the event bus handler; it throws a {@link NullPointerException} for
     *         messages without a body
     */
    private Handler<Message<JsonObject>> operationsHandler() {
        return request -> {
            final JsonObject body = (JsonObject) request.body();
            if (body == null) {
                throw new NullPointerException("Why is body empty? addr=" + request.address() + "  replyAddr=" + request.replyAddress());
            }
            final String operationName = body.getString(RedisquesAPI.OPERATION);
            this.log.trace("RedisQues got operation: {}", operationName);

            final RedisquesAPI.QueueOperation operation = RedisquesAPI.QueueOperation.fromString(operationName);
            if (operation == null) {
                unsupportedOperation(operationName, request);
                return;
            }

            if (operation == RedisquesAPI.QueueOperation.check) {
                checkQueues();
            } else if (operation == RedisquesAPI.QueueOperation.reset) {
                resetConsumers();
            } else if (operation == RedisquesAPI.QueueOperation.stop) {
                gracefulStop(ignored -> {
                });
            } else {
                this.queueActions
                        .getOrDefault(operation, this.queueActionFactory.buildUnsupportedAction())
                        .execute(request);
            }
        };
    }

    /**
     * Records the outcome of a delivery attempt in the statistics collector and returns
     * how long to wait before the next attempt.
     *
     * @param str name of the queue
     * @param z   {@code true} if the message was delivered successfully
     * @return {@code 0} on success; on failure the retry interval picked from the
     *         queue's configured retry intervals by failure count (the last interval is
     *         reused once the count exceeds the array), or the configured refresh period
     *         when the queue has no retry intervals configured
     */
    int updateQueueFailureCountAndGetRetryInterval(String str, boolean z) {
        if (z) {
            this.queueStatisticsCollector.queueMessageSuccess(str);
            return 0;
        }

        final long failureCount = this.queueStatisticsCollector.queueMessageFailed(str);
        final QueueConfiguration queueConfiguration = findQueueConfiguration(str);
        final int[] retryIntervals = queueConfiguration == null ? null : queueConfiguration.getRetryIntervals();
        if (retryIntervals == null || retryIntervals.length == 0) {
            return this.configurationProvider.configuration().getRefreshPeriod();
        }

        // The n-th failure uses the n-th interval, capped at the last configured one.
        final long position = Math.min(failureCount, (long) retryIntervals.length) - 1;
        final int retryInterval = retryIntervals[(int) position];
        this.queueStatisticsCollector.setQueueSlowDownTime(str, retryInterval);
        return retryInterval;
    }

    /**
     * Schedules the periodic queue check.
     *
     * <p>On every timer tick the current time is written to the "last executed" key
     * with {@code NX} and an expiry of the configured check interval. Redis only stores
     * the value (and answers {@code OK}) when the key does not exist, so the reply tells
     * whether a check already ran within the interval. {@code checkQueues()} is
     * therefore triggered only for an {@code OK} reply; previously the reply was
     * ignored and the check ran on every tick regardless of the key.
     */
    private void registerQueueCheck() {
        this.vertx.setPeriodic(this.configurationProvider.configuration().getCheckIntervalTimerMs(), timerId -> {
            this.redisProvider.connection().onFailure(connectError -> {
                this.log.error("TODO error handling", new Exception(connectError));
            }).onSuccess(redis -> {
                final Request setIfAbsent = Request.cmd(Command.SET, new Object[]{
                        this.queueCheckLastexecKey,
                        Long.valueOf(System.currentTimeMillis()),
                        "NX",
                        "EX",
                        Integer.valueOf(this.configurationProvider.configuration().getCheckInterval())});
                redis.send(setIfAbsent).onFailure(sendError -> {
                    this.log.error("Unexpected queue check result", new Exception(sendError));
                }).onSuccess(reply -> {
                    // With NX, a null reply means the key still exists: the check already ran recently.
                    if (reply == null || !"OK".equals(reply.toString())) {
                        this.log.debug("periodic queue check skipped, last execution is still within the check interval");
                        return;
                    }
                    this.log.info("periodic queue check is triggered now");
                    checkQueues();
                });
            });
        });
    }

    /**
     * Logs an unsupported operation and replies to the sender with an error status.
     *
     * @param str     the operation name as received
     * @param message the message to reply to
     */
    private void unsupportedOperation(String str, Message<JsonObject> message) {
        final String errorText = "QUEUE_ERROR: Unsupported operation received: " + str;
        this.log.error(errorText);
        final JsonObject reply = new JsonObject()
                .put(RedisquesAPI.STATUS, RedisquesAPI.ERROR)
                .put(RedisquesAPI.MESSAGE, errorText);
        message.reply(reply);
    }

    /**
     * Stops the verticle by unregistering the consumers of all known queues, regardless
     * of their state. The future returned by {@code unregisterConsumers} is not awaited.
     */
    public void stop() {
        unregisterConsumers(true);
    }

    /**
     * Stops accepting new work and lets running work finish: first both event bus
     * consumers are unregistered, then the queues not currently being consumed are
     * released. The handler is stored in {@code stoppedHandler} and invoked right away
     * if no queue is left.
     *
     * @param handler notified with {@code null} once no queue is left
     */
    private void gracefulStop(Handler<Void> handler) {
        // The outcome of the first unregister is intentionally not inspected.
        this.consumersMessageConsumer.unregister(registrationConsumerGone ->
                this.uidMessageConsumer.unregister(uidConsumerGone -> {
                    if (uidConsumerGone.failed()) {
                        this.log.warn("TODO error handling", new Exception(uidConsumerGone.cause()));
                    }
                    unregisterConsumers(false).onComplete(released -> {
                        if (released.failed()) {
                            this.log.warn("TODO error handling", new Exception(released.cause()));
                        }
                        this.stoppedHandler = handler;
                        if (this.myQueues.isEmpty()) {
                            handler.handle(null);
                        }
                    });
                }));
    }

    /**
     * Releases queues held by this instance. For every eligible queue the registration
     * is refreshed, the consumer key is read back, and the queue is removed from
     * {@code myQueues} if this instance is still the registered consumer.
     *
     * @param z when {@code true} every known queue is processed; otherwise only queues
     *          in state {@link QueueState#READY}
     * @return a future completed once all queues were processed; errors are logged and
     *         never fail the future
     */
    private Future<Void> unregisterConsumers(boolean z) {
        final Promise<Void> allProcessed = Promise.promise();
        this.log.debug("RedisQues unregister consumers. force={}", Boolean.valueOf(z));

        final List<Future> perQueue = new ArrayList<>(this.myQueues.size());
        for (Map.Entry<String, QueueState> entry : this.myQueues.entrySet()) {
            final Promise<Void> queueProcessed = Promise.promise();
            perQueue.add(queueProcessed.future());
            final String queueName = entry.getKey();

            final boolean eligible = z || entry.getValue() == QueueState.READY;
            if (!eligible) {
                queueProcessed.complete();
                continue;
            }

            this.log.trace("RedisQues unregister consumers queue: {}", queueName);
            refreshRegistration(queueName, refreshResult -> {
                if (refreshResult.failed()) {
                    this.log.warn("TODO error handling", new Exception(refreshResult.cause()));
                }
                final String consumerKey = this.consumersPrefix + queueName;
                this.log.trace("RedisQues unregister consumers get: {}", consumerKey);
                this.redisProvider.redis()
                        .onSuccess(redisAPI -> redisAPI.get(consumerKey, getResult -> {
                            if (getResult.failed()) {
                                this.log.warn("Failed to retrieve consumer '{}'.", consumerKey, getResult.cause());
                            }
                            final String registeredConsumer = Objects.toString(getResult.result(), "");
                            this.log.trace("RedisQues unregister consumers get result: {}", registeredConsumer);
                            if (this.uid.equals(registeredConsumer)) {
                                this.log.debug("RedisQues remove consumer: {}", this.uid);
                                this.myQueues.remove(queueName);
                            }
                            queueProcessed.complete();
                        }))
                        .onFailure(cause -> {
                            this.log.warn("Failed to retrieve consumer '{}'.", consumerKey, cause);
                            queueProcessed.complete();
                        });
            });
        }

        CompositeFuture.all(perQueue).onComplete(combined -> {
            if (combined.failed()) {
                this.log.warn("TODO error handling", new Exception(combined.cause()));
            }
            allProcessed.complete();
        });
        return allProcessed.future();
    }

    /**
     * Deletes every consumer registration key (all keys matching the consumers prefix
     * followed by {@code *}) from Redis. Failures are logged only; a failed delete is
     * now logged together with its cause so the reason is not lost.
     */
    private void resetConsumers() {
        this.log.debug("RedisQues Resetting consumers");
        final String keysPattern = this.consumersPrefix + "*";
        this.log.trace("RedisQues reset consumers keys: {}", keysPattern);
        this.redisProvider.redis().onSuccess(redisAPI -> {
            redisAPI.keys(keysPattern, keysResult -> {
                if (keysResult.failed() || keysResult.result() == null) {
                    this.log.error("Unable to get redis keys of consumers", keysResult.cause());
                    return;
                }
                final Response keysReply = keysResult.result();
                if (keysReply.size() == 0) {
                    this.log.debug("No consumers found to reset");
                    return;
                }
                final List<String> consumerKeys = new ArrayList<>(keysReply.size());
                for (Response key : keysReply) {
                    consumerKeys.add(key.toString());
                }
                redisAPI.del(consumerKeys, delResult -> {
                    if (!delResult.succeeded()) {
                        this.log.error("Unable to delete redis keys of consumers", delResult.cause());
                    } else if (this.log.isDebugEnabled()) {
                        this.log.debug("Successfully reset {} consumers", delResult.result().toLong());
                    }
                });
            });
        }).onFailure(cause -> {
            this.log.error("Redis: Unable to get redis keys of consumers", cause);
        });
    }

    /**
     * Starts consuming the given queue if this instance is (still) its registered
     * consumer.
     *
     * <p>The registration is refreshed first, then the consumer key is read back:
     * <ul>
     *   <li>another consumer is registered: the queue is dropped from {@code myQueues}
     *       and that consumer is notified;</li>
     *   <li>the queue is already {@link QueueState#CONSUMING}: nothing is done;</li>
     *   <li>otherwise the queue is marked {@link QueueState#CONSUMING} and read.</li>
     * </ul>
     *
     * @param str name of the queue
     * @return a future completed when the attempt is over. It fails when no Redis client
     *         is available or the consumer key cannot be read; previously those two paths
     *         only logged and left the future pending forever.
     */
    private Future<Void> consume(String str) {
        final Promise<Void> promise = Promise.promise();
        this.log.debug("RedisQues Requested to consume queue {}", str);
        refreshRegistration(str, refreshResult -> {
            if (refreshResult.failed()) {
                this.log.warn("Failed to refresh registration for queue '{}'.", str, refreshResult.cause());
            }
            final String consumerKey = this.consumersPrefix + str;
            this.log.trace("RedisQues consume get: {}", consumerKey);
            this.redisProvider.redis().onSuccess(redisAPI -> {
                redisAPI.get(consumerKey, getResult -> {
                    if (getResult.failed()) {
                        this.log.error("Unable to get consumer for queue " + str, getResult.cause());
                        // Tell the caller instead of leaving the future pending.
                        promise.fail(getResult.cause());
                        return;
                    }
                    final String registeredConsumer = Objects.toString(getResult.result(), "");
                    this.log.trace("RedisQues refresh registration consumer: {}", registeredConsumer);
                    if (!this.uid.equals(registeredConsumer)) {
                        // Somebody else owns the queue now: forget it locally and wake the owner.
                        this.log.warn("Registration for queue {} has changed to {}", str, registeredConsumer);
                        this.myQueues.remove(str);
                        notifyConsumer(str).onComplete(notifyResult -> {
                            if (notifyResult.failed()) {
                                this.log.warn("TODO error handling", notifyResult.cause());
                            }
                            promise.complete();
                        });
                        return;
                    }
                    final QueueState previousState = this.myQueues.get(str);
                    this.log.trace("RedisQues consumer: {} queue: {} state: {}", new Object[]{registeredConsumer, str, previousState});
                    if (previousState == QueueState.CONSUMING) {
                        this.log.debug("RedisQues Queue {} is already being consumed", str);
                        promise.complete();
                        return;
                    }
                    this.myQueues.put(str, QueueState.CONSUMING);
                    if (previousState == null) {
                        this.log.warn("Received request to consume from a queue I did not know about: {}", str);
                    }
                    this.log.debug("RedisQues Starting to consume queue {}", str);
                    readQueue(str).onComplete(readResult -> {
                        if (readResult.failed()) {
                            this.log.warn("TODO error handling", new Exception(readResult.cause()));
                        }
                        promise.complete();
                    });
                });
            }).onFailure(cause -> {
                this.log.error("Redis: Unable to get consumer for queue " + str, cause);
                // Tell the caller instead of leaving the future pending.
                promise.fail(cause);
            });
        });
        return promise.future();
    }

    /**
     * Checks whether the locks hash contains an entry for the given queue.
     *
     * @param str name of the queue
     * @return a future that always succeeds: {@code true} only when Redis reports the
     *         field as present; {@code false} otherwise, including when Redis cannot
     *         be reached or the lookup fails (which is logged)
     */
    private Future<Boolean> isQueueLocked(String str) {
        final Promise<Boolean> lockState = Promise.promise();
        this.redisProvider.redis()
                .onSuccess(redisAPI -> redisAPI.hexists(this.locksKey, str, lookup -> {
                    if (lookup.failed()) {
                        this.log.warn("Failed to check if queue '{}' is locked. Assume no.", str, lookup.cause());
                        lockState.complete(Boolean.FALSE);
                        return;
                    }
                    final Response reply = lookup.result();
                    final boolean locked = reply != null && reply.toInteger().intValue() == 1;
                    lockState.complete(Boolean.valueOf(locked));
                }))
                .onFailure(cause -> {
                    this.log.warn("Redis: Failed to check if queue '{}' is locked. Assume no.", str, cause);
                    lockState.complete(Boolean.FALSE);
                });
        return lockState.future();
    }

    private Future<Void> readQueue(String str) {
        Promise promise = Promise.promise();
        this.log.trace("RedisQues read queue: {}", str);
        String str2 = this.queuesPrefix + str;
        this.log.trace("RedisQues read queue lindex: {}", str2);
        isQueueLocked(str).onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                throw new UnsupportedOperationException("TODO error handling " + str, asyncResult.cause());
            }
            if (!((Boolean) asyncResult.result()).booleanValue()) {
                this.redisProvider.redis().onSuccess(redisAPI -> {
                    redisAPI.lindex(str2, "0", asyncResult -> {
                        if (asyncResult.failed()) {
                            this.log.error("Failed to peek queue '{}'", str, asyncResult.cause());
                        }
                        Response response = (Response) asyncResult.result();
                        this.log.trace("RedisQues read queue lindex result: {}", response);
                        if (response != null) {
                            this.dequeueStatistic.computeIfAbsent(str, str3 -> {
                                return new DequeueStatistic();
                            });
                            this.dequeueStatistic.get(str).lastDequeueAttemptTimestamp = Long.valueOf(System.currentTimeMillis());
                            processMessageWithTimeout(str, response.toString(), bool -> {
                                int updateQueueFailureCountAndGetRetryInterval = updateQueueFailureCountAndGetRetryInterval(str, bool.booleanValue());
                                if (bool.booleanValue()) {
                                    this.log.trace("RedisQues read queue lpop: {}", str2);
                                    redisAPI.lpop(Collections.singletonList(str2), asyncResult -> {
                                        if (asyncResult.failed()) {
                                            this.log.error("Failed to pop from queue '{}'", str, asyncResult.cause());
                                        }
                                        this.log.debug("RedisQues Message removed, queue {} is ready again", str);
                                        this.myQueues.put(str, QueueState.READY);
                                        Handler handler = r11 -> {
                                            this.log.trace("RedisQues read queue: {}", str2);
                                            redisAPI.llen(str2, asyncResult -> {
                                                if (asyncResult.succeeded() && asyncResult.result() != null && ((Response) asyncResult.result()).toInteger().intValue() > 0) {
                                                    notifyConsumer(str).onComplete(asyncResult -> {
                                                        if (asyncResult.failed()) {
                                                            this.log.warn("TODO error handling", new Exception(asyncResult.cause()));
                                                        }
                                                        promise.complete();
                                                    });
                                                    return;
                                                }
                                                if (asyncResult.failed()) {
                                                    this.log.warn("TODO error handling", new Exception(asyncResult.cause()));
                                                }
                                                promise.complete();
                                            });
                                        };
                                        if (this.stoppedHandler != null) {
                                            unregisterConsumers(false).onComplete(asyncResult -> {
                                                if (asyncResult.failed()) {
                                                    this.log.warn("TODO error handling", new Exception(asyncResult.cause()));
                                                }
                                                if (this.myQueues.isEmpty()) {
                                                    this.stoppedHandler.handle((Object) null);
                                                }
                                                handler.handle((Object) null);
                                            });
                                        } else {
                                            handler.handle((Object) null);
                                        }
                                    });
                                } else {
                                    this.log.debug("RedisQues Processing failed for queue {}", str);
                                    this.log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", str, Integer.valueOf(updateQueueFailureCountAndGetRetryInterval));
                                    rescheduleSendMessageAfterFailure(str, updateQueueFailureCountAndGetRetryInterval);
                                    promise.complete();
                                }
                            });
                            return;
                        }
                        this.log.debug("Got a request to consume from empty queue {}", str);
                        this.myQueues.put(str, QueueState.READY);
                        this.dequeueStatistic.remove(str);
                        promise.complete();
                    });
                }).onFailure(th -> {
                    this.log.warn("Redis: Error on readQueue", th);
                    this.myQueues.put(str, QueueState.READY);
                    promise.complete();
                });
                return;
            }
            this.log.debug("Got a request to consume from locked queue {}", str);
            this.myQueues.put(str, QueueState.READY);
            promise.complete();
        });
        return promise.future();
    }

    /**
     * Schedules a renewed consumer notification for a queue whose last message failed processing.
     *
     * <p>When the timer fires, the next due timestamp is recorded on the queue's dequeue statistic
     * (if one is still present), the consumer is notified again and, once that notification has
     * completed, the queue state is set back to {@code QueueState.READY}.
     *
     * @param str name of the queue to retry
     * @param i   retry delay in seconds
     */
    private void rescheduleSendMessageAfterFailure(String str, int i) {
        this.log.trace("RedsQues reschedule after failure for queue: {}", str);
        // Widen before multiplying so a large interval cannot overflow int arithmetic.
        final long retryDelayMillis = i * 1000L;
        // Vert.x rejects timer delays below 1 ms; clamp so a zero interval still triggers the retry.
        this.vertx.setTimer(Math.max(1L, retryDelayMillis), timerId -> {
            // The statistic entry can be removed elsewhere while the timer is pending. A missing
            // entry must not abort the callback, otherwise the queue is never reset to READY.
            DequeueStatistic statistic = this.dequeueStatistic.get(str);
            if (statistic != null) {
                statistic.nextDequeueDueTimestamp = Long.valueOf(System.currentTimeMillis() + retryDelayMillis);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("RedisQues re-notify the consumer of queue '{}' at {}", str, new Date(System.currentTimeMillis()));
            }
            notifyConsumer(str).onComplete(notified -> {
                if (notified.failed()) {
                    this.log.warn("TODO error handling", new Exception(notified.cause()));
                }
                // Make the queue consumable again regardless of the notification outcome.
                this.myQueues.put(str, QueueState.READY);
            });
        });
    }

    /**
     * Sends one queue item to the configured processor address and reports the outcome.
     *
     * <p>The send is preceded by {@code timer.executeDelayedMax(processorDelayMax)}. The item counts as
     * processed only when the reply succeeds and its body is a {@link JsonObject} whose
     * {@code RedisquesAPI.STATUS} field equals {@code RedisquesAPI.OK}. A failed delay step, a failed
     * request or any other reply body is reported as {@code false}.
     *
     * @param str     name of the queue the item belongs to
     * @param str2    payload of the item
     * @param handler receives {@code true} when the processor acknowledged the item, {@code false} otherwise
     */
    private void processMessageWithTimeout(String str, String str2, Handler<Boolean> handler) {
        long processorDelayMax = this.configurationProvider.configuration().getProcessorDelayMax();
        if (processorDelayMax > 0) {
            this.log.info("About to process message for queue {} with a maximum delay of {}ms", str, Long.valueOf(processorDelayMax));
        }
        this.timer.executeDelayedMax(processorDelayMax).onComplete(delayResult -> {
            if (delayResult.failed()) {
                this.log.error("Delayed execution has failed.", new Exception(delayResult.cause()));
                // Report the failure: returning silently would leave the caller without any outcome.
                handler.handle(Boolean.FALSE);
                return;
            }
            String processorAddress = this.configurationProvider.configuration().getProcessorAddress();
            EventBus eventBus = this.vertx.eventBus();
            JsonObject request = new JsonObject();
            request.put("queue", str);
            request.put(RedisquesAPI.PAYLOAD, str2);
            this.log.trace("RedisQues process message: {} for queue: {} send it to processor: {}", request, str, processorAddress);
            DeliveryOptions options = new DeliveryOptions().setSendTimeout(this.configurationProvider.configuration().getProcessorTimeout());
            eventBus.request(processorAddress, request, options, replyResult -> {
                boolean success = false;
                if (replyResult.succeeded()) {
                    // Inspect the body defensively: a null or non-JSON reply is a failure, not an exception.
                    Object body = replyResult.result().body();
                    success = body instanceof JsonObject
                            && RedisquesAPI.OK.equals(((JsonObject) body).getString(RedisquesAPI.STATUS));
                    if (success) {
                        // The statistic entry can be removed elsewhere while the request is in flight.
                        DequeueStatistic statistic = this.dequeueStatistic.get(str);
                        if (statistic != null) {
                            statistic.lastDequeueSuccessTimestamp = Long.valueOf(System.currentTimeMillis());
                            statistic.nextDequeueDueTimestamp = null;
                        }
                    }
                } else {
                    this.log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {}", this.uid, str, new Exception(replyResult.cause()));
                }
                handler.handle(Boolean.valueOf(success));
            });
            // Fire-and-forget timestamp refresh for the queue (no completion handler).
            updateTimestamp(str, null);
        });
    }

    /**
     * Looks up the consumer registered for a queue and sends it the queue name over the event bus.
     *
     * <p>When no consumer is found (including when the lookup itself failed), a registration
     * request is sent to the {@code <address>-consumers} address instead.
     *
     * @param str name of the queue
     * @return a future that is completed, never failed, on every path handled here
     */
    private Future<Void> notifyConsumer(String str) {
        this.log.debug("RedisQues Notifying consumer of queue {}", str);
        final EventBus bus = this.vertx.eventBus();
        final Promise<Void> done = Promise.promise();
        final String consumerKey = this.consumersPrefix + str;
        this.log.trace("RedisQues notify consumer get: {}", consumerKey);
        this.redisProvider.redis()
                .onSuccess(redisAPI -> redisAPI.get(consumerKey, lookup -> {
                    if (lookup.failed()) {
                        // Deliberately not failing here: a failed lookup is treated like "no consumer".
                        this.log.warn("Failed to get consumer for queue '{}'", str, new Exception(lookup.cause()));
                    }
                    final String consumer = Objects.toString(lookup.result(), null);
                    this.log.trace("RedisQues got consumer: {}", consumer);
                    if (consumer != null) {
                        this.log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, str);
                        bus.send(consumer, str);
                    } else {
                        this.log.debug("RedisQues Sending registration request for queue {}", str);
                        bus.send(this.configurationProvider.configuration().getAddress() + "-consumers", str);
                    }
                    done.complete();
                }))
                .onFailure(cause -> {
                    this.log.warn("Redis: Failed to get consumer for queue '{}'", str, cause);
                    done.complete();
                });
        return done.future();
    }

    /**
     * Extends the expiry of the consumer registration key of a queue to {@code consumerLockTime}.
     *
     * @param str     name of the queue
     * @param handler receives the result of the EXPIRE call, or a failed result when no Redis
     *                connection could be obtained; must not be {@code null}
     * @throws RuntimeException when {@code handler} is {@code null}
     */
    private void refreshRegistration(String str, Handler<AsyncResult<Response>> handler) {
        this.log.debug("RedisQues Refreshing registration of queue {}, expire in {} s", str, this.consumerLockTime);
        if (handler == null) {
            throw new RuntimeException("Handler must be set");
        }
        final String consumerKey = this.consumersPrefix + str;
        this.redisProvider.redis()
                .onSuccess(redisAPI -> {
                    List<String> expireArgs = List.of(consumerKey, String.valueOf(this.consumerLockTime));
                    redisAPI.expire(expireArgs, handler);
                })
                .onFailure(cause -> {
                    this.log.warn("Redis: Failed to refresh registration of queue {}", str, cause);
                    handler.handle(new FailedAsyncResult(cause));
                });
    }

    /**
     * Writes the current time as the score of a queue in the sorted set stored under {@code queuesKey}.
     *
     * @param str     name of the queue
     * @param handler optional; when given it receives the ZADD result, or a failed result when no
     *                Redis connection could be obtained. When {@code null} the outcome is not reported.
     */
    private void updateTimestamp(String str, Handler<AsyncResult<Response>> handler) {
        final long now = System.currentTimeMillis();
        this.log.trace("RedisQues update timestamp for queue: {} to: {}", str, now);
        this.redisProvider.redis()
                .onSuccess(redisAPI -> {
                    List<String> zaddArgs = Arrays.asList(this.queuesKey, String.valueOf(now), str);
                    if (handler != null) {
                        redisAPI.zadd(zaddArgs, handler);
                    } else {
                        redisAPI.zadd(zaddArgs);
                    }
                })
                .onFailure(cause -> {
                    this.log.warn("Redis: Error in updateTimestamp", cause);
                    if (handler == null) {
                        return;
                    }
                    handler.handle(new FailedAsyncResult(cause));
                });
    }

    /**
     * Revisits queues whose entry in the sorted set stored under {@code queuesKey} has an old score.
     *
     * <p>Entries are fetched with ZRANGEBYSCORE from {@code -inf} up to a threshold of
     * "now minus three refresh periods". For each returned queue name the key
     * {@code queuesPrefix + name} is checked with EXISTS:
     * <ul>
     *   <li>result 1: the queue timestamp is updated, then the registration is refreshed and the
     *       consumer is notified;</li>
     *   <li>any other result: the dequeue statistic of the queue is removed and its failure
     *       statistics are reset;</li>
     *   <li>failed check or null result: an error is logged and nothing else is done for that entry.</li>
     * </ul>
     * A shared counter starts at the number of returned entries; when it reaches zero,
     * {@link #removeOldQueues(long)} is called with the same threshold before the last entry is finished.
     *
     * @return a future that is completed (never failed, in the code shown here) after all per-queue
     *         work has finished, or immediately when Redis is unavailable or the listing fails
     */
    private Future<Void> checkQueues() {
        Promise promise = Promise.promise();
        this.log.debug("Checking queues timestamps");
        // Threshold score: now minus 3 refresh periods. The "* 1000" suggests the period is
        // configured in seconds — TODO confirm against the configuration class.
        long currentTimeMillis = System.currentTimeMillis() - ((3 * this.configurationProvider.configuration().getRefreshPeriod()) * 1000);
        this.redisProvider.redis().onSuccess(redisAPI -> {
            redisAPI.zrangebyscore(Arrays.asList(this.queuesKey, "-inf", String.valueOf(currentTimeMillis)), asyncResult -> {
                Response<Response> response = (Response) asyncResult.result();
                if (asyncResult.failed() || response == null) {
                    this.log.error("RedisQues is unable to get list of queues", asyncResult.cause());
                    promise.complete();
                    return;
                }
                // Number of entries still to be handled; the entry that brings it to zero
                // triggers removeOldQueues.
                AtomicInteger atomicInteger = new AtomicInteger(response.size());
                this.log.trace("RedisQues update queues: {}", atomicInteger);
                // One future per entry; the overall promise completes once all of them have.
                ArrayList arrayList = new ArrayList(response.size());
                for (Response response2 : response) {
                    Promise promise2 = Promise.promise();
                    arrayList.add(promise2.future());
                    String response3 = response2.toString();
                    String str = this.queuesPrefix + response3;
                    this.log.trace("RedisQues update queue: {}", str);
                    // Follow-up for a queue that still exists: refresh the registration, then notify
                    // the consumer, then finish this entry. Failures are only logged.
                    Handler handler = r9 -> {
                        refreshRegistration(response3, asyncResult -> {
                            if (asyncResult.failed()) {
                                this.log.warn("TODO error handling", new Exception(asyncResult.cause()));
                            }
                            notifyConsumer(response3).onComplete(asyncResult -> {
                                if (asyncResult.failed()) {
                                    this.log.warn("TODO error handling", new Exception(asyncResult.cause()));
                                }
                                promise2.complete();
                            });
                        });
                    };
                    redisAPI.exists(Collections.singletonList(str), asyncResult -> {
                        if (asyncResult.failed() || asyncResult.result() == null) {
                            // This branch does not decrement the counter, so once it is taken the
                            // counter cannot reach zero and removeOldQueues is not run in this pass.
                            this.log.error("RedisQues is unable to check existence of queue " + response3, asyncResult.cause());
                            promise2.complete();
                            return;
                        }
                        if (((Response) asyncResult.result()).toLong().longValue() == 1) {
                            // The queue key exists: bump its timestamp first, so that a subsequent
                            // removeOldQueues call with the old threshold is issued after the update.
                            this.log.debug("Updating queue timestamp for queue '{}'", response3);
                            updateTimestamp(response3, asyncResult -> {
                                if (asyncResult.failed()) {
                                    this.log.warn("Failed to update timestamps for queue '{}'", response3, asyncResult.cause());
                                }
                                if (atomicInteger.decrementAndGet() == 0) {
                                    removeOldQueues(currentTimeMillis).onComplete(asyncResult -> {
                                        if (asyncResult.failed()) {
                                            this.log.warn("TODO error handling", new Exception(asyncResult.cause()));
                                        }
                                        handler.handle((Object) null);
                                    });
                                } else {
                                    handler.handle((Object) null);
                                }
                            });
                            return;
                        }
                        // The queue key does not exist: drop local statistics for it.
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("RedisQues remove old queue: {}", response3);
                        }
                        this.dequeueStatistic.remove(response3);
                        if (atomicInteger.decrementAndGet() == 0) {
                            removeOldQueues(currentTimeMillis).onComplete(asyncResult2 -> {
                                if (asyncResult2.failed()) {
                                    this.log.warn("TODO error handling", new Exception(asyncResult2.cause()));
                                }
                                this.queueStatisticsCollector.resetQueueFailureStatistics(response3);
                                promise2.complete();
                            });
                        } else {
                            this.queueStatisticsCollector.resetQueueFailureStatistics(response3);
                            promise2.complete();
                        }
                    });
                }
                // Per-entry promises are only ever completed, never failed, hence "Cannot happen".
                CompositeFuture.all(arrayList).onComplete(asyncResult2 -> {
                    if (asyncResult2.failed()) {
                        this.log.warn("Cannot happen", new Exception(asyncResult2.cause()));
                    }
                    promise.complete();
                });
            });
        }).onFailure(th -> {
            this.log.warn("Redis: Failed to checkQueues", th);
            promise.complete();
        });
        return promise.future();
    }

    /**
     * Removes every entry of the sorted set stored under {@code queuesKey} with a score from
     * {@code -inf} up to {@code j}.
     *
     * @param j upper score bound passed to ZREMRANGEBYSCORE
     * @return a future that is completed, never failed, whether or not the removal succeeded;
     *         failures are only logged
     */
    private Future<Void> removeOldQueues(long j) {
        final Promise<Void> done = Promise.promise();
        this.log.debug("Cleaning old queues");
        final String upperBound = String.valueOf(j);
        this.redisProvider.redis()
                .onSuccess(redisAPI -> redisAPI.zremrangebyscore(this.queuesKey, "-inf", upperBound, removal -> {
                    if (removal.failed()) {
                        this.log.warn("TODO error handling", removal.cause());
                    }
                    done.complete();
                }))
                .onFailure(cause -> {
                    this.log.warn("Redis: Failed to removeOldQueues", cause);
                    done.complete();
                });
        return done.future();
    }

    /**
     * Returns the first configured queue configuration whose compiled pattern matches the whole
     * queue name.
     *
     * @param str queue name to match
     * @return the first matching configuration in configuration order, or {@code null} when none matches
     */
    private QueueConfiguration findQueueConfiguration(String str) {
        QueueConfiguration match = null;
        for (QueueConfiguration candidate : this.configurationProvider.configuration().getQueueConfigurations()) {
            if (candidate.compiledPattern().matcher(str).matches()) {
                match = candidate;
                break;
            }
        }
        return match;
    }
}
