package org.swisspush.gateleen.queue.queuing.circuitbreaker.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Response;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;
import org.swisspush.gateleen.core.http.HttpRequest;
import org.swisspush.gateleen.core.lock.Lock;
import org.swisspush.gateleen.core.refresh.Refreshable;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.LockUtil;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreaker;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreakerStorage;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.configuration.QueueCircuitBreakerConfigurationResource;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.configuration.QueueCircuitBreakerConfigurationResourceManager;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.PatternAndCircuitHash;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitBreakerRulePatternToCircuitMapping;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueResponseType;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.UpdateStatisticsResult;
import org.swisspush.gateleen.routing.Rule;
import org.swisspush.gateleen.routing.RuleProvider;
import org.swisspush.redisques.util.RedisquesAPI;

/* loaded from: input_file:org/swisspush/gateleen/queue/queuing/circuitbreaker/impl/QueueCircuitBreakerImpl.class */
public class QueueCircuitBreakerImpl implements QueueCircuitBreaker, RuleProvider.RuleChangesObserver, Refreshable {
    /** Lock name used by the periodic task that sets open circuits to half-open. */
    public static final String OPEN_TO_HALF_OPEN_TASK_LOCK = "openToHalfOpenTask";
    /** Lock name used by the periodic task that unlocks the next queue. */
    public static final String UNLOCK_QUEUES_TASK_LOCK = "unlockQueuesTask";
    /** Lock name used by the periodic task that unlocks sample queues. */
    public static final String UNLOCK_SAMPLE_QUEUES_TASK_LOCK = "unlockSampleQueuesTask";

    private final Logger log = LoggerFactory.getLogger(QueueCircuitBreakerImpl.class);

    // Collaborators, all assigned once in the constructor.
    private final Vertx vertx;
    private final Lock lock;
    private final LockUtil lockUtil;
    private final String redisquesAddress;
    private final QueueCircuitBreakerStorage queueCircuitBreakerStorage;
    private final QueueCircuitBreakerRulePatternToCircuitMapping ruleToCircuitMapping;
    private final QueueCircuitBreakerConfigurationResourceManager configResourceManager;

    // Ids of the currently registered periodic timers; -1 until a timer has been registered.
    private long openToHalfOpenTimerId = -1;
    private long unlockQueuesTimerId = -1;
    private long unlockSampleQueuesTimerId = -1;

    public QueueCircuitBreakerImpl(Vertx vertx, Lock lock, String str, QueueCircuitBreakerStorage queueCircuitBreakerStorage, RuleProvider ruleProvider, GateleenExceptionFactory gateleenExceptionFactory, QueueCircuitBreakerRulePatternToCircuitMapping queueCircuitBreakerRulePatternToCircuitMapping, QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager, Handler<HttpServerRequest> handler, int i) {
        this.vertx = vertx;
        this.lock = lock;
        this.lockUtil = new LockUtil(gateleenExceptionFactory);
        this.redisquesAddress = str;
        this.queueCircuitBreakerStorage = queueCircuitBreakerStorage;
        ruleProvider.registerObserver(this);
        this.ruleToCircuitMapping = queueCircuitBreakerRulePatternToCircuitMapping;
        this.configResourceManager = queueCircuitBreakerConfigurationResourceManager;
        this.configResourceManager.addRefreshable(this);
        registerPeriodicTasks();
        vertx.createHttpServer(new HttpServerOptions().setHandle100ContinueAutomatically(true)).requestHandler(handler).listen(i, asyncResult -> {
            if (asyncResult.succeeded()) {
                this.log.info("Successfully listening to port {}", Integer.valueOf(i));
            } else {
                this.log.error("Unable to listen to port {}. Cannot handle QueueCircuitBreaker http requests", Integer.valueOf(i));
            }
        });
    }

    /**
     * (Re-)registers all three periodic tasks: open to half-open, unlock queues and
     * unlock sample queues, in that order.
     */
    private void registerPeriodicTasks() {
        this.registerOpenToHalfOpenTask();
        this.registerUnlockQueuesTask();
        this.registerUnlockSampleQueuesTask();
    }

    /**
     * Builds the token used to acquire and later release a task lock.
     *
     * <p>The token has the form {@code <instanceAddress>_<currentTimeMillis>_<str>}.
     *
     * @param str the appendix of the token; callers pass the name of the lock
     * @return the token
     */
    private String createToken(String str) {
        // The appendix must be the given parameter; previously it was ignored and the
        // instance address was appended a second time.
        return Address.instanceAddress() + "_" + System.currentTimeMillis() + "_" + str;
    }

    /**
     * Derives the lock expiry from a task interval.
     *
     * @param i the task interval
     * @return half of the interval (integer division), or {@code 1} when the interval is {@code 1} or less
     */
    private long getLockExpiry(int i) {
        return i > 1 ? i / 2 : 1L;
    }

    /**
     * (Re-)registers the periodic task that sets open circuits to half-open.
     *
     * <p>A previously registered timer is always cancelled first. When the task is enabled in
     * the configuration, a new periodic timer is registered. On every tick the task tries to
     * acquire the {@link #OPEN_TO_HALF_OPEN_TASK_LOCK} lock and only runs when the lock was
     * acquired. The lock is released explicitly only when the task fails.
     */
    private void registerOpenToHalfOpenTask() {
        boolean taskEnabled = getConfig().isOpenToHalfOpenTaskEnabled();
        int taskInterval = getConfig().getOpenToHalfOpenTaskInterval();

        // Cancel first so that a configuration refresh never leaves two timers running.
        this.vertx.cancelTimer(this.openToHalfOpenTimerId);

        if (!taskEnabled) {
            this.log.info("Not going to register periodic open to half-open task execution");
            return;
        }

        this.log.info("About to register periodic open to half-open task execution every {}ms", taskInterval);
        this.openToHalfOpenTimerId = this.vertx.setPeriodic(taskInterval, timerId -> {
            String token = createToken(OPEN_TO_HALF_OPEN_TASK_LOCK);
            LockUtil.acquireLock(this.lock, OPEN_TO_HALF_OPEN_TASK_LOCK, token, getLockExpiry(taskInterval), this.log).onComplete(lockResult -> {
                if (lockResult.failed()) {
                    this.log.error("Could not acquire lock '{}'. Message: {}", OPEN_TO_HALF_OPEN_TASK_LOCK, lockResult.cause().getMessage());
                    return;
                }
                // Null-safe check: anything but TRUE means the lock was not acquired.
                if (!Boolean.TRUE.equals(lockResult.result())) {
                    return;
                }
                setOpenCircuitsToHalfOpen().onComplete(taskResult -> {
                    if (taskResult.failed()) {
                        this.log.error(taskResult.cause().getMessage());
                        this.lockUtil.releaseLock(this.lock, OPEN_TO_HALF_OPEN_TASK_LOCK, token, this.log);
                    } else if (taskResult.result() > 0) {
                        this.log.info("Successfully changed {} circuits from state open to state half-open", taskResult.result());
                    } else {
                        this.log.debug("No open circuits to change state to half-open");
                    }
                });
            });
        });
    }

    /**
     * (Re-)registers the periodic task that unlocks the next queue.
     *
     * <p>A previously registered timer is always cancelled first. When the task is enabled in
     * the configuration, a new periodic timer is registered. On every tick the task tries to
     * acquire the {@link #UNLOCK_QUEUES_TASK_LOCK} lock and only runs when the lock was
     * acquired. The lock is released explicitly only when the task fails.
     */
    private void registerUnlockQueuesTask() {
        boolean taskEnabled = getConfig().isUnlockQueuesTaskEnabled();
        int taskInterval = getConfig().getUnlockQueuesTaskInterval();

        // Cancel first so that a configuration refresh never leaves two timers running.
        this.vertx.cancelTimer(this.unlockQueuesTimerId);

        if (!taskEnabled) {
            this.log.info("Not going to register periodic queues unlock task execution");
            return;
        }

        this.log.info("About to register periodic queues unlock task execution every {}ms", taskInterval);
        this.unlockQueuesTimerId = this.vertx.setPeriodic(taskInterval, timerId -> {
            String token = createToken(UNLOCK_QUEUES_TASK_LOCK);
            LockUtil.acquireLock(this.lock, UNLOCK_QUEUES_TASK_LOCK, token, getLockExpiry(taskInterval), this.log).onComplete(lockResult -> {
                if (lockResult.failed()) {
                    this.log.error("Could not acquire lock '{}'. Message: {}", UNLOCK_QUEUES_TASK_LOCK, lockResult.cause().getMessage());
                    return;
                }
                // Null-safe check: anything but TRUE means the lock was not acquired.
                if (!Boolean.TRUE.equals(lockResult.result())) {
                    return;
                }
                unlockNextQueue().onComplete(taskResult -> {
                    if (taskResult.failed()) {
                        this.log.error("Unable to unlock queue '{}'", taskResult.cause().getMessage());
                        this.lockUtil.releaseLock(this.lock, UNLOCK_QUEUES_TASK_LOCK, token, this.log);
                    } else if (taskResult.result() == null) {
                        this.log.debug("No locked queues to unlock");
                    } else {
                        this.log.debug("Successfully unlocked queue '{}'", taskResult.result());
                    }
                });
            });
        });
    }

    /**
     * (Re-)registers the periodic task that unlocks sample queues.
     *
     * <p>A previously registered timer is always cancelled first. When the task is enabled in
     * the configuration, a new periodic timer is registered. On every tick the task tries to
     * acquire the {@link #UNLOCK_SAMPLE_QUEUES_TASK_LOCK} lock and only runs when the lock was
     * acquired. The lock is released explicitly only when the task fails.
     */
    private void registerUnlockSampleQueuesTask() {
        boolean taskEnabled = getConfig().isUnlockSampleQueuesTaskEnabled();
        int taskInterval = getConfig().getUnlockSampleQueuesTaskInterval();

        // Cancel first so that a configuration refresh never leaves two timers running.
        this.vertx.cancelTimer(this.unlockSampleQueuesTimerId);

        if (!taskEnabled) {
            this.log.info("Not going to register periodic unlock sample queues task execution");
            return;
        }

        this.log.info("About to register periodic unlock sample queues task execution every {}ms", taskInterval);
        this.unlockSampleQueuesTimerId = this.vertx.setPeriodic(taskInterval, timerId -> {
            String token = createToken(UNLOCK_SAMPLE_QUEUES_TASK_LOCK);
            LockUtil.acquireLock(this.lock, UNLOCK_SAMPLE_QUEUES_TASK_LOCK, token, getLockExpiry(taskInterval), this.log).onComplete(lockResult -> {
                if (lockResult.failed()) {
                    this.log.error("Could not acquire lock '{}'. Message: {}", UNLOCK_SAMPLE_QUEUES_TASK_LOCK, lockResult.cause().getMessage());
                    return;
                }
                // Null-safe check: anything but TRUE means the lock was not acquired.
                if (!Boolean.TRUE.equals(lockResult.result())) {
                    return;
                }
                unlockSampleQueues().onComplete(taskResult -> {
                    if (taskResult.failed()) {
                        this.log.error(taskResult.cause().getMessage());
                        this.lockUtil.releaseLock(this.lock, UNLOCK_SAMPLE_QUEUES_TASK_LOCK, token, this.log);
                    } else if (taskResult.result() == 0L) {
                        this.log.debug("No sample queues to unlock");
                    } else {
                        this.log.info("Successfully unlocked {} sample queues", taskResult.result());
                    }
                });
            });
        });
    }

    /**
     * Renews the rule to circuit mapping and closes and removes every circuit whose mapping
     * was dropped by the update.
     *
     * @param list the new rules
     */
    public void rulesChanged(List<Rule> list) {
        this.log.info("rules have changed, renew rule to circuit mapping");
        List<PatternAndCircuitHash> removedMappings = this.ruleToCircuitMapping.updateRulePatternToCircuitMapping(list);
        this.log.info("{} mappings have been removed with the update", removedMappings.size());
        for (PatternAndCircuitHash removedMapping : removedMappings) {
            closeAndRemoveCircuit(removedMapping);
        }
    }

    /**
     * Reacts to changed configuration values by registering the periodic tasks again.
     */
    public void refresh() {
        this.log.info("Circuit breaker configuration values have changed. Check periodic tasks");
        this.registerPeriodicTasks();
    }

    /**
     * @return whether the circuit check is enabled in the current configuration resource
     */
    @Override
    public boolean isCircuitCheckEnabled() {
        QueueCircuitBreakerConfigurationResource config = getConfig();
        return config.isCircuitCheckEnabled();
    }

    /**
     * @return whether the statistics update is enabled in the current configuration resource
     */
    @Override
    public boolean isStatisticsUpdateEnabled() {
        QueueCircuitBreakerConfigurationResource config = getConfig();
        return config.isStatisticsUpdateEnabled();
    }

    /**
     * Looks up the state of the circuit belonging to the given queued request.
     *
     * <p>When the circuit is open, locking of the queue is additionally triggered after the
     * returned future has been completed.
     *
     * @param str the name of the queue the request belongs to
     * @param httpRequest the queued request; its uri selects the circuit
     * @return a future holding the circuit state; failed when no circuit is mapped to the
     *         request uri or the storage lookup failed
     */
    @Override
    public Future<QueueCircuitState> handleQueuedRequest(String str, HttpRequest httpRequest) {
        Promise<QueueCircuitState> promise = Promise.promise();
        PatternAndCircuitHash circuit = getPatternAndCircuitHashFromRequest(httpRequest);
        if (circuit == null) {
            failWithNoRuleToCircuitMappingMessage(promise, str, httpRequest);
            return promise.future();
        }
        this.queueCircuitBreakerStorage.getQueueCircuitState(circuit).onComplete(stateResult -> {
            if (stateResult.failed()) {
                promise.fail(stateResult.cause());
                return;
            }
            QueueCircuitState state = (QueueCircuitState) stateResult.result();
            promise.complete(state);
            if (QueueCircuitState.OPEN == state) {
                lockQueueSync(str, httpRequest);
            }
        });
        return promise.future();
    }

    /**
     * Updates the statistics of the circuit belonging to the given queued request.
     *
     * <p>When the update reports that the circuit has been opened, locking of the queue is
     * triggered before the returned future is completed.
     *
     * @param str the name of the queue the request belongs to
     * @param httpRequest the queued request; its uri selects the circuit
     * @param queueResponseType the type of the response that is recorded
     * @return a future completed once the statistics were updated; failed when no circuit is
     *         mapped to the request uri or the storage update failed
     */
    @Override
    public Future<Void> updateStatistics(String str, HttpRequest httpRequest, QueueResponseType queueResponseType) {
        Promise<Void> promise = Promise.promise();
        String requestId = getRequestUniqueId(httpRequest);
        long timestamp = System.currentTimeMillis();
        PatternAndCircuitHash circuit = getPatternAndCircuitHashFromRequest(httpRequest);
        if (circuit == null) {
            failWithNoRuleToCircuitMappingMessage(promise, str, httpRequest);
            return promise.future();
        }
        this.queueCircuitBreakerStorage.updateStatistics(circuit, requestId, timestamp, getConfig().getErrorThresholdPercentage(), getConfig().getEntriesMaxAgeMS(), getConfig().getMinQueueSampleCount(), getConfig().getMaxQueueSampleCount(), queueResponseType).onComplete(updateResult -> {
            if (updateResult.failed()) {
                promise.fail(updateResult.cause());
                return;
            }
            if (UpdateStatisticsResult.OPENED == updateResult.result()) {
                this.log.warn("circuit '{}' has been opened", circuit.getPattern().pattern());
                lockQueueSync(str, httpRequest);
            }
            promise.complete();
        });
        return promise.future();
    }

    /**
     * Closes the circuit belonging to the given request.
     *
     * @param httpRequest the request whose uri selects the circuit
     * @return a future completed once the circuit was closed; failed when no circuit is mapped
     *         to the request uri or the storage operation failed
     */
    @Override
    public Future<Void> closeCircuit(HttpRequest httpRequest) {
        Promise<Void> promise = Promise.promise();
        PatternAndCircuitHash circuit = getPatternAndCircuitHashFromRequest(httpRequest);
        if (circuit == null) {
            failWithNoRuleToCircuitMappingMessage(promise, null, httpRequest);
            return promise.future();
        }
        this.log.info("About to close circuit {}", circuit.getPattern().pattern());
        this.queueCircuitBreakerStorage.closeCircuit(circuit).onComplete(closeResult -> {
            if (closeResult.failed()) {
                promise.fail(closeResult.cause());
                return;
            }
            this.log.debug("circuit '{}' has been closed", circuit.getPattern().pattern());
            promise.complete();
        });
        return promise.future();
    }

    /**
     * Closes and removes the given circuit in the storage. A failure is only logged.
     *
     * @param patternAndCircuitHash the circuit to close and remove
     */
    private void closeAndRemoveCircuit(PatternAndCircuitHash patternAndCircuitHash) {
        this.log.info("circuit {} has been removed. Closing corresponding circuit", patternAndCircuitHash.getPattern().pattern());
        this.queueCircuitBreakerStorage.closeAndRemoveCircuit(patternAndCircuitHash).onComplete(removeResult -> {
            if (removeResult.failed()) {
                // The throwable is passed as last argument so the reason for the failure is not lost.
                this.log.error("failed to close circuit {}", patternAndCircuitHash.getPattern().pattern(), removeResult.cause());
            }
        });
    }

    /**
     * Closes all circuits by delegating to the storage.
     *
     * @return the future returned by the storage
     */
    @Override
    public Future<Void> closeAllCircuits() {
        this.log.info("About to close all circuits");
        Future<Void> closeAllResult = this.queueCircuitBreakerStorage.closeAllCircuits();
        return closeAllResult;
    }

    /**
     * Re-opens the circuit belonging to the given request.
     *
     * @param httpRequest the request whose uri selects the circuit
     * @return a future completed once the circuit was re-opened; failed when no circuit is
     *         mapped to the request uri or the storage operation failed
     */
    @Override
    public Future<Void> reOpenCircuit(HttpRequest httpRequest) {
        Promise<Void> promise = Promise.promise();
        PatternAndCircuitHash circuit = getPatternAndCircuitHashFromRequest(httpRequest);
        if (circuit == null) {
            failWithNoRuleToCircuitMappingMessage(promise, null, httpRequest);
            return promise.future();
        }
        this.log.info("About to reopen circuit {}", circuit.getPattern().pattern());
        this.queueCircuitBreakerStorage.reOpenCircuit(circuit).onComplete(reOpenResult -> {
            if (reOpenResult.failed()) {
                promise.fail(reOpenResult.cause());
                return;
            }
            this.log.info("circuit '{}' has been reopened", circuit.getPattern().pattern());
            promise.complete();
        });
        return promise.future();
    }

    /**
     * Locks the given queue: first the queue is recorded in the storage for the circuit
     * belonging to the request, then a put-lock operation is sent to redisques over the
     * event bus.
     *
     * @param str the name of the queue to lock
     * @param httpRequest the request whose uri selects the circuit
     * @return a future completed once redisques replied with status {@code "ok"}; failed when
     *         no circuit is mapped to the request uri, the storage operation failed, the event
     *         bus request failed or redisques replied with another status
     */
    @Override
    public Future<Void> lockQueue(String str, HttpRequest httpRequest) {
        Promise<Void> promise = Promise.promise();
        PatternAndCircuitHash circuit = getPatternAndCircuitHashFromRequest(httpRequest);
        if (circuit == null) {
            failWithNoRuleToCircuitMappingMessage(promise, str, httpRequest);
            return promise.future();
        }
        this.queueCircuitBreakerStorage.lockQueue(str, circuit).onComplete(storageResult -> {
            if (storageResult.failed()) {
                promise.fail(storageResult.cause());
                return;
            }
            this.vertx.eventBus().request(this.redisquesAddress, RedisquesAPI.buildPutLockOperation(str, "queue_circuit_breaker"), reply -> {
                if (reply.failed()) {
                    promise.fail(reply.cause());
                    return;
                }
                JsonObject replyBody = (JsonObject) reply.result().body();
                if ("ok".equals(replyBody.getString("status"))) {
                    this.log.info("locked queue '{}' because the circuit '{}' is open", str, circuit.getPattern().pattern());
                    promise.complete();
                } else {
                    promise.fail("failed to lock queue '" + str + "'. Queue should have been locked, because the circuit '" + circuit.getPattern().pattern() + "' is open");
                }
            });
        });
        return promise.future();
    }

    /**
     * Pops the next queue to unlock from the storage and unlocks it.
     *
     * @return a future holding the name of the unlocked queue, or {@code null} when the storage
     *         had no queue to unlock; failed (with the message of the underlying failure) when
     *         popping from the storage or unlocking the queue failed
     */
    @Override
    public Future<String> unlockNextQueue() {
        this.log.debug("About to unlock the next queue");
        Promise<String> promise = Promise.promise();
        this.queueCircuitBreakerStorage.popQueueToUnlock().onComplete(popResult -> {
            if (popResult.failed()) {
                promise.fail(popResult.cause().getMessage());
                return;
            }
            String queueToUnlock = (String) popResult.result();
            if (queueToUnlock == null) {
                // Nothing to unlock is not an error; signal it with a null result.
                promise.complete((String) null);
                return;
            }
            unlockQueue(queueToUnlock).onComplete(unlockResult -> {
                if (unlockResult.failed()) {
                    promise.fail(unlockResult.cause().getMessage());
                } else {
                    promise.complete(unlockResult.result());
                }
            });
        });
        return promise.future();
    }

    /**
     * Logs, on error level, that unlocking a queue failed and that the queue has to be
     * unlocked manually.
     *
     * @param str the name of the queue
     * @param str2 the message describing the failure
     */
    private void logQueueUnlockError(String str, String str2) {
        String template = "Error during unlock of queue '{}'. This queue has been removed from database but not from redisques. This queue must be unlocked manually! Message: {}";
        this.log.error(template, str, str2);
    }

    /**
     * Sets the open circuits to half-open by delegating to the storage.
     *
     * @return the future returned by the storage
     */
    @Override
    public Future<Long> setOpenCircuitsToHalfOpen() {
        Future<Long> changedCircuits = this.queueCircuitBreakerStorage.setOpenCircuitsToHalfOpen();
        return changedCircuits;
    }

    /**
     * Asks the storage for the sample queues to unlock and unlocks each of them.
     *
     * @return a future holding the number of queues returned by the storage ({@code 0} when the
     *         storage returned nothing) once all of them were unlocked; failed when the storage
     *         call failed or at least one queue could not be unlocked
     */
    @Override
    public Future<Long> unlockSampleQueues() {
        this.log.debug("About to unlock a sample queue for each circuit");
        Promise<Long> promise = Promise.promise();
        this.queueCircuitBreakerStorage.unlockSampleQueues().onComplete(storageResult -> {
            if (storageResult.failed()) {
                promise.fail(storageResult.cause().getMessage());
                return;
            }
            Response queuesToUnlock = (Response) storageResult.result();
            if (queuesToUnlock == null || queuesToUnlock.size() == 0) {
                promise.complete(0L);
                return;
            }
            int queueCount = queuesToUnlock.size();
            // Counts the unlock operations still outstanding; the last one to finish
            // completes or fails the promise.
            AtomicInteger pendingUnlocks = new AtomicInteger(queueCount);
            List<String> failedQueues = new ArrayList<>();
            for (Response queueToUnlock : queuesToUnlock) {
                this.log.info("About to unlock sample queue '{}'", queueToUnlock);
                unlockQueue(queueToUnlock.toString()).onComplete(unlockResult -> {
                    if (unlockResult.failed()) {
                        failedQueues.add(unlockResult.cause().getMessage());
                    }
                    if (pendingUnlocks.decrementAndGet() != 0) {
                        return;
                    }
                    if (failedQueues.isEmpty()) {
                        promise.complete(Long.valueOf(queueCount));
                    } else {
                        promise.fail("The following queues could not be unlocked: " + failedQueues);
                    }
                });
            }
        });
        return promise.future();
    }

    /**
     * Unlocks the given queue by sending a delete-lock operation to redisques over the
     * event bus.
     *
     * @param str the name of the queue to unlock
     * @return a future holding the queue name once redisques replied with status {@code "ok"};
     *         failed with the queue name as message when the event bus request failed or
     *         redisques replied with another status
     */
    @Override
    public Future<String> unlockQueue(String str) {
        this.log.info("About to unlock queue '{}'", str);
        Promise<String> promise = Promise.promise();
        this.vertx.eventBus().request(this.redisquesAddress, RedisquesAPI.buildDeleteLockOperation(str), reply -> {
            if (reply.failed()) {
                logQueueUnlockError(str, reply.cause().getMessage());
                promise.fail(str);
                return;
            }
            String status = ((JsonObject) reply.result().body()).getString("status");
            if ("ok".equals(status)) {
                promise.complete(str);
            } else {
                logQueueUnlockError(str, "Got reply with status value '" + status + "'");
                promise.fail(str);
            }
        });
        return promise.future();
    }

    /**
     * Triggers {@link #lockQueue(String, HttpRequest)} without handing the result back to the
     * caller; a failure is only logged on warn level.
     *
     * @param str the name of the queue to lock
     * @param httpRequest the request whose uri selects the circuit
     */
    private void lockQueueSync(String str, HttpRequest httpRequest) {
        Future<Void> lockOutcome = lockQueue(str, httpRequest);
        lockOutcome.onComplete(lockResult -> {
            if (!lockResult.failed()) {
                return;
            }
            this.log.warn(lockResult.cause().getMessage());
        });
    }

    /**
     * Fails the given promise with a "no rule to circuit mapping found" message.
     *
     * @param promise the promise to fail; its result type is irrelevant because it is only failed
     * @param str the name of the queue, or {@code null} when no queue is involved; the queue
     *        name is only mentioned in the message when it is not {@code null}
     * @param httpRequest the request whose uri is mentioned in the message
     */
    private void failWithNoRuleToCircuitMappingMessage(Promise<?> promise, String str, HttpRequest httpRequest) {
        String message = str == null
                ? "no rule to circuit mapping found for uri " + httpRequest.getUri()
                : "no rule to circuit mapping found for queue '" + str + "' and uri " + httpRequest.getUri();
        promise.fail(message);
    }

    /**
     * Resolves the circuit for the uri of the given request via the rule to circuit mapping.
     *
     * @param httpRequest the request whose uri is looked up
     * @return the result of the mapping lookup; callers treat {@code null} as "no mapping found"
     */
    private PatternAndCircuitHash getPatternAndCircuitHashFromRequest(HttpRequest httpRequest) {
        String requestUri = httpRequest.getUri();
        return this.ruleToCircuitMapping.getCircuitFromRequestUri(requestUri);
    }

    /**
     * Determines the unique id of the given request.
     *
     * <p>The header {@code x-rp-unique_id} is preferred, then {@code x-rp-unique-id}. When
     * neither header is present a warning is logged and the request uri is used instead.
     *
     * @param httpRequest the request to read the id from
     * @return the unique id, or the request uri as fallback
     */
    private String getRequestUniqueId(HttpRequest httpRequest) {
        String uniqueId = httpRequest.getHeaders().get("x-rp-unique_id");
        if (uniqueId != null) {
            return uniqueId;
        }
        uniqueId = httpRequest.getHeaders().get("x-rp-unique-id");
        if (uniqueId != null) {
            return uniqueId;
        }
        this.log.warn("request to {} has no unique-id header. Using request uri instead", httpRequest.getUri());
        return httpRequest.getUri();
    }

    /**
     * @return the configuration resource currently provided by the configuration resource manager
     */
    private QueueCircuitBreakerConfigurationResource getConfig() {
        QueueCircuitBreakerConfigurationResource currentConfig = this.configResourceManager.getConfigurationResource();
        return currentConfig;
    }
}
