package org.swisspush.gateleen.queue.queuing;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.monitoring.MonitoringHandler;

/* loaded from: input_file:org/swisspush/gateleen/queue/queuing/QueueClient.class */
public class QueueClient implements RequestQueue {
    public static final String QUEUE_TIMESTAMP = "queueTimestamp";
    public static final Logger log = LoggerFactory.getLogger(QueueClient.class);
    private MonitoringHandler monitoringHandler;
    private Vertx vertx;

    public QueueClient(Vertx vertx, MonitoringHandler monitoringHandler) {
        this.vertx = vertx;
        this.monitoringHandler = monitoringHandler;
    }

    protected String getRedisquesAddress() {
        return Address.redisquesAddress();
    }

    @Override // org.swisspush.gateleen.queue.queuing.RequestQueue
    public void enqueue(HttpServerRequest httpServerRequest, Buffer buffer, String str) {
        enqueue(httpServerRequest, httpServerRequest.headers(), buffer, str);
    }

    @Override // org.swisspush.gateleen.queue.queuing.RequestQueue
    public void enqueue(HttpServerRequest httpServerRequest, MultiMap multiMap, Buffer buffer, String str) {
        enqueue(httpServerRequest, new HttpRequest(httpServerRequest.method(), httpServerRequest.uri(), multiMap, buffer.getBytes()), str);
    }

    @Override // org.swisspush.gateleen.queue.queuing.RequestQueue
    public void enqueue(HttpRequest httpRequest, String str) {
        enqueue((HttpServerRequest) null, httpRequest, str);
    }

    @Override // org.swisspush.gateleen.queue.queuing.RequestQueue
    public void enqueue(HttpRequest httpRequest, String str, Handler<Void> handler) {
        enqueue((HttpServerRequest) null, httpRequest, str, handler);
    }

    @Override // org.swisspush.gateleen.queue.queuing.RequestQueue
    public void lockedEnqueue(HttpRequest httpRequest, String str, String str2, Handler<Void> handler) {
        this.vertx.eventBus().request(getRedisquesAddress(), org.swisspush.redisques.util.RedisquesAPI.buildLockedEnqueueOperation(str, httpRequest.toJsonObject().put(QUEUE_TIMESTAMP, Long.valueOf(System.currentTimeMillis())).encode(), str2), asyncResult -> {
            if ("ok".equals(((JsonObject) ((Message) asyncResult.result()).body()).getString("status"))) {
                this.monitoringHandler.updateLastUsedQueueSizeInformation(str);
                this.monitoringHandler.updateEnqueue();
            }
            if (handler != null) {
                handler.handle((Object) null);
            }
        });
    }

    @Override // org.swisspush.gateleen.queue.queuing.RequestQueue
    public Future<Void> deleteLock(String str) {
        Promise promise = Promise.promise();
        this.vertx.eventBus().request(getRedisquesAddress(), org.swisspush.redisques.util.RedisquesAPI.buildDeleteLockOperation(str), asyncResult -> {
            if (asyncResult.failed()) {
                promise.fail(asyncResult.cause());
            } else if ("ok".equals(((JsonObject) ((Message) asyncResult.result()).body()).getString("status"))) {
                promise.complete();
            } else {
                promise.fail("Failed to delete lock for queue " + str);
            }
        });
        return promise.future();
    }

    @Override // org.swisspush.gateleen.queue.queuing.RequestQueue
    public Future<Void> deleteAllQueueItems(String str, boolean z) {
        Promise promise = Promise.promise();
        this.vertx.eventBus().request(getRedisquesAddress(), org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllQueueItemsOperation(str, z), asyncResult -> {
            if (asyncResult.succeeded()) {
                promise.complete();
            } else {
                promise.fail("Failed to delete all queue items for queue " + str + " with unlock " + z + ". Cause: " + asyncResult.cause());
            }
        });
        return promise.future();
    }

    @Override // org.swisspush.gateleen.queue.queuing.RequestQueue
    public Future<Void> enqueueFuture(HttpRequest httpRequest, String str) {
        Promise promise = Promise.promise();
        this.vertx.eventBus().request(getRedisquesAddress(), org.swisspush.redisques.util.RedisquesAPI.buildEnqueueOperation(str, httpRequest.toJsonObject().put(QUEUE_TIMESTAMP, Long.valueOf(System.currentTimeMillis())).encode()), asyncResult -> {
            if (!"ok".equals(((JsonObject) ((Message) asyncResult.result()).body()).getString("status"))) {
                promise.fail(((JsonObject) ((Message) asyncResult.result()).body()).getString("message"));
                return;
            }
            this.monitoringHandler.updateLastUsedQueueSizeInformation(str);
            this.monitoringHandler.updateEnqueue();
            promise.complete();
        });
        return promise.future();
    }

    private void enqueue(HttpServerRequest httpServerRequest, HttpRequest httpRequest, String str) {
        enqueue(httpServerRequest, httpRequest, str, (Handler<Void>) null);
    }

    private void enqueue(HttpServerRequest httpServerRequest, HttpRequest httpRequest, String str, Handler<Void> handler) {
        if (QueueProcessor.httpMethodIsQueueable(httpRequest.getMethod())) {
            this.vertx.eventBus().request(getRedisquesAddress(), org.swisspush.redisques.util.RedisquesAPI.buildEnqueueOperation(str, httpRequest.toJsonObject().put(QUEUE_TIMESTAMP, Long.valueOf(System.currentTimeMillis())).encode()), asyncResult -> {
                if ("ok".equals(((JsonObject) ((Message) asyncResult.result()).body()).getString("status"))) {
                    this.monitoringHandler.updateLastUsedQueueSizeInformation(str);
                    this.monitoringHandler.updateEnqueue();
                    if (httpServerRequest != null) {
                        ResponseStatusCodeLogUtil.info(httpServerRequest, StatusCode.ACCEPTED, QueueClient.class);
                        httpServerRequest.response().setStatusCode(StatusCode.ACCEPTED.getStatusCode());
                        httpServerRequest.response().setStatusMessage(StatusCode.ACCEPTED.getStatusMessage());
                        httpServerRequest.response().end();
                    }
                } else if (httpServerRequest != null) {
                    ResponseStatusCodeLogUtil.info(httpServerRequest, StatusCode.INTERNAL_SERVER_ERROR, QueueClient.class);
                    httpServerRequest.response().setStatusCode(StatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
                    httpServerRequest.response().setStatusMessage(StatusCode.INTERNAL_SERVER_ERROR.getStatusMessage());
                    httpServerRequest.response().end(((JsonObject) ((Message) asyncResult.result()).body()).getString("message"));
                }
                if (handler != null) {
                    handler.handle((Object) null);
                }
            });
            return;
        }
        log.warn("Ignore enqueue of unsupported HTTP method in '{} {}'.", httpRequest.getMethod(), httpRequest.getUri());
        if (handler != null) {
            handler.handle((Object) null);
        }
    }
}
