package org.swisspush.gateleen.queue.queuing;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.StatusCode;
import org.swisspush.gateleen.monitoring.MonitoringHandler;
import org.swisspush.gateleen.queue.expiry.ExpiryCheckHandler;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreaker;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueResponseType;

/* loaded from: input_file:org/swisspush/gateleen/queue/queuing/QueueProcessor.class */
public class QueueProcessor {
    /** Vert.x instance whose event bus delivers the queue items to this processor. */
    private final Vertx vertx;
    /** Client used to send the dequeued requests. */
    private final HttpClient httpClient;
    /** Notified through {@code updateDequeue()} after a queued request was answered with 2xx or 409. */
    private final MonitoringHandler monitoringHandler;
    /** Optional circuit breaker; {@code null} when the processor was constructed without one. */
    private final QueueCircuitBreaker queueCircuitBreaker;
    /** Response body handler that discards every chunk it receives. */
    private static final Handler<Buffer> DEV_NULL = buffer -> {
    };
    /** Event bus registration of this processor; {@code null} until processing is started for the first time. */
    private MessageConsumer<JsonObject> consumer;
    /** Class-level logger, used where no request-specific logger is available. */
    private final Logger log;

    /**
     * Creates a processor without a circuit breaker by delegating to the
     * four-argument constructor with {@code null} as circuit breaker.
     *
     * @param vertx             Vert.x instance providing the event bus
     * @param httpClient        client used to send the queued requests
     * @param monitoringHandler handler informed about dequeued requests
     */
    public QueueProcessor(Vertx vertx, HttpClient httpClient, MonitoringHandler monitoringHandler) {
        this(vertx, httpClient, monitoringHandler, null);
    }

    /**
     * Creates a processor and starts queue processing right away by delegating
     * to the five-argument constructor with {@code true} as last argument.
     *
     * @param vertx               Vert.x instance providing the event bus
     * @param httpClient          client used to send the queued requests
     * @param monitoringHandler   handler informed about dequeued requests
     * @param queueCircuitBreaker circuit breaker to consult, may be {@code null}
     */
    public QueueProcessor(Vertx vertx, HttpClient httpClient, MonitoringHandler monitoringHandler, QueueCircuitBreaker queueCircuitBreaker) {
        this(vertx, httpClient, monitoringHandler, queueCircuitBreaker, true);
    }

    /**
     * Creates a processor and optionally starts consuming queue items.
     *
     * @param vertx               Vert.x instance providing the event bus
     * @param httpClient          client used to send the queued requests
     * @param monitoringHandler   handler informed about dequeued requests
     * @param queueCircuitBreaker circuit breaker to consult, may be {@code null}
     * @param z                   {@code true} to call {@link #startQueueProcessing()} immediately,
     *                            {@code false} to leave the processor idle until it is started explicitly
     */
    public QueueProcessor(Vertx vertx, HttpClient httpClient, MonitoringHandler monitoringHandler, QueueCircuitBreaker queueCircuitBreaker, boolean z) {
        // The logger must exist before anything below is allowed to log.
        this.log = LoggerFactory.getLogger(QueueProcessor.class);
        this.queueCircuitBreaker = queueCircuitBreaker;
        this.monitoringHandler = monitoringHandler;
        this.httpClient = httpClient;
        this.vertx = vertx;

        if (!z) {
            this.log.info("initialized QueueProcessor but queue processing has disabled");
            return;
        }
        startQueueProcessing();
    }

    /**
     * Registers the event bus consumer that processes queued requests.
     * Does nothing but log when a consumer is already registered.
     */
    public void startQueueProcessing() {
        if (this.consumer != null && this.consumer.isRegistered()) {
            this.log.info("queue processing is already started");
            return;
        }
        this.log.info("about to register consumer to start queue processing");
        this.consumer = this.vertx.eventBus().<JsonObject>consumer(getQueueProcessorAddress(), this::processQueuedMessage);
        this.log.info("registered queue processing consumer on address: {}", this.consumer.address());
    }

    /**
     * Handles a single queue item received over the event bus.
     *
     * <p>The message body carries the serialized request under {@code "payload"} and the
     * queue name under {@code "queue"}. Every synchronous failure, including a missing or
     * malformed payload, is answered with an {@code error} reply so the sender is not left
     * waiting for a reply that never arrives.
     *
     * @param message the event bus message describing the queued request
     */
    private void processQueuedMessage(Message<JsonObject> message) {
        try {
            JsonObject body = message.body();
            // Parsing happens inside the try block so that a broken payload produces an error reply.
            JsonObject jsonRequest = new JsonObject(body.getString("payload"));
            HttpRequest queuedRequest = new HttpRequest(jsonRequest);
            Logger logger = RequestLoggerFactory.getLogger(QueueProcessor.class, queuedRequest.getHeaders());
            if (logger.isTraceEnabled()) {
                logger.trace("process message: " + message);
            }
            String queueName = body.getString("queue");

            if (!isCircuitCheckEnabled()) {
                executeQueuedRequest(message, logger, queuedRequest, jsonRequest, queueName, null);
                return;
            }

            this.queueCircuitBreaker.handleQueuedRequest(queueName, queuedRequest).onComplete(stateResult -> {
                if (stateResult.failed()) {
                    String errorMessage = "Error in QueueCircuitBreaker occurred for queue " + queueName + ". Reply with status ERROR. Message is: " + stateResult.cause().getMessage();
                    logger.error(errorMessage);
                    message.reply(new JsonObject().put("status", "error").put("message", errorMessage));
                    return;
                }
                QueueCircuitState circuitState = stateResult.result();
                if (QueueCircuitState.OPEN == circuitState) {
                    // An open circuit rejects the request without contacting the backend.
                    message.reply(new JsonObject().put("status", "error").put("message", "Circuit for queue " + queueName + " is " + circuitState + ". Queues using this endpoint are not allowed to be executed right now"));
                    return;
                }
                executeQueuedRequest(message, logger, queuedRequest, jsonRequest, queueName, circuitState);
            });
        } catch (Exception e) {
            this.log.error("Could not build request: {} error is {}", message.body(), e.getMessage());
            message.reply(new JsonObject().put("status", "error").put("message", e.getMessage()));
        }
    }

    /**
     * Unregisters the event bus consumer so that no further queue items are processed.
     * Only logs when no consumer is currently registered.
     */
    public void stopQueueProcessing() {
        boolean running = this.consumer != null && this.consumer.isRegistered();
        if (running) {
            this.log.info("about to unregister consumer to stop queue processing");
            this.consumer.unregister();
            return;
        }
        this.log.info("queue processing is already stopped");
    }

    /**
     * Tells whether this processor currently consumes queue items.
     *
     * @return {@code true} if a consumer exists and is registered on the event bus
     */
    public boolean isQueueProcessingStarted() {
        if (this.consumer == null) {
            return false;
        }
        return this.consumer.isRegistered();
    }

    /**
     * Returns the event bus address used when registering the queue processing consumer.
     *
     * @return the address supplied by {@code Address.queueProcessorAddress()}
     */
    public String getQueueProcessorAddress() {
        return Address.queueProcessorAddress();
    }

    /**
     * Tells whether the circuit state has to be checked before executing a queued request.
     *
     * @return {@code true} if a circuit breaker is present and reports the circuit check as enabled
     */
    private boolean isCircuitCheckEnabled() {
        if (this.queueCircuitBreaker == null) {
            return false;
        }
        return this.queueCircuitBreaker.isCircuitCheckEnabled();
    }

    /**
     * Tells whether request outcomes have to be reported to the circuit breaker statistics.
     *
     * @return {@code true} if a circuit breaker is present and reports the statistics update as enabled
     */
    private boolean isStatisticsUpdateEnabled() {
        if (this.queueCircuitBreaker == null) {
            return false;
        }
        return this.queueCircuitBreaker.isStatisticsUpdateEnabled();
    }

    /**
     * Tells whether requests using the given HTTP method can be queued.
     *
     * <p>The decision is based on the method name: {@code GET}, {@code HEAD}, {@code PUT},
     * {@code POST}, {@code DELETE}, {@code OPTIONS} and {@code PATCH} are queueable,
     * every other name is not.
     *
     * @param httpMethod the method to check, must not be {@code null}
     * @return {@code true} if the method is one of the queueable methods listed above
     */
    public static boolean httpMethodIsQueueable(HttpMethod httpMethod) {
        switch (httpMethod.name()) {
            case "GET":
            case "HEAD":
            case "PUT":
            case "POST":
            case "DELETE":
            case "OPTIONS":
            case "PATCH":
                return true;
            default:
                return false;
        }
    }

    /**
     * Reports the outcome of a queued request to the circuit breaker and, for a half-open
     * circuit, closes it on success or re-opens it on failure.
     *
     * @param str               name of the queue the request came from
     * @param httpRequest       the queued request that was executed
     * @param queueResponseType outcome of the request
     * @param queueCircuitState circuit state observed before the request was executed, may be {@code null}
     */
    private void performCircuitBreakerActions(String str, HttpRequest httpRequest, QueueResponseType queueResponseType, QueueCircuitState queueCircuitState) {
        updateCircuitBreakerStatistics(str, httpRequest, queueResponseType, queueCircuitState);

        // Only a half-open circuit changes its state based on a single request.
        if (QueueCircuitState.HALF_OPEN != queueCircuitState) {
            return;
        }
        if (QueueResponseType.SUCCESS == queueResponseType) {
            closeCircuit(httpRequest);
            return;
        }
        if (QueueResponseType.FAILURE == queueResponseType) {
            reOpenCircuit(httpRequest);
        }
    }

    /**
     * Forwards the outcome of a queued request to the circuit breaker statistics.
     * Nothing is forwarded when statistics updates are disabled or the circuit is open.
     * A failed update is logged as a warning and otherwise ignored.
     *
     * @param str               name of the queue the request came from
     * @param httpRequest       the queued request that was executed
     * @param queueResponseType outcome of the request
     * @param queueCircuitState circuit state observed before the request was executed, may be {@code null}
     */
    private void updateCircuitBreakerStatistics(String str, HttpRequest httpRequest, QueueResponseType queueResponseType, QueueCircuitState queueCircuitState) {
        boolean circuitOpen = QueueCircuitState.OPEN == queueCircuitState;
        if (isStatisticsUpdateEnabled() && !circuitOpen) {
            this.queueCircuitBreaker.updateStatistics(str, httpRequest, queueResponseType).onComplete(updateResult -> {
                if (!updateResult.failed()) {
                    return;
                }
                Logger requestLogger = RequestLoggerFactory.getLogger(QueueProcessor.class, httpRequest.getHeaders());
                requestLogger.warn("failed to update statistics for queue '" + str + "' to uri " + httpRequest.getUri() + ". Message is: " + updateResult.cause().getMessage());
            });
        }
    }

    /**
     * Asks the circuit breaker, if present, to close the circuit belonging to the given request.
     * A failure to do so is logged as an error.
     *
     * @param httpRequest the queued request whose circuit should be closed
     */
    private void closeCircuit(HttpRequest httpRequest) {
        if (this.queueCircuitBreaker == null) {
            return;
        }
        this.queueCircuitBreaker.closeCircuit(httpRequest).onComplete(closeResult -> {
            if (!closeResult.failed()) {
                return;
            }
            Logger requestLogger = RequestLoggerFactory.getLogger(QueueProcessor.class, httpRequest.getHeaders());
            requestLogger.error("failed to close circuit " + httpRequest.getUri() + ". Message is: " + closeResult.cause().getMessage());
        });
    }

    /**
     * Asks the circuit breaker, if present, to re-open the circuit belonging to the given request.
     * A failure to do so is logged as an error.
     *
     * @param httpRequest the queued request whose circuit should be re-opened
     */
    private void reOpenCircuit(HttpRequest httpRequest) {
        if (this.queueCircuitBreaker == null) {
            return;
        }
        this.queueCircuitBreaker.reOpenCircuit(httpRequest).onComplete(reOpenResult -> {
            if (!reOpenResult.failed()) {
                return;
            }
            Logger requestLogger = RequestLoggerFactory.getLogger(QueueProcessor.class, httpRequest.getHeaders());
            requestLogger.error("failed to re-open circuit " + httpRequest.getUri() + ". Message is: " + reOpenResult.cause().getMessage());
        });
    }

    /**
     * Sends a queued request to its target and replies to the queue item with the outcome.
     *
     * <p>An expired request is acknowledged with {@code ok} without being sent. Otherwise the
     * reply is {@code ok} for 2xx and 409 responses and for failed responses that
     * {@code QueueRetryUtil.retryQueueItem} does not want retried; it is {@code error} for
     * retryable responses and for every transport failure. Failing to create the request and
     * failing to send it are both answered with {@code error}, so the sender always gets a reply.
     *
     * @param message           event bus message to reply to
     * @param logger            request-specific logger
     * @param httpRequest       the queued request to execute
     * @param jsonObject        JSON form of the queued request, read for its queue timestamp
     * @param queueName         name of the queue the request came from
     * @param queueCircuitState circuit state observed for the request, {@code null} when no circuit check was made
     */
    private void executeQueuedRequest(Message<JsonObject> message, Logger logger, HttpRequest httpRequest, JsonObject jsonObject, String queueName, QueueCircuitState queueCircuitState) {
        logger.debug("performing request {} {}", httpRequest.getMethod(), httpRequest.getUri());

        if (ExpiryCheckHandler.isExpired(httpRequest.getHeaders(), jsonObject.getLong(QueueClient.QUEUE_TIMESTAMP))) {
            // Expired requests are dropped; replying ok lets the queue move on to the next item.
            logger.info("request expired to {}", httpRequest.getUri());
            message.reply(new JsonObject().put("status", "ok"));
            return;
        }

        this.httpClient.request(httpRequest.getMethod(), httpRequest.getUri()).onComplete(requestResult -> {
            if (requestResult.failed()) {
                // No request object exists here, so no exception handler can report this failure.
                Throwable cause = requestResult.cause();
                logger.warn("Failed request to {}: {}", httpRequest.getUri(), cause.getMessage(), cause);
                reportFailure(message, queueName, httpRequest, queueCircuitState, cause.getMessage());
                return;
            }
            HttpClientRequest clientRequest = requestResult.result();
            if (httpRequest.getHeaders() != null && !httpRequest.getHeaders().isEmpty()) {
                clientRequest.headers().setAll(httpRequest.getHeaders());
            }

            // The request exception handler and a failed send() may both fire for one failure;
            // the flag ensures that it is replied to and counted only once.
            AtomicBoolean failureReported = new AtomicBoolean();

            clientRequest.exceptionHandler(th -> {
                logger.warn("Failed request to {}: {}", httpRequest.getUri(), th.getMessage());
                if (failureReported.compareAndSet(false, true)) {
                    reportFailure(message, queueName, httpRequest, queueCircuitState, th.getMessage());
                }
            });

            Handler<AsyncResult<HttpClientResponse>> responseHandler = responseResult -> {
                if (responseResult.failed()) {
                    Throwable cause = responseResult.cause();
                    logger.warn("Failed to send request to {}: {}", httpRequest.getUri(), cause.getMessage(), cause);
                    if (failureReported.compareAndSet(false, true)) {
                        reportFailure(message, queueName, httpRequest, queueCircuitState, cause.getMessage());
                    }
                    return;
                }
                handleBackendResponse(message, logger, httpRequest, queueName, queueCircuitState, responseResult.result());
            };

            clientRequest.idleTimeout(120000L);
            if (httpRequest.getPayload() != null) {
                clientRequest.send(Buffer.buffer(httpRequest.getPayload()), responseHandler);
            } else {
                clientRequest.send(responseHandler);
            }
        });
    }

    /**
     * Evaluates the status of a received response, replies to the queue item accordingly
     * and installs handlers that discard the response body.
     */
    private void handleBackendResponse(Message<JsonObject> message, Logger logger, HttpRequest httpRequest, String queueName, QueueCircuitState queueCircuitState, HttpClientResponse response) {
        int statusCode = response.statusCode();
        logger.trace("response: {}", statusCode);

        boolean conflict = statusCode == StatusCode.CONFLICT.getStatusCode();
        if ((statusCode >= 200 && statusCode < 300) || conflict) {
            if (conflict) {
                logger.warn("Ignoring request conflict to {}: {} {}", httpRequest.getUri(), statusCode, response.statusMessage());
            } else {
                logger.debug("Successful request to {}", httpRequest.getUri());
            }
            message.reply(new JsonObject().put("status", "ok"));
            performCircuitBreakerActions(queueName, httpRequest, QueueResponseType.SUCCESS, queueCircuitState);
            this.monitoringHandler.updateDequeue();
        } else if (QueueRetryUtil.retryQueueItem(httpRequest.getHeaders(), statusCode, logger)) {
            logger.info("Failed queued request to {}: {} {}", httpRequest.getUri(), statusCode, response.statusMessage());
            reportFailure(message, queueName, httpRequest, queueCircuitState, statusCode + " " + response.statusMessage());
        } else {
            logger.info("Reply success, because no more retries left for failed queued request to {}: {} {}", httpRequest.getUri(), statusCode, response.statusMessage());
            message.reply(new JsonObject().put("status", "ok"));
            performCircuitBreakerActions(queueName, httpRequest, QueueResponseType.SUCCESS, queueCircuitState);
        }

        // The response body is of no interest; it is consumed and thrown away.
        response.handler(DEV_NULL);
        response.endHandler(ignored -> logger.debug("Backend response end"));
        response.exceptionHandler(th -> {
            logger.warn("Exception on response from {}: {}", httpRequest.getUri(), th.getMessage());
            reportFailure(message, queueName, httpRequest, queueCircuitState, th.getMessage());
        });
    }

    /**
     * Replies to the queue item with status {@code error} and records a failure with the circuit breaker.
     */
    private void reportFailure(Message<JsonObject> message, String queueName, HttpRequest httpRequest, QueueCircuitState queueCircuitState, String errorMessage) {
        message.reply(new JsonObject().put("status", "error").put("message", errorMessage));
        performCircuitBreakerActions(queueName, httpRequest, QueueResponseType.FAILURE, queueCircuitState);
    }

    /**
     * Returns the HTTP client this processor uses to send queued requests.
     *
     * @return the client passed to the constructor
     */
    public HttpClient getHttpClient() {
        return this.httpClient;
    }
}
