package org.swisspush.gateleen.hook.reducedpropagation;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.util.HttpRequestHeader;
import org.swisspush.gateleen.core.util.StringUtils;
import org.swisspush.gateleen.queue.queuing.RequestQueue;

/* loaded from: input_file:org/swisspush/gateleen/hook/reducedpropagation/ReducedPropagationManager.class */
public class ReducedPropagationManager {
    private Vertx vertx;
    private final ReducedPropagationStorage storage;
    private final RequestQueue requestQueue;
    public static final String LOCK_REQUESTER = "ReducedPropagationManager";
    public static final String PROCESSOR_ADDRESS = "gateleen.hook-expired-queues-processor";
    public static final String MANAGER_QUEUE_PREFIX = "manager_";
    private long processExpiredQueuesTimerId = -1;
    private Logger log = LoggerFactory.getLogger(ReducedPropagationManager.class);

    public ReducedPropagationManager(Vertx vertx, ReducedPropagationStorage reducedPropagationStorage, RequestQueue requestQueue) {
        this.vertx = vertx;
        this.storage = reducedPropagationStorage;
        this.requestQueue = requestQueue;
        registerExpiredQueueProcessor();
    }

    public void startExpiredQueueProcessing(long j) {
        this.log.info("About to start periodic processing of expired queues with an interval of " + j + " ms");
        this.vertx.cancelTimer(this.processExpiredQueuesTimerId);
        this.processExpiredQueuesTimerId = this.vertx.setPeriodic(j, l -> {
            processExpiredQueues();
        });
    }

    public Future<Void> processIncomingRequest(HttpMethod httpMethod, String str, MultiMap multiMap, Buffer buffer, String str2, long j, Handler<Void> handler) {
        Future<Void> future = Future.future();
        long currentTimeMillis = System.currentTimeMillis() + j;
        this.log.info("Going to perform a lockedEnqueue for (original) queue '" + str2 + "' and eventually starting a new timer");
        this.requestQueue.lockedEnqueue(new HttpRequest(httpMethod, str, multiMap, buffer.getBytes()), str2, LOCK_REQUESTER, handler);
        this.storage.addQueue(str2, currentTimeMillis).setHandler(asyncResult -> {
            if (asyncResult.failed()) {
                this.log.error("starting a new timer for queue '" + str2 + "' and propagationIntervalMs '" + j + "' failed. Cause: " + asyncResult.cause());
                future.fail(asyncResult.cause());
            } else if (((Boolean) asyncResult.result()).booleanValue()) {
                this.log.info("Timer for queue '" + str2 + "' with expiration at '" + currentTimeMillis + "' started.");
                storeQueueRequest(str2, httpMethod, str, multiMap).setHandler(asyncResult -> {
                    if (asyncResult.failed()) {
                        future.fail(asyncResult.cause());
                    } else {
                        future.complete();
                    }
                });
            } else {
                this.log.info("Timer for queue '" + str2 + "' is already running.");
                future.complete();
            }
        });
        return future;
    }

    private Future<Void> storeQueueRequest(String str, HttpMethod httpMethod, String str2, MultiMap multiMap) {
        this.log.info("Going to write the queue request for queue '" + str + "' to the storage");
        Future<Void> future = Future.future();
        MultiMap addAll = new CaseInsensitiveHeaders().addAll(multiMap);
        if (HttpRequestHeader.containsHeader(addAll, HttpRequestHeader.CONTENT_LENGTH)) {
            addAll.set(HttpRequestHeader.CONTENT_LENGTH.getName(), "0");
        }
        this.storage.storeQueueRequest(str, new HttpRequest(httpMethod, str2, addAll, (byte[]) null).toJsonObject()).setHandler(asyncResult -> {
            if (asyncResult.failed()) {
                this.log.error("Storing the queue request for queue '" + str + "' failed. Cause: " + asyncResult.cause());
                future.fail(asyncResult.cause());
            } else {
                this.log.info("Successfully stored the queue request for queue '" + str + "'");
                future.complete();
            }
        });
        return future;
    }

    private void processExpiredQueues() {
        this.log.info("Going to process expired queues");
        this.storage.removeExpiredQueues(System.currentTimeMillis()).setHandler(asyncResult -> {
            if (asyncResult.failed()) {
                this.log.error("Failed to process expired queues. Cause: " + asyncResult.cause());
                return;
            }
            List<String> list = (List) asyncResult.result();
            this.log.info("Got " + list.size() + " expired queues to process");
            for (String str : list) {
                this.log.info("About to notify a consumer to process expired queue '" + str + "'");
                this.vertx.eventBus().send(PROCESSOR_ADDRESS, str, asyncResult -> {
                    if ("ok".equals(((JsonObject) ((Message) asyncResult.result()).body()).getString("status"))) {
                        this.log.info(((JsonObject) ((Message) asyncResult.result()).body()).getString("message"));
                    } else {
                        this.log.error("Failed to process expired queue. Message: " + ((JsonObject) ((Message) asyncResult.result()).body()).getString("message"));
                    }
                });
            }
        });
    }

    private void registerExpiredQueueProcessor() {
        this.vertx.eventBus().consumer(PROCESSOR_ADDRESS, message -> {
            processExpiredQueue((String) message.body(), message);
        });
    }

    private void processExpiredQueue(String str, Message<String> message) {
        if (StringUtils.isEmpty(str)) {
            message.reply(new JsonObject().put("status", "error").put("message", "Tried to process an expired queue without a valid queue name. Going to stop here"));
        } else {
            this.storage.getQueueRequest(str).setHandler(asyncResult -> {
                if (asyncResult.failed()) {
                    message.reply(new JsonObject().put("status", "error").put("message", asyncResult.cause().getMessage()));
                } else if (asyncResult.result() == null) {
                    message.reply(new JsonObject().put("status", "error").put("message", "stored queue request for queue '" + str + "' is null"));
                } else {
                    String str2 = MANAGER_QUEUE_PREFIX + str;
                    this.requestQueue.deleteAllQueueItems(str2, false).setHandler(asyncResult -> {
                        if (asyncResult.failed()) {
                            message.reply(new JsonObject().put("status", "error").put("message", asyncResult.cause().getMessage()));
                            return;
                        }
                        try {
                            this.requestQueue.enqueueFuture(new HttpRequest((JsonObject) asyncResult.result()), str2).setHandler(asyncResult -> {
                                if (asyncResult.failed()) {
                                    message.reply(new JsonObject().put("status", "error").put("message", asyncResult.cause().getMessage()));
                                } else {
                                    this.storage.removeQueueRequest(str).setHandler(asyncResult -> {
                                        if (asyncResult.failed()) {
                                            this.log.error("Failed to remove request for queue '" + str + "'. Remove it manually to remove expired data from storage");
                                        }
                                        this.requestQueue.deleteAllQueueItems(str, true).setHandler(asyncResult -> {
                                            if (asyncResult.succeeded()) {
                                                message.reply(new JsonObject().put("status", "ok").put("message", "Successfully deleted lock and all queue items of queue " + str));
                                            } else {
                                                message.reply(new JsonObject().put("status", "error").put("message", asyncResult.cause().getMessage()));
                                            }
                                        });
                                    });
                                }
                            });
                        } catch (Exception e) {
                            message.reply(new JsonObject().put("status", "error").put("message", e.getMessage()));
                        }
                    });
                }
            });
        }
    }
}
