package org.swisspush.gateleen.queue.queuing;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreaker;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueResponseType;

/* loaded from: input_file:org/swisspush/gateleen/queue/queuing/QueueProcessor.class */
/**
 * Consumes queued requests from the local event bus and replays them against the backend
 * using the configured {@link HttpClient}.
 *
 * <p>Each incoming message body is expected to carry a {@code "payload"} entry (the JSON
 * representation of the queued {@link HttpRequest}) and a {@code "queue"} entry (the queue
 * name). Every message is answered with a JSON reply containing {@code "status"}
 * ({@code "ok"} or {@code "error"}) and, for errors, a {@code "message"}.
 *
 * <p>When a {@link QueueCircuitBreaker} is supplied and its circuit check is enabled, the
 * circuit state is looked up before the request is executed; requests for an
 * {@link QueueCircuitState#OPEN} circuit are rejected with an error reply.
 */
public class QueueProcessor {
    /** Keys and values of the reply sent back to the message sender. */
    private static final String STATUS = "status";
    private static final String MESSAGE = "message";
    private static final String OK = "ok";
    private static final String ERROR = "error";

    /** Keys read from the incoming event bus message body. */
    private static final String PAYLOAD = "payload";
    private static final String QUEUE = "queue";

    /** Timeout in milliseconds applied to every replayed backend request. */
    private static final long REQUEST_TIMEOUT_MS = 120000L;

    private final Vertx vertx;
    private final HttpClient httpClient;
    private final MonitoringHandler monitoringHandler;
    private final QueueCircuitBreaker queueCircuitBreaker;

    /**
     * Creates a processor without a circuit breaker.
     *
     * @param vertx             the Vert.x instance whose event bus is consumed
     * @param httpClient        the client used to execute queued requests
     * @param monitoringHandler notified after every successfully dequeued request
     */
    public QueueProcessor(Vertx vertx, HttpClient httpClient, MonitoringHandler monitoringHandler) {
        this(vertx, httpClient, monitoringHandler, null);
    }

    /**
     * Creates a processor and registers it as local consumer on {@link #getQueueProcessorAddress()}.
     *
     * @param vertx               the Vert.x instance whose event bus is consumed
     * @param httpClient          the client used to execute queued requests
     * @param monitoringHandler   notified after every successfully dequeued request
     * @param queueCircuitBreaker optional circuit breaker; may be {@code null}
     */
    public QueueProcessor(Vertx vertx, HttpClient httpClient, MonitoringHandler monitoringHandler, final QueueCircuitBreaker queueCircuitBreaker) {
        this.vertx = vertx;
        this.httpClient = httpClient;
        this.monitoringHandler = monitoringHandler;
        this.queueCircuitBreaker = queueCircuitBreaker;
        // All fields are assigned before the handler is registered, so the handler never
        // observes a partially initialised instance.
        vertx.eventBus().<JsonObject>localConsumer(getQueueProcessorAddress(), this::processQueueMessage);
    }

    /**
     * Handles a single queue message: rebuilds the queued request, optionally consults the
     * circuit breaker and then executes the request.
     *
     * <p>Any exception raised synchronously (including a missing or malformed payload) is
     * logged and answered with an error reply so that the sender is never left without an answer.
     *
     * @param message the event bus message describing the queued request
     */
    private void processQueueMessage(Message<JsonObject> message) {
        try {
            JsonObject body = message.body();
            // Parsing happens inside the try block: an unparsable payload must produce an error reply.
            JsonObject jsonRequest = new JsonObject(body.getString(PAYLOAD));
            HttpRequest queuedRequest = new HttpRequest(jsonRequest);
            Logger logger = RequestLoggerFactory.getLogger(QueueProcessor.class, queuedRequest.getHeaders());
            logger.trace("process message: {}", message);
            String queueName = body.getString(QUEUE);

            if (!isCircuitCheckEnabled()) {
                executeQueuedRequest(message, logger, queuedRequest, jsonRequest, queueName, null);
                return;
            }

            this.queueCircuitBreaker.handleQueuedRequest(queueName, queuedRequest).setHandler(asyncResult -> {
                if (asyncResult.failed()) {
                    String errorText = "Error in QueueCircuitBreaker occurred for queue " + queueName + ". Reply with status ERROR. Message is: " + asyncResult.cause().getMessage();
                    logger.error(errorText);
                    message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, errorText));
                    return;
                }
                QueueCircuitState circuitState = (QueueCircuitState) asyncResult.result();
                if (QueueCircuitState.OPEN == circuitState) {
                    message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, "Circuit for queue " + queueName + " is " + circuitState + ". Queues using this endpoint are not allowed to be executed right now"));
                    return;
                }
                executeQueuedRequest(message, logger, queuedRequest, jsonRequest, queueName, circuitState);
            });
        } catch (Exception e) {
            // The body is passed as a log argument (not dereferenced) so a null body cannot fail here.
            LoggerFactory.getLogger(QueueProcessor.class).error("Could not build request: {} error is {}", message.body(), e.getMessage());
            message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, e.getMessage()));
        }
    }

    /**
     * @return the event bus address this processor consumes from
     */
    public String getQueueProcessorAddress() {
        return Address.queueProcessorAddress();
    }

    /**
     * Kept public for backward compatibility (the decompiler note on the original indicated
     * the method was private in the source).
     *
     * @return {@code true} when a circuit breaker is configured and its circuit check is enabled
     */
    public boolean isCircuitCheckEnabled() {
        return this.queueCircuitBreaker != null && this.queueCircuitBreaker.isCircuitCheckEnabled();
    }

    /**
     * @return {@code true} when a circuit breaker is configured and its statistics update is enabled
     */
    private boolean isStatisticsUpdateEnabled() {
        return this.queueCircuitBreaker != null && this.queueCircuitBreaker.isStatisticsUpdateEnabled();
    }

    /**
     * Updates the circuit breaker statistics and, for a {@link QueueCircuitState#HALF_OPEN}
     * circuit, closes it on success or re-opens it on failure.
     *
     * @param queueName     the queue the request belongs to
     * @param queuedRequest the executed request
     * @param responseType  outcome of the request
     * @param circuitState  circuit state determined before execution; may be {@code null}
     */
    private void performCircuitBreakerActions(String queueName, HttpRequest queuedRequest, QueueResponseType responseType, QueueCircuitState circuitState) {
        updateCircuitBreakerStatistics(queueName, queuedRequest, responseType, circuitState);
        if (QueueCircuitState.HALF_OPEN != circuitState) {
            return;
        }
        if (QueueResponseType.SUCCESS == responseType) {
            closeCircuit(queuedRequest);
        } else if (QueueResponseType.FAILURE == responseType) {
            reOpenCircuit(queuedRequest);
        }
    }

    /**
     * Reports the outcome of a request to the circuit breaker unless statistics updates are
     * disabled or the circuit is {@link QueueCircuitState#OPEN}. A failed update is only logged.
     */
    private void updateCircuitBreakerStatistics(String queueName, HttpRequest queuedRequest, QueueResponseType responseType, QueueCircuitState circuitState) {
        if (!isStatisticsUpdateEnabled() || QueueCircuitState.OPEN == circuitState) {
            return;
        }
        this.queueCircuitBreaker.updateStatistics(queueName, queuedRequest, responseType).setHandler(asyncResult -> {
            if (asyncResult.failed()) {
                RequestLoggerFactory.getLogger(QueueProcessor.class, queuedRequest.getHeaders())
                        .warn("failed to update statistics for queue '{}' to uri {}. Message is: {}", queueName, queuedRequest.getUri(), asyncResult.cause().getMessage());
            }
        });
    }

    /**
     * Asks the circuit breaker (if any) to close the circuit for the given request; a failure is only logged.
     */
    private void closeCircuit(HttpRequest queuedRequest) {
        if (this.queueCircuitBreaker == null) {
            return;
        }
        this.queueCircuitBreaker.closeCircuit(queuedRequest).setHandler(asyncResult -> {
            if (asyncResult.failed()) {
                RequestLoggerFactory.getLogger(QueueProcessor.class, queuedRequest.getHeaders())
                        .error("failed to close circuit {}. Message is: {}", queuedRequest.getUri(), asyncResult.cause().getMessage());
            }
        });
    }

    /**
     * Asks the circuit breaker (if any) to re-open the circuit for the given request; a failure is only logged.
     */
    private void reOpenCircuit(HttpRequest queuedRequest) {
        if (this.queueCircuitBreaker == null) {
            return;
        }
        this.queueCircuitBreaker.reOpenCircuit(queuedRequest).setHandler(asyncResult -> {
            if (asyncResult.failed()) {
                RequestLoggerFactory.getLogger(QueueProcessor.class, queuedRequest.getHeaders())
                        .error("failed to re-open circuit {}. Message is: {}", queuedRequest.getUri(), asyncResult.cause().getMessage());
            }
        });
    }

    /**
     * Executes a queued request against the backend and replies to the originating message.
     *
     * <p>Expired requests (according to {@link ExpiryCheckHandler#isExpired}) are not sent and
     * are answered with {@code "ok"}. Responses with a 2xx status or the
     * {@link StatusCode#CONFLICT} status are answered with {@code "ok"}; every other status
     * and every request/response exception is answered with {@code "error"}.
     *
     * <p>Kept public for backward compatibility (the decompiler note on the original indicated
     * the method was private in the source).
     *
     * @param message       the event bus message to reply to
     * @param logger        request scoped logger
     * @param httpRequest   the request to execute
     * @param jsonObject    JSON representation of the request, used to read the queue timestamp
     * @param str           name of the queue the request belongs to
     * @param queueCircuitState circuit state determined before execution; may be {@code null}
     */
    public void executeQueuedRequest(Message<JsonObject> message, Logger logger, HttpRequest httpRequest, JsonObject jsonObject, String str, QueueCircuitState queueCircuitState) {
        logger.debug("performing request {} {}", httpRequest.getMethod(), httpRequest.getUri());
        if (ExpiryCheckHandler.isExpired(httpRequest.getHeaders(), jsonObject.getLong(QueueClient.QUEUE_TIMESTAMP))) {
            logger.debug("request expired to {}", httpRequest.getUri());
            message.reply(new JsonObject().put(STATUS, OK));
            return;
        }

        HttpClientRequest request = this.httpClient.request(httpRequest.getMethod(), httpRequest.getUri(), response -> {
            int statusCode = response.statusCode();
            logger.trace("response: {}", statusCode);

            boolean conflict = statusCode == StatusCode.CONFLICT.getStatusCode();
            boolean successful = statusCode >= 200 && statusCode < 300;

            if (successful || conflict) {
                if (conflict) {
                    logger.warn("Ignoring request conflict to {}: {} {}", httpRequest.getUri(), statusCode, response.statusMessage());
                } else {
                    logger.debug("Successful request to {}", httpRequest.getUri());
                }
                message.reply(new JsonObject().put(STATUS, OK));
                performCircuitBreakerActions(str, httpRequest, QueueResponseType.SUCCESS, queueCircuitState);
                this.monitoringHandler.updateDequeue();
            } else {
                logger.info("Failed queued request to {}: {} {}", httpRequest.getUri(), statusCode, response.statusMessage());
                message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, statusCode + " " + response.statusMessage()));
                performCircuitBreakerActions(str, httpRequest, QueueResponseType.FAILURE, queueCircuitState);
            }

            // The backend body is consumed and dropped; only the status line matters here.
            response.bodyHandler(buffer -> logger.debug("Discarding backend body"));
            response.endHandler(ignored -> logger.debug("Backend response end"));
            // NOTE(review): this handler runs after a reply has already been sent above, so it
            // can trigger a second reply and a second circuit breaker update - confirm that is acceptable.
            response.exceptionHandler(th -> {
                logger.warn("Exception on response from {}: {}", httpRequest.getUri(), th.getMessage());
                message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, th.getMessage()));
                performCircuitBreakerActions(str, httpRequest, QueueResponseType.FAILURE, queueCircuitState);
            });
        });

        if (httpRequest.getHeaders() != null && !httpRequest.getHeaders().isEmpty()) {
            request.headers().setAll(httpRequest.getHeaders());
        }
        request.exceptionHandler(th -> {
            logger.warn("Failed request to {}: {}", httpRequest.getUri(), th.getMessage());
            message.reply(new JsonObject().put(STATUS, ERROR).put(MESSAGE, th.getMessage()));
            performCircuitBreakerActions(str, httpRequest, QueueResponseType.FAILURE, queueCircuitState);
        });
        request.setTimeout(REQUEST_TIMEOUT_MS);

        if (httpRequest.getPayload() == null) {
            request.end();
        } else {
            request.end(Buffer.buffer(httpRequest.getPayload()));
        }
    }

    /**
     * @return the HTTP client used to execute queued requests
     */
    public HttpClient getHttpClient() {
        return this.httpClient;
    }
}
