/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.microprofile.faulttolerance;

import com.netflix.config.ConfigurationManager;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import io.helidon.microprofile.faulttolerance.CommandCompletableFuture;
import io.helidon.microprofile.faulttolerance.CommandFallback;
import io.helidon.microprofile.faulttolerance.CommandScheduler;
import io.helidon.microprofile.faulttolerance.ExceptionUtil;
import io.helidon.microprofile.faulttolerance.FaultToleranceCommand;
import io.helidon.microprofile.faulttolerance.FaultToleranceExtension;
import io.helidon.microprofile.faulttolerance.FaultToleranceMetrics;
import io.helidon.microprofile.faulttolerance.MethodIntrospector;
import io.helidon.microprofile.faulttolerance.TimeUtil;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Logger;
import javax.interceptor.InvocationContext;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.FailsafeException;
import net.jodah.failsafe.FailsafeExecutor;
import net.jodah.failsafe.Fallback;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.event.ExecutionAttemptedEvent;
import net.jodah.failsafe.function.CheckedFunction;
import net.jodah.failsafe.util.concurrent.Scheduler;
import org.apache.commons.configuration.AbstractConfiguration;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException;
import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;

/**
 * Executes an intercepted method through a {@link FaultToleranceCommand}, wrapping the
 * invocation in a Failsafe executor that applies the method's retry policy and, when the
 * method declares one, its fallback.
 *
 * <p>One instance is created per {@link InvocationContext}. Every attempt made by
 * {@link #execute()} builds a fresh command and updates the fault tolerance metrics.
 *
 * <p>NOTE(review): {@code invocationCount} and {@code command} are plain mutable fields that
 * are written by {@code retryExecute()}; in asynchronous mode that method is handed to a
 * {@link CommandScheduler}, so it presumably runs on another thread - confirm visibility
 * requirements with the callers of {@link #getCommand()}.
 */
public class CommandRetrier {
    private static final Logger LOGGER = Logger.getLogger(CommandRetrier.class.getName());

    /** Default, in milliseconds, for {@value #FT_DELAY_CORRECTION}. */
    private static final long DEFAULT_DELAY_CORRECTION = 250L;
    private static final String FT_DELAY_CORRECTION = "fault-tolerance.delayCorrection";

    /** Default for {@value #FT_COMMAND_THREAD_POOL_SIZE}. */
    private static final int DEFAULT_COMMAND_THREAD_POOL_SIZE = 8;
    private static final String FT_COMMAND_THREAD_POOL_SIZE = "fault-tolerance.commandThreadPoolSize";

    /** Default for {@value #FT_THREAD_WAITING_PERIOD}. */
    private static final long DEFAULT_THREAD_WAITING_PERIOD = 2000L;
    private static final String FT_THREAD_WAITING_PERIOD = "fault-tolerance.threadWaitingPeriod";

    /** Default, in milliseconds, for {@value #FT_BULKHEAD_TASK_QUEUEING_PERIOD}. */
    private static final long DEFAULT_BULKHEAD_TASK_QUEUEING_PERIOD = 2000L;
    private static final String FT_BULKHEAD_TASK_QUEUEING_PERIOD = "fault-tolerance.bulkheadTaskQueueingPeriod";

    /** Message fragment used to recognise an open-circuit failure. */
    private static final String BREAKER_OPEN_MESSAGE = "Hystrix circuit short-circuited and is OPEN";

    /** Message fragment used to recognise a semaphore (bulkhead) rejection. */
    private static final String SEMAPHORE_REJECTED_MESSAGE = "could not acquire a semaphore for execution";

    private final InvocationContext context;
    private final RetryPolicy<Object> retryPolicy;
    private final boolean isAsynchronous;
    private final MethodIntrospector introspector;
    private final Method method;

    /** Number of attempts started so far; incremented by every call to {@code retryExecute()}. */
    private int invocationCount = 0;

    /** Command created by the most recent attempt; {@code null} until the first attempt starts. */
    private FaultToleranceCommand command;

    /** Class loader captured by {@link #execute()} in asynchronous mode; {@code null} otherwise. */
    private ClassLoader contextClassLoader;

    private final long delayCorrection;
    private final int commandThreadPoolSize;
    private final long threadWaitingPeriod;
    private final long bulkheadTaskQueueingPeriod;

    /** Handed to every command; awaited by {@code awaitBulkheadAsyncTaskQueued()}. */
    private final CompletableFuture<?> taskQueued = new CompletableFuture<>();

    /**
     * Reads the tuning properties from MicroProfile Config and builds the retry policy from the
     * method's {@link Retry} annotation. Without an annotation the policy performs no retries.
     *
     * @param context invocation context of the intercepted method
     * @param introspector introspector describing the fault tolerance annotations of the method
     */
    public CommandRetrier(InvocationContext context, MethodIntrospector introspector) {
        this.context = context;
        this.introspector = introspector;
        this.isAsynchronous = introspector.isAsynchronous();
        this.method = context.getMethod();

        Config config = ConfigProvider.getConfig();
        this.delayCorrection = config.getOptionalValue(FT_DELAY_CORRECTION, Long.class)
                .orElse(DEFAULT_DELAY_CORRECTION);
        this.commandThreadPoolSize = config.getOptionalValue(FT_COMMAND_THREAD_POOL_SIZE, Integer.class)
                .orElse(DEFAULT_COMMAND_THREAD_POOL_SIZE);
        this.threadWaitingPeriod = config.getOptionalValue(FT_THREAD_WAITING_PERIOD, Long.class)
                .orElse(DEFAULT_THREAD_WAITING_PERIOD);
        this.bulkheadTaskQueueingPeriod = config.getOptionalValue(FT_BULKHEAD_TASK_QUEUEING_PERIOD, Long.class)
                .orElse(DEFAULT_BULKHEAD_TASK_QUEUEING_PERIOD);

        Retry retry = introspector.getRetry();
        if (retry != null) {
            this.retryPolicy = new RetryPolicy<Object>()
                    .withMaxRetries(retry.maxRetries())
                    .withMaxDuration(Duration.of(retry.maxDuration(), retry.durationUnit()));
            this.retryPolicy.handle(retry.retryOn());
            if (retry.abortOn().length > 0) {
                this.retryPolicy.abortOn(retry.abortOn());
            }

            long delay = TimeUtil.convertToNanos(retry.delay(), retry.delayUnit());
            if (retry.jitter() > 0L) {
                long jitter = TimeUtil.convertToNanos(retry.jitter(), retry.jitterDelayUnit());
                double factor;
                if (jitter > delay) {
                    // The jitter is larger than the delay: move the delay up by half of the
                    // difference and use a jitter factor of 1.0.
                    delay += (jitter - delay) / 2L;
                    factor = 1.0;
                } else {
                    // jitter > 0 and jitter <= delay, so delay is non-zero here.
                    factor = (double) jitter / (double) delay;
                }
                this.retryPolicy.withDelay(correctedDelay(delay));
                this.retryPolicy.withJitter(factor);
            } else if (retry.delay() > 0L) {
                this.retryPolicy.withDelay(correctedDelay(delay));
            }
        } else {
            this.retryPolicy = new RetryPolicy<Object>().withMaxRetries(0);
        }
    }

    /**
     * Applies the configured delay correction to a retry delay.
     *
     * @param delayNanos requested delay in nanoseconds
     * @return absolute difference between the delay and the correction, as a duration
     */
    private Duration correctedDelay(long delayNanos) {
        long correctionNanos = TimeUtil.convertToNanos(this.delayCorrection, ChronoUnit.MILLIS);
        return Duration.of(Math.abs(delayNanos - correctionNanos), ChronoUnit.NANOS);
    }

    /**
     * @return value of {@value #FT_COMMAND_THREAD_POOL_SIZE}, or its default
     */
    int commandThreadPoolSize() {
        return this.commandThreadPoolSize;
    }

    /**
     * @return value of {@value #FT_THREAD_WAITING_PERIOD}, or its default
     */
    long threadWaitingPeriod() {
        return this.threadWaitingPeriod;
    }

    /**
     * @return the command created by the most recent attempt, or {@code null} if none started yet
     */
    FaultToleranceCommand getCommand() {
        return this.command;
    }

    /**
     * Runs the command under the retry (and optional fallback) policies.
     *
     * <p>In asynchronous mode the work is scheduled on a {@link CommandScheduler} and a
     * {@link CommandCompletableFuture} is returned; otherwise the call blocks and returns the
     * method's result.
     *
     * @return the method result, or a future/completion stage for asynchronous methods
     * @throws Exception the failure reported by Failsafe, converted by {@link ExceptionUtil}
     * @throws InternalError if an asynchronous method returns neither {@code Future} nor
     *         {@code CompletionStage}
     */
    public Object execute() throws Exception {
        LOGGER.fine(() -> "Executing command with isAsynchronous = " + this.isAsynchronous);
        FailsafeExecutor failsafe = this.prepareFailsafeExecutor();
        try {
            if (this.isAsynchronous) {
                CommandScheduler scheduler = CommandScheduler.create(this.commandThreadPoolSize);
                failsafe = failsafe.with((Scheduler)scheduler);
                // Remember the caller's class loader so each attempt can install it on its thread.
                this.contextClassLoader = Thread.currentThread().getContextClassLoader();
                // CompletionStage is tested before Future.
                if (this.introspector.isReturnType(CompletionStage.class)) {
                    CommandCompletableFuture completionStage = CommandCompletableFuture.create(failsafe.getStageAsync(() -> (CompletionStage)this.retryExecute()), this::getCommand);
                    this.awaitBulkheadAsyncTaskQueued();
                    return completionStage;
                }
                if (this.introspector.isReturnType(Future.class)) {
                    CommandCompletableFuture future = CommandCompletableFuture.create(failsafe.getAsync(() -> (Future)this.retryExecute()), this::getCommand);
                    this.awaitBulkheadAsyncTaskQueued();
                    return future;
                }
                throw new InternalError("Validation failed, return type must be Future or CompletionStage");
            }
            return failsafe.get(this::retryExecute);
        }
        catch (FailsafeException e) {
            throw ExceptionUtil.toException(e.getCause());
        }
    }

    /**
     * Builds the Failsafe executor: the fallback policy (if the method has one) followed by the
     * retry policy.
     *
     * @return executor configured with this invocation's policies
     */
    private FailsafeExecutor<Object> prepareFailsafeExecutor() {
        // NOTE(review): the element type of this list looks erased by the decompiler;
        // confirm the type expected by Failsafe.with(...) before compiling.
        ArrayList<Object> policies = new ArrayList<Object>();
        if (this.introspector.hasFallback()) {
            CheckedFunction fallbackFunction = event -> {
                CommandFallback fallback = new CommandFallback(this.context, this.introspector, (ExecutionAttemptedEvent<?>)event);
                // Declared as Object: the value is unwrapped step by step below.
                Object result = fallback.execute();
                if (result instanceof CompletionStage) {
                    result = ((CompletionStage)result).toCompletableFuture();
                }
                if (result instanceof Future) {
                    // Block until the fallback's future completes and hand back its value.
                    result = ((Future)result).get();
                }
                return result;
            };
            policies.add(Fallback.of((CheckedFunction)fallbackFunction));
        }
        policies.add(this.retryPolicy);
        return Failsafe.with(policies);
    }

    /**
     * Performs a single attempt: creates a new command, publishes its Hystrix properties,
     * executes it and records metrics.
     *
     * @return the value returned by the command
     * @throws Exception the unwrapped failure; timeouts, bulkhead rejections and open circuits
     *         are translated to the corresponding MicroProfile exceptions
     */
    private Object retryExecute() throws Exception {
        if (this.contextClassLoader != null) {
            Thread.currentThread().setContextClassLoader(this.contextClassLoader);
        }
        String commandKey = this.createCommandKey();
        this.command = new FaultToleranceCommand(this, commandKey, this.introspector, this.context, this.contextClassLoader, this.taskQueued);
        this.introspector.getHystrixProperties().entrySet()
                .forEach(entry -> this.setProperty(commandKey, (String) entry.getKey(), entry.getValue()));

        Object result;
        try {
            LOGGER.fine(() -> "About to execute command with key " + this.command.getCommandKey() + " on thread " + Thread.currentThread().getName());
            ++this.invocationCount;
            this.updateMetricsBefore();
            result = this.command.execute();
            this.updateMetricsAfter(null);
        } catch (ExceptionUtil.WrappedException e) {
            Throwable cause = e.getCause();
            if (cause instanceof HystrixRuntimeException) {
                // Look through the Hystrix wrapper to the underlying failure.
                cause = cause.getCause();
            }
            this.updateMetricsAfter(cause);
            if (cause instanceof TimeoutException) {
                throw new org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException(cause);
            }
            if (CommandRetrier.isBulkheadRejection(cause)) {
                throw new BulkheadException(cause);
            }
            if (CommandRetrier.isHystrixBreakerException(cause)) {
                throw new CircuitBreakerOpenException(cause);
            }
            throw ExceptionUtil.toException(cause);
        }
        return result;
    }

    /**
     * For bulkhead methods, waits up to the configured queueing period for {@code taskQueued} to
     * complete. The wait is best-effort: failures are logged, never propagated.
     */
    private void awaitBulkheadAsyncTaskQueued() {
        if (!this.introspector.hasBulkhead()) {
            return;
        }
        try {
            this.taskQueued.get(this.bulkheadTaskQueueingPeriod, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.info(() -> "Bulkhead async task queueing exception " + e);
        } catch (Exception e) {
            LOGGER.info(() -> "Bulkhead async task queueing exception " + e);
        }
    }

    /**
     * Counts a retry when metrics are enabled, the method has a retry policy and this is not
     * the first attempt.
     */
    private void updateMetricsBefore() {
        if (FaultToleranceExtension.isFaultToleranceMetricsEnabled() && this.introspector.hasRetry() && this.invocationCount > 1) {
            FaultToleranceMetrics.getCounter(this.method, "retry.retries.total").inc();
        }
    }

    /**
     * Updates invocation, retry, timeout and bulkhead metrics after an attempt.
     *
     * @param cause failure of the attempt, or {@code null} if it succeeded
     */
    private void updateMetricsAfter(Throwable cause) {
        if (!FaultToleranceExtension.isFaultToleranceMetricsEnabled()) {
            return;
        }
        if (this.introspector.hasRetry()) {
            Retry retry = this.introspector.getRetry();
            boolean firstInvocation = this.invocationCount == 1;
            if (cause == null) {
                FaultToleranceMetrics.getCounter(this.method, "invocations.total").inc();
                FaultToleranceMetrics.getCounter(this.method, firstInvocation ? "retry.callsSucceededNotRetried.total" : "retry.callsSucceededRetried.total").inc();
            } else if (retry.maxRetries() == this.invocationCount - 1) {
                // Only the attempt that uses up the last retry is counted as a failed invocation.
                FaultToleranceMetrics.getCounter(this.method, "retry.callsFailed.total").inc();
                FaultToleranceMetrics.getCounter(this.method, "invocations.failed.total").inc();
                FaultToleranceMetrics.getCounter(this.method, "invocations.total").inc();
            }
        } else {
            FaultToleranceMetrics.getCounter(this.method, "invocations.total").inc();
            if (cause != null) {
                FaultToleranceMetrics.getCounter(this.method, "invocations.failed.total").inc();
            }
        }
        if (this.introspector.hasTimeout()) {
            FaultToleranceMetrics.getHistogram(this.method, "timeout.executionDuration").update(this.command.getExecutionTime());
            FaultToleranceMetrics.getCounter(this.method, cause instanceof TimeoutException ? "timeout.callsTimedOut.total" : "timeout.callsNotTimedOut.total").inc();
        }
        if (this.introspector.hasBulkhead()) {
            boolean bulkheadRejection = CommandRetrier.isBulkheadRejection(cause);
            if (!bulkheadRejection) {
                FaultToleranceMetrics.getHistogram(this.method, "bulkhead.executionDuration").update(this.command.getExecutionTime());
            }
            FaultToleranceMetrics.getCounter(this.method, bulkheadRejection ? "bulkhead.callsRejected.total" : "bulkhead.callsAccepted.total").inc();
        }
    }

    /**
     * @return the method name followed by a hash of the invocation target and the method
     */
    private String createCommandKey() {
        return this.method.getName() + Objects.hash(this.context.getTarget(), this.context.getMethod().hashCode());
    }

    /**
     * Sets {@code hystrix.command.<commandKey>.<key>} in the Hystrix configuration unless a
     * value is already present. The check and the write happen under the configuration
     * instance's monitor.
     *
     * @param commandKey key of the command the property belongs to
     * @param key property name relative to the command
     * @param value property value
     */
    private void setProperty(String commandKey, String key, Object value) {
        String actualKey = String.format("hystrix.command.%s.%s", commandKey, key);
        AbstractConfiguration configManager = ConfigurationManager.getConfigInstance();
        synchronized (configManager) {
            if (configManager.getProperty(actualKey) == null) {
                configManager.setProperty(actualKey, value);
            }
        }
    }

    /**
     * @param t failure to inspect, may be {@code null}
     * @return {@code true} if {@code t} is a runtime exception reporting an open circuit
     */
    private static boolean isHystrixBreakerException(Throwable t) {
        return runtimeMessageContains(t, BREAKER_OPEN_MESSAGE);
    }

    /**
     * @param t failure to inspect, may be {@code null}
     * @return {@code true} if {@code t} is a {@link RejectedExecutionException} or a runtime
     *         exception reporting a semaphore rejection
     */
    private static boolean isBulkheadRejection(Throwable t) {
        return t instanceof RejectedExecutionException || runtimeMessageContains(t, SEMAPHORE_REJECTED_MESSAGE);
    }

    /**
     * Null-safe test of a runtime exception's message.
     *
     * @param t failure to inspect, may be {@code null}
     * @param fragment text to look for
     * @return {@code true} if {@code t} is a {@link RuntimeException} whose message is not
     *         {@code null} and contains {@code fragment}
     */
    private static boolean runtimeMessageContains(Throwable t, String fragment) {
        if (!(t instanceof RuntimeException)) {
            return false;
        }
        String message = t.getMessage();
        return message != null && message.contains(fragment);
    }
}

