/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker.jobmaster.clutch;

import com.yahoo.labs.samoa.instances.Attribute;
import io.mantisrx.runtime.descriptor.StageScalingPolicy;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.server.worker.jobmaster.JobAutoScaler;
import io.mantisrx.server.worker.jobmaster.Util;
import io.mantisrx.server.worker.jobmaster.clutch.ClutchConfiguration;
import io.mantisrx.server.worker.jobmaster.clutch.ClutchControllerOutput;
import io.mantisrx.server.worker.jobmaster.clutch.ClutchPIDConfig;
import io.mantisrx.server.worker.jobmaster.control.actuators.ClutchMantisStageActuator;
import io.mantisrx.server.worker.jobmaster.control.controllers.PIDController;
import io.mantisrx.server.worker.jobmaster.control.utils.ErrorComputer;
import io.mantisrx.server.worker.jobmaster.control.utils.Integrator;
import io.mantisrx.shaded.com.google.common.cache.Cache;
import io.mantisrx.shaded.com.google.common.cache.CacheBuilder;
import io.mantisrx.shaded.com.google.common.util.concurrent.AtomicDouble;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import io.vavr.Tuple3;
import java.util.Calendar;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import moa.core.FastVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public class ClutchAutoScaler
implements Observable.Transformer<JobAutoScaler.Event, Object> {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(ClutchAutoScaler.class);
    // String.format template for the message logged by call() just before a scaling action is issued.
    private static final String autoscaleLogMessageFormat = "Autoscaling stage %d to %d instances on controller output: cpu/mem/network %f/%f/%f (dampening: %f) and predicted error: %f with dominant resource: %s";
    // Filled by the static initializer with six named attributes; not read anywhere else in this class.
    private static final FastVector attributes = new FastVector();
    // Supplies the stage number for logging and is handed to the ClutchMantisStageActuator in call().
    private final JobAutoScaler.StageScaler scaler;
    // Passed to each per-resource controller, which uses it in Util.getEffectiveValue.
    private final StageSchedulingInfo stageSchedulingInfo;
    // Worker count supplied at construction; seeds targetScale and each controller's integrator.
    private final long initialSize;
    // Autoscaler configuration: rps, cooldownSeconds, per-resource PID configs, minSize/maxSize, maxAdjustment.
    private final ClutchConfiguration config;
    // Most recent rounded value emitted downstream of the actuator; starts at initialSize.
    private final AtomicLong targetScale = new AtomicLong(0L);
    // Gain dampening factor shared with the PID controllers; refreshed every 60 seconds by the constructor's timer.
    private final AtomicDouble gainDampeningFactor = new AtomicDouble(1.0);
    // Epoch millis; call() only lets scaling signals through once the clock has passed this value.
    private final AtomicLong cooldownTimestamp;
    // Set to Math.round(config.rps) in the constructor; not read anywhere else in this class.
    private final AtomicLong rps = new AtomicLong(0L);
    // Fallback PID configuration used when config.cpu / config.memory / config.network is empty.
    // NOTE(review): presumably setPoint=60.0, rope=(0.0, 25.0), kp=0.01, kd=0.01 - confirm against the ClutchPIDConfig constructor.
    private final ClutchPIDConfig defaultConfig = new ClutchPIDConfig(60.0, (Tuple2<Double, Double>)Tuple.of((Object)0.0, (Object)25.0), 0.01, 0.01);
    // Recent scaling actions: key is the wall-clock millis of the action, value is the change relative to the
    // previous targetScale. Holds at most 12 entries, each expiring 60 minutes after it was written.
    Cache<Long, Long> actionCache = CacheBuilder.newBuilder().maximumSize(12L).expireAfterWrite(60L, TimeUnit.MINUTES).build();
    // Running error-driven adjustment added on top of the dominant controller's scale; decayed by 0.99 per evaluation in call().
    AtomicDouble correction = new AtomicDouble(0.0);

    public ClutchAutoScaler(StageSchedulingInfo stageSchedulingInfo, JobAutoScaler.StageScaler scaler, ClutchConfiguration config, int initialSize) {
        this.stageSchedulingInfo = stageSchedulingInfo;
        this.scaler = scaler;
        this.initialSize = initialSize;
        this.targetScale.set(initialSize);
        this.config = config;
        this.rps.set(Math.round(config.rps));
        this.cooldownTimestamp = new AtomicLong(System.currentTimeMillis() + (Long)config.cooldownSeconds.getOrElse((Object)0L) * 1000L);
        Observable.interval((long)60L, (TimeUnit)TimeUnit.SECONDS).forEach(__ -> {
            double factor = this.computeGainFactor(this.actionCache);
            log.debug("Setting gain dampening factor to: {}.", (Object)factor);
            this.gainDampeningFactor.set(factor);
        });
    }

    /**
     * Clamps a proposed scale into {@code [min, max]}.
     *
     * @param targetScale proposed scale; {@code NaN} is treated as {@code min}
     * @param min         lower bound
     * @param max         upper bound
     * @return {@code min} when the value is below it, {@code max} when above it, otherwise the value itself
     */
    private static double enforceMinMax(double targetScale, double min, double max) {
        double candidate = Double.isNaN(targetScale) ? min : targetScale;
        // The lower bound is checked first, matching the precedence callers have always observed.
        if (candidate < min) {
            return min;
        }
        return candidate > max ? max : candidate;
    }

    /**
     * Returns how many whole minutes have elapsed since midnight, according to a
     * {@link Calendar} obtained from {@code Calendar.getInstance()} (default time zone and locale).
     *
     * @return a value in the range {@code 0..1439}
     */
    private static int getMinutesIntoDay() {
        Calendar now = Calendar.getInstance();
        // Named Calendar fields instead of the raw field indices 11 and 12.
        int hour = now.get(Calendar.HOUR_OF_DAY);
        int minute = now.get(Calendar.MINUTE);
        return hour * 60 + minute;
    }

    /**
     * Derives a gain dampening factor from the recorded scaling actions.
     *
     * <p>Positive cache values count as scale-ups and negative values as scale-downs; zero
     * values are ignored. The factor is the cube of the share held by the more frequent
     * direction, so a history that is all one direction yields {@code 1.0} and a history that
     * alternates yields a smaller factor. An empty history yields {@code 1.0}.
     *
     * @param actionCache recent scaling deltas; only the values are inspected
     * @return the dampening factor, {@code 1.0} when there is no non-zero action recorded
     */
    private double computeGainFactor(Cache<Long, Long> actionCache) {
        long nUp = 0L;
        long nDown = 0L;
        // Count both directions in one traversal so the two totals come from the same view of
        // the cache even if another thread records an action meanwhile.
        for (Long delta : actionCache.asMap().values()) {
            if (delta > 0L) {
                nUp++;
            } else if (delta < 0L) {
                nDown++;
            }
        }
        long n = nUp + nDown;
        if (n == 0L) {
            return 1.0;
        }
        double dominantShare = (double) Math.max(nUp, nDown) / (double) n;
        return Math.pow(dominantShare, 3.0);
    }

    /**
     * Picks the controller output with the largest {@code scale} out of the three supplied.
     *
     * <p>Ties favour the earlier element. If neither the first nor the second element compares
     * greater-or-equal to both of the others, the third element is returned.
     *
     * @param triple the three controller outputs to compare
     * @return the output whose scale dominates
     */
    private ClutchControllerOutput findDominatingResource(Tuple3<ClutchControllerOutput, ClutchControllerOutput, ClutchControllerOutput> triple) {
        ClutchControllerOutput first = (ClutchControllerOutput)triple._1;
        ClutchControllerOutput second = (ClutchControllerOutput)triple._2;
        ClutchControllerOutput third = (ClutchControllerOutput)triple._3;

        boolean firstDominates = first.scale >= second.scale && first.scale >= third.scale;
        if (firstDominates) {
            return first;
        }
        boolean secondDominates = second.scale >= first.scale && second.scale >= third.scale;
        return secondDominates ? second : third;
    }

    /**
     * Turns a stream of autoscaler metric events into a stream of applied scaling actions.
     *
     * <p>The pipeline:
     * <ol>
     *   <li>runs CPU, Memory and Network events through one {@code ClutchController} each;</li>
     *   <li>zips the three controller outputs and picks the dominant one;</li>
     *   <li>adds a decaying correction term fed by KafkaLag and DataDrop events;</li>
     *   <li>clamps the result to {@code [config.minSize, config.maxSize]};</li>
     *   <li>outside the cooldown window, and only when the rounded target differs from the
     *       current worker count, logs the decision and hands it to the actuator;</li>
     *   <li>records the action in {@code actionCache}, updates {@code targetScale} and restarts
     *       the cooldown.</li>
     * </ol>
     *
     * @param metrics incoming metric events
     * @return an observable emitting the rounded value produced downstream of the actuator
     */
    public Observable<Object> call(Observable<JobAutoScaler.Event> metrics) {
        // Multicast the source so all the filtered views below share one upstream subscription.
        metrics = metrics.share();

        // One controller per resource; each falls back to defaultConfig when no explicit PID config is set.
        ClutchController cpuController = new ClutchController(StageScalingPolicy.ScalingReason.CPU, this.stageSchedulingInfo, (ClutchPIDConfig)this.config.cpu.getOrElse((Object)this.defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
        ClutchController memController = new ClutchController(StageScalingPolicy.ScalingReason.Memory, this.stageSchedulingInfo, (ClutchPIDConfig)this.config.memory.getOrElse((Object)this.defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);
        ClutchController netController = new ClutchController(StageScalingPolicy.ScalingReason.Network, this.stageSchedulingInfo, (ClutchPIDConfig)this.config.network.getOrElse((Object)this.defaultConfig), this.gainDampeningFactor, this.initialSize, this.config.minSize, this.config.maxSize);

        Observable cpuSignal = metrics.filter(event -> event.getType().equals((Object)StageScalingPolicy.ScalingReason.CPU)).compose((Observable.Transformer)cpuController);
        Observable memorySignal = metrics.filter(event -> event.getType().equals((Object)StageScalingPolicy.ScalingReason.Memory)).compose((Observable.Transformer)memController);
        Observable networkSignal = metrics.filter(event -> event.getType().equals((Object)StageScalingPolicy.ScalingReason.Network)).compose((Observable.Transformer)netController);

        // Raw cpu/memory/network values, zipped in step with the control signals. The scaling
        // decision below does not read them; they only take part in the outer zip.
        Observable rawMetricsTuples = Observable.zip(
                (Observable)metrics.filter(event -> event.getType().equals((Object)StageScalingPolicy.ScalingReason.CPU)).map(JobAutoScaler.Event::getValue),
                (Observable)metrics.filter(event -> event.getType().equals((Object)StageScalingPolicy.ScalingReason.Memory)).map(JobAutoScaler.Event::getValue),
                (Observable)metrics.filter(event -> event.getType().equals((Object)StageScalingPolicy.ScalingReason.Network)).map(JobAutoScaler.Event::getValue),
                Tuple::of);
        Observable controlSignals = Observable.zip((Observable)cpuSignal, (Observable)memorySignal, (Observable)networkSignal, Tuple::of);

        // Error sources, both expressed as a number of workers to add.
        // NOTE(review): a zero config.rps makes the lag term Infinity or NaN; the NaN guard further down resets the correction in that case.
        Observable kafkaLag = metrics.filter(event -> event.getType().equals((Object)StageScalingPolicy.ScalingReason.KafkaLag)).map(JobAutoScaler.Event::getValue).map(x -> x / this.config.rps);
        Observable dataDrop = metrics.filter(event -> event.getType().equals((Object)StageScalingPolicy.ScalingReason.DataDrop)).map(x -> x.getValue() / 100.0 * (double)x.getNumWorkers());
        // Seeded with 0.0 so withLatestFrom below has an error value before any lag/drop event arrives.
        Observable error = Observable.merge((Observable)Observable.just((Object)0.0), (Observable)kafkaLag, (Observable)dataDrop);

        Observable currentScale = metrics.map(JobAutoScaler.Event::getNumWorkers);

        // Tuple layout entering the map: (_1 raw metrics, _2 controller outputs, _3 current worker count, _4 latest error).
        Observable controllerSignal = Observable.zip((Observable)rawMetricsTuples, (Observable)controlSignals, Tuple::of)
                .withLatestFrom(currentScale, (tup, scale) -> Tuple.of((Object)tup._1, (Object)tup._2, (Object)scale))
                .withLatestFrom(error, (tup, err) -> Tuple.of((Object)tup._1, (Object)tup._2, (Object)tup._3, (Object)err))
                .map(tup -> {
                    int currentWorkerCount = (Integer)tup._3;
                    ClutchControllerOutput dominantResource = this.findDominatingResource((Tuple3<ClutchControllerOutput, ClutchControllerOutput, ClutchControllerOutput>)((Tuple3)tup._2));
                    String resourceName = dominantResource.reason.name();

                    // Cap the error at maxAdjustment (defaults to maxSize) and ignore anything below one worker.
                    double yhat = (Double)tup._4;
                    double maxAdjustment = (Double)this.config.maxAdjustment.getOrElse((Object)((double)this.config.maxSize * 1.0));
                    yhat = Math.min(yhat, maxAdjustment);
                    if (yhat < 1.0) {
                        yhat = 0.0;
                    }

                    // Only accumulate new error outside the cooldown window, keeping the total capped.
                    if (System.currentTimeMillis() > this.cooldownTimestamp.get()) {
                        double accumulated = this.correction.addAndGet(yhat);
                        accumulated = Math.min(accumulated, maxAdjustment);
                        this.correction.set(accumulated);
                    }
                    // Decay the correction on every evaluation, then scrub NaN so it cannot poison later sums.
                    this.correction.set(this.correction.get() * 0.99);
                    this.correction.set(Double.isNaN(this.correction.get()) ? 0.0 : this.correction.get());

                    Double desiredScale = ClutchAutoScaler.enforceMinMax(Math.ceil(dominantResource.scale) + Math.ceil(this.correction.get()), this.config.minSize, this.config.maxSize);
                    String logMessage = String.format(autoscaleLogMessageFormat, this.scaler.getStage(), desiredScale.intValue(), ((ClutchControllerOutput)((Tuple3)tup._2)._1).scale, ((ClutchControllerOutput)((Tuple3)tup._2)._2).scale, ((ClutchControllerOutput)((Tuple3)tup._2)._3).scale, this.gainDampeningFactor.get(), this.correction.get(), resourceName);
                    return Tuple.of((Object)logMessage, (Object)desiredScale, (Object)currentWorkerCount);
                });

        return controllerSignal
                // Suppress every signal while the cooldown is active.
                .filter(__ -> System.currentTimeMillis() > this.cooldownTimestamp.get())
                // Act only when the rounded target differs from the current worker count.
                .filter(tup -> (double)Math.abs(Math.round((Double)tup._2) - (long)((Integer)tup._3).intValue()) > 0.99)
                .doOnNext(signal -> log.info((String)signal._1))
                // NOTE(review): presumably the actuator applies the scale and emits the size it applied - confirm.
                .compose((Observable.Transformer)new ClutchMantisStageActuator(this.scaler))
                .map(Math::round)
                // Record the delta against the previous target before the target is overwritten.
                .doOnNext(x -> this.actionCache.put((Object)System.currentTimeMillis(), (Object)(x - this.targetScale.get())))
                .doOnNext(this.targetScale::set)
                // Every action restarts the cooldown window.
                .doOnNext(__ -> this.cooldownTimestamp.set(System.currentTimeMillis() + (Long)this.config.cooldownSeconds.getOrElse((Object)0L) * 1000L))
                .map(x -> x);
    }

    /**
     * Debug entry point: prints {@code NaN}, the ceiling of {@code NaN}, and {@code false} twice,
     * one value per line.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        double notANumber = Double.NaN;
        System.out.println(notANumber);
        System.out.println(Math.ceil(notANumber));

        boolean flag = false;
        for (int i = 0; i < 2; i++) {
            System.out.println(flag);
        }
    }

    // Populate the attribute vector with the six named attributes, in this fixed order.
    static {
        String[] attributeNames = {"cpu", "memory", "network", "minuteofday", "scale", "error"};
        for (String attributeName : attributeNames) {
            attributes.add((Object)new Attribute(attributeName));
        }
    }

    private class ClutchController
    implements Observable.Transformer<JobAutoScaler.Event, ClutchControllerOutput> {
        private final ClutchPIDConfig config;
        private final StageScalingPolicy.ScalingReason metric;
        private final StageSchedulingInfo stageSchedulingInfo;
        private final AtomicDouble gainFactor;
        private final long initialSize;
        private final long min;
        private final long max;
        private final Integrator integrator;

        public ClutchController(StageScalingPolicy.ScalingReason metric, StageSchedulingInfo stageSchedulingInfo, ClutchPIDConfig config, AtomicDouble gainFactor, long initialSize, long min, long max) {
            this.metric = metric;
            this.config = config;
            this.gainFactor = gainFactor;
            this.initialSize = initialSize;
            this.stageSchedulingInfo = stageSchedulingInfo;
            this.min = min;
            this.max = max;
            this.integrator = new Integrator(this.initialSize, this.min - 1L, this.max + 1L);
        }

        public void resetIntegrator(double val) {
            this.integrator.setSum(val);
        }

        public Observable<ClutchControllerOutput> call(Observable<JobAutoScaler.Event> eventObservable) {
            return eventObservable.map(event -> Util.getEffectiveValue(this.stageSchedulingInfo, event.getType(), event.getValue())).lift((Observable.Operator)new ErrorComputer(this.config.setPoint, true, (Double)this.config.rope._1, (Double)this.config.rope._2)).lift((Observable.Operator)PIDController.of(this.config.kp, 0.0, this.config.kd, 1.0, this.gainFactor)).lift((Observable.Operator)this.integrator).map(x -> new ClutchControllerOutput(this.metric, (Double)x));
        }
    }
}

