/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker.jobmaster;

import io.mantisrx.runtime.descriptor.StageScalingPolicy;
import io.mantisrx.server.core.ServiceRegistry;
import io.mantisrx.server.core.TimeBufferedWorkerOutlier;
import io.mantisrx.server.core.WorkerAssignments;
import io.mantisrx.server.core.WorkerHost;
import io.mantisrx.server.core.WorkerOutlier;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.worker.jobmaster.AutoScaleMetricsConfig;
import io.mantisrx.server.worker.jobmaster.GaugeData;
import io.mantisrx.server.worker.jobmaster.JobAutoScaler;
import io.mantisrx.server.worker.jobmaster.MetricAggregator;
import io.mantisrx.server.worker.jobmaster.MetricData;
import io.mantisrx.server.worker.jobmaster.WorkerMetrics;
import io.mantisrx.shaded.com.google.common.cache.Cache;
import io.mantisrx.shaded.com.google.common.cache.CacheBuilder;
import io.reactivx.mantis.operators.DropOperator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.SerializedObserver;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

class WorkerMetricHandler {
    // Class-wide SLF4J logger; also used by the inner StageMetricDataOperator.
    private static final Logger logger = LoggerFactory.getLogger(WorkerMetricHandler.class);
    // Subject that incoming MetricData is pushed into (via the observer returned by
    // initAndGetMetricDataObserver()); start() groups its stream by stage.
    private final PublishSubject<MetricData> metricDataSubject = PublishSubject.create();
    // Sink that receives the JobAutoScaler.Event instances produced by the periodic per-stage aggregation.
    private final Observer<JobAutoScaler.Event> jobAutoScaleObserver;
    // Master gateway; used here for schedulingChanges(jobId) and resubmitJobWorker(...).
    private final MantisMasterGateway masterClientApi;
    // Autoscale metric configuration; handed to the MetricAggregator and to every stage operator.
    private final AutoScaleMetricsConfig autoScaleMetricsConfig;
    // Aggregator built from autoScaleMetricsConfig in the constructor.
    private final MetricAggregator metricAggregator;
    // Stage number -> number of workers, refreshed from scheduling changes in start().
    // NOTE(review): plain HashMap written in the schedulingChanges callback and read from the stage
    // operators; confirm which threads those run on.
    private final Map<Integer, Integer> numWorkersByStage = new HashMap<Integer, Integer>();
    // Stage number -> snapshot list of that stage's WorkerHost entries, refreshed from scheduling changes in start().
    private final Map<Integer, List<WorkerHost>> workerHostsByStage = new HashMap<Integer, List<WorkerHost>>();
    // Id of the job this handler serves; data points carrying a different job id are treated as source-job metrics.
    private final String jobId;
    /**
     * Resolves the worker count recorded for a stage in {@code numWorkersByStage}.
     * Logs a warning and yields {@code -1} when no count has been recorded for the stage.
     */
    private final Func1<Integer, Integer> lookupNumWorkersByStage = stage -> {
        // Guard clause: unknown stage -> warn and signal "unknown" with -1.
        if (!this.numWorkersByStage.containsKey(stage)) {
            logger.warn("num workers for stage {} not known", stage);
            return -1;
        }
        return this.numWorkersByStage.get(stage);
    };

    /**
     * Creates a handler for the given job. No subscriptions are created here; they are set up by
     * {@code initAndGetMetricDataObserver()}.
     *
     * @param jobId                  id of the job whose worker metrics are handled
     * @param jobAutoScaleObserver   observer that receives the generated {@link JobAutoScaler.Event}s
     * @param masterClientApi        master gateway used for scheduling changes and worker resubmits
     * @param autoScaleMetricsConfig autoscale metric configuration; also used to build the {@link MetricAggregator}
     */
    public WorkerMetricHandler(String jobId, Observer<JobAutoScaler.Event> jobAutoScaleObserver, MantisMasterGateway masterClientApi, AutoScaleMetricsConfig autoScaleMetricsConfig) {
        // Configuration-derived collaborators first, then the plain references.
        this.autoScaleMetricsConfig = autoScaleMetricsConfig;
        this.metricAggregator = new MetricAggregator(autoScaleMetricsConfig);
        this.masterClientApi = masterClientApi;
        this.jobAutoScaleObserver = jobAutoScaleObserver;
        this.jobId = jobId;
    }

    /**
     * Starts the handler's internal subscriptions and returns the observer that metric data
     * should be pushed into.
     *
     * <p>Every invocation calls {@code start()} again, which creates a new set of subscriptions;
     * NOTE(review): callers presumably invoke this once per handler - confirm.
     *
     * @return a {@link SerializedObserver} wrapping the internal metric data subject
     */
    public Observer<MetricData> initAndGetMetricDataObserver() {
        this.start();
        // Parameterized instead of the raw type so the returned observer is checked at compile time.
        return new SerializedObserver<MetricData>(this.metricDataSubject);
    }

    /**
     * Merges a list of per-worker aggregates into a single aggregate per metric group.
     *
     * <p>The input maps are regrouped so that each metric group name maps to the list of
     * {@link GaugeData} contributed by all entries, and that regrouped map is handed to
     * {@code metricAggregator.getAggregates(...)}.
     *
     * @param dataPointsList one map (metric group name -> gauge data) per contributor
     * @return whatever the {@link MetricAggregator} returns for the regrouped data
     */
    private Map<String, GaugeData> getAggregates(List<Map<String, GaugeData>> dataPointsList) {
        final Map<String, List<GaugeData>> gaugesByMetricGroup = new HashMap<>();
        for (Map<String, GaugeData> datapoint : dataPointsList) {
            for (Map.Entry<String, GaugeData> gauge : datapoint.entrySet()) {
                // Create the bucket on first sight of a metric group, then append.
                gaugesByMetricGroup
                        .computeIfAbsent(gauge.getKey(), metricGroup -> new ArrayList<>())
                        .add(gauge.getValue());
            }
        }
        return this.metricAggregator.getAggregates(gaugesByMetricGroup);
    }

    /**
     * Sets up the two subscriptions this handler runs on.
     *
     * <ol>
     *   <li>Scheduling changes for {@code jobId}: every update refreshes {@code numWorkersByStage}
     *       and {@code workerHostsByStage} for each stage in the update.</li>
     *   <li>Incoming metric data: the subject's stream is grouped by stage, passed through a
     *       {@link DropOperator}, and each stage group is subscribed through its own
     *       {@link StageMetricDataOperator}. The per-stage subscriptions are collected so they can be
     *       unsubscribed when the grouped stream is unsubscribed.</li>
     * </ol>
     *
     * <p>NOTE(review): neither {@code subscribe()} call supplies an error handler and the returned
     * subscriptions are not retained, so these streams cannot be stopped from this class - confirm
     * this is intended.
     */
    private void start() {
        final AtomicReference<List<Subscription>> stageSubscriptions =
                new AtomicReference<List<Subscription>>(new ArrayList<Subscription>());

        this.masterClientApi.schedulingChanges(this.jobId)
                .doOnNext(jobSchedulingInfo -> {
                    final Map<Integer, WorkerAssignments> workerAssignments = jobSchedulingInfo.getWorkerAssignments();
                    for (WorkerAssignments workerAssignment : workerAssignments.values()) {
                        logger.debug("setting numWorkers={} for stage={}", workerAssignment.getNumWorkers(), workerAssignment.getStage());
                        this.numWorkersByStage.put(workerAssignment.getStage(), workerAssignment.getNumWorkers());
                        // Copy the hosts so later lookups work on a stable snapshot of this update.
                        this.workerHostsByStage.put(workerAssignment.getStage(), new ArrayList<WorkerHost>(workerAssignment.getHosts().values()));
                    }
                })
                .subscribe();

        logger.info("Starting worker metric handler with autoscale config {}", this.autoScaleMetricsConfig);

        this.metricDataSubject
                .groupBy(metricData -> metricData.getStage())
                .lift(new DropOperator<>(WorkerMetricHandler.class.getName()))
                .doOnNext(go -> {
                    final Integer stage = go.getKey();
                    final Subscription s = go
                            .lift(new StageMetricDataOperator(stage, this.lookupNumWorkersByStage, this.autoScaleMetricsConfig))
                            .subscribe();
                    logger.info("adding subscription for stage {} StageMetricDataOperator", stage);
                    stageSubscriptions.get().add(s);
                })
                .doOnUnsubscribe(() -> {
                    for (Subscription s : stageSubscriptions.get()) {
                        s.unsubscribe();
                    }
                })
                .subscribe();
    }

    private class StageMetricDataOperator
    implements Observable.Operator<Object, MetricData> {
        // Cooldown value for outlier handling; the constructor passes this same value (600) as the
        // first argument of both outlier detectors.
        private static final int killCooldownSecs = 600;
        // Matched against a source-job metric group name; the named group "host" captures
        // everything after "sockAddr=/".
        private final Pattern hostExtractorPattern = Pattern.compile(".+:.+:sockAddr=/(?<host>.+)");
        // Stage number this operator instance handles.
        private final int stage;
        // Resolves the worker count for a stage (the enclosing handler's lookup yields -1 when unknown).
        private final Func1<Integer, Integer> numStageWorkersFn;
        // Argument value used when constructing WorkerMetrics instances (2).
        private final int valuesToKeep = 2;
        // Autoscale configuration: user-defined metrics and the source-job drop-metric predicate.
        private final AutoScaleMetricsConfig autoScaleMetricsConfig;
        // Worker index -> metrics for workers of this job's stage.
        private final ConcurrentMap<Integer, WorkerMetrics> workersMap = new ConcurrentHashMap<Integer, WorkerMetrics>();
        // "<sourceJobId>:<workerIndex>" -> metrics reported for source-job workers.
        private final ConcurrentMap<String, WorkerMetrics> sourceJobWorkersMap = new ConcurrentHashMap<String, WorkerMetrics>();
        // Keys "<sourceJobId>:<workerIndex>:<metricGroupName>" written within the last minute
        // (expireAfterWrite); only keys still present are counted by the periodic aggregation.
        private final Cache<String, String> sourceJobMetricsRecent = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.MINUTES).build();
        // Outlier detector fed with each worker's "dropPercent" gauge of the "DataDrop" metric group.
        private final WorkerOutlier workerOutlier;
        // Outlier detector fed with source-job drop gauges attributed to this stage's workers by host.
        private final TimeBufferedWorkerOutlier workerOutlierForSourceJobMetrics;
        // Worker index -> worker number, updated on every data point of this job; used to resubmit by number.
        private final Map<Integer, Integer> workerNumberByIndex = new HashMap<Integer, Integer>();
        // Seconds between periodic aggregations (also the initial delay) and the buffer argument of
        // the source-job outlier detector (30).
        private static final int metricsIntervalSeconds = 30;

        /**
         * Creates the operator for one stage and wires up both outlier detectors.
         *
         * <p>Both detectors share one resubmit action: it resolves the worker number for the flagged
         * worker index and, if resubmission is enabled by property, asks the master to resubmit that
         * worker. Failures are logged and never propagated to the detector.
         *
         * @param stage                  stage number handled by this operator
         * @param numStageWorkersFn      resolves the current worker count for a stage
         * @param autoScaleMetricsConfig autoscale metric configuration
         */
        public StageMetricDataOperator(int stage, Func1<Integer, Integer> numStageWorkersFn, AutoScaleMetricsConfig autoScaleMetricsConfig) {
            logger.debug("setting operator for stage {}", stage);
            this.stage = stage;
            this.numStageWorkersFn = numStageWorkersFn;
            this.autoScaleMetricsConfig = autoScaleMetricsConfig;

            final Action1<Integer> workerResubmitFunc = workerIndex -> {
                try {
                    // Single lookup instead of containsKey + get.
                    final Integer knownWorkerNumber = this.workerNumberByIndex.get(workerIndex);
                    if (knownWorkerNumber == null) {
                        logger.error("outlier resubmit FAILED. worker number not found for worker index {} stage {}", workerIndex, stage);
                        return;
                    }
                    final int workerNumber = knownWorkerNumber;
                    if (this.resubmitOutlierWorkerEnabled()) {
                        logger.info("resubmitting worker job {} stage {} idx {} workerNum {} (dropping excessive data compared to others)",
                                WorkerMetricHandler.this.jobId, stage, workerIndex, workerNumber);
                        WorkerMetricHandler.this.masterClientApi
                                .resubmitJobWorker(WorkerMetricHandler.this.jobId, "JobMaster", workerNumber, "dropping excessive data compared to others in stage")
                                .onErrorResumeNext(throwable -> {
                                    logger.error("caught error ({}) when resubmitting outlier worker num {}", throwable.getMessage(), workerNumber);
                                    return Observable.empty();
                                })
                                .subscribe();
                    } else {
                        logger.info("resubmitOutlier property is disabled. Not killing worker job {} stage {} idx {} workerNum {} (dropping excessive data compared to others)",
                                WorkerMetricHandler.this.jobId, stage, workerIndex, workerNumber);
                    }
                } catch (Exception e) {
                    // Best effort: a failed resubmit must not break the outlier detector that invoked us.
                    logger.warn("Can't resubmit outlier worker idx {} error {}", workerIndex, e.getMessage(), e);
                }
            };

            this.workerOutlier = new WorkerOutlier(killCooldownSecs, workerResubmitFunc);
            this.workerOutlierForSourceJobMetrics = new TimeBufferedWorkerOutlier(killCooldownSecs, metricsIntervalSeconds, workerIndex -> {
                final List<WorkerHost> candidates = WorkerMetricHandler.this.workerHostsByStage.get(stage);
                if (candidates != null) {
                    // Resolve the flagged index to its host, then resubmit every worker of this stage on that host.
                    candidates.stream()
                            .filter(h -> h.getWorkerIndex() == workerIndex.intValue())
                            .map(WorkerHost::getHost)
                            .findFirst()
                            .ifPresent(host -> this.lookupWorkersByHost(host).forEach(workerResubmitFunc::call));
                }
            });
        }

        /**
         * Reads the property {@code mantis.worker.jobmaster.outlier.worker.resubmit} from the
         * {@link ServiceRegistry} properties service on every call, defaulting to {@code "true"}.
         *
         * @return {@code true} when the property value parses as boolean true
         */
        private boolean resubmitOutlierWorkerEnabled() {
            final String resubmitOutlierWorkerProp = "mantis.worker.jobmaster.outlier.worker.resubmit";
            final String enableOutlierWorkerResubmit = "true";
            // parseBoolean yields the primitive directly; same parsing rules as Boolean.valueOf(String).
            return Boolean.parseBoolean(
                    ServiceRegistry.INSTANCE.getPropertiesService().getStringValue(resubmitOutlierWorkerProp, enableOutlierWorkerResubmit));
        }

        /**
         * Finds the worker indices of this stage whose {@link WorkerHost#getHost()} equals {@code host}.
         *
         * @param host host name to match
         * @return matching worker indices; an empty list when no hosts are recorded for this stage
         */
        private List<Integer> lookupWorkersByHost(String host) {
            final List<WorkerHost> candidates = WorkerMetricHandler.this.workerHostsByStage.get(this.stage);
            if (candidates == null) {
                return new ArrayList<Integer>();
            }
            return candidates.stream()
                    .filter(h -> h.getHost().equals(host))
                    .map(WorkerHost::getWorkerIndex)
                    .collect(Collectors.toList());
        }

        /**
         * Records one metric data point reported by a worker of this job's stage.
         *
         * <p>Steps: store the point in the worker's {@link WorkerMetrics} (created on first use); if the
         * stored point belongs to the "DataDrop" group and carries a "dropPercent" gauge, feed it to the
         * outlier detector; remember the worker's number for later resubmits; finally drop metrics
         * of worker indices at or above the stage's current worker count.
         *
         * @param datapoint data point whose job id matches this handler's job
         */
        private void addDataPoint(MetricData datapoint) {
            final int workerIndex = datapoint.getWorkerIndex();
            logger.debug("adding data point for worker idx={} data={}", workerIndex, datapoint);

            final WorkerMetrics workerMetrics =
                    this.workersMap.computeIfAbsent(workerIndex, idx -> new WorkerMetrics(this.valuesToKeep));
            final MetricData transformedMetricData = workerMetrics.addDataPoint(datapoint.getMetricGroupName(), datapoint);

            if ("DataDrop".equals(transformedMetricData.getMetricGroupName())) {
                final Map<String, Double> dataDropGauges = transformedMetricData.getGaugeData().getGauges();
                final Double dropPercent = dataDropGauges.get("dropPercent");
                if (dropPercent != null) {
                    this.workerOutlier.addDataPoint(workerIndex, dropPercent, this.numStageWorkersFn.call(this.stage));
                }
            }
            this.workerNumberByIndex.put(workerIndex, datapoint.getWorkerNumber());

            // Prune indices that are out of range for the current worker count; -1 means the count
            // is unknown, so nothing is pruned. ConcurrentHashMap's key-set view supports removal
            // during traversal, so no external lock is needed.
            final Integer numWorkers = this.numStageWorkersFn.call(this.stage);
            if (numWorkers > -1) {
                this.workersMap.keySet().removeIf(idx -> idx >= numWorkers);
            }
        }

        /**
         * Records one metric data point that was reported for a different (source) job.
         *
         * <p>The point is stored under the key {@code "<sourceJobId>:<workerIndex>"} and the key
         * {@code "<sourceJobId>:<workerIndex>:<metricGroupName>"} is marked as recently seen. If the
         * metric group name matches {@code hostExtractorPattern}, each gauge that the config classifies
         * as a source-job drop metric is split evenly across this stage's workers on the extracted host
         * and fed to the source-job outlier detector.
         *
         * @param datapoint data point whose job id differs from this handler's job
         */
        private void addSourceJobDataPoint(MetricData datapoint) {
            final String sourceJobId = datapoint.getJobId();
            final int workerIndex = datapoint.getWorkerIndex();
            final String sourceWorkerKey = sourceJobId + ":" + workerIndex;

            this.sourceJobWorkersMap
                    .computeIfAbsent(sourceWorkerKey, key -> new WorkerMetrics(this.valuesToKeep))
                    .addDataPoint(datapoint.getMetricGroupName(), datapoint);

            // Mark this worker/metric-group combination as recently seen (entries expire after write).
            final String sourceMetricKey = sourceWorkerKey + ":" + datapoint.getMetricGroupName();
            this.sourceJobMetricsRecent.put(sourceMetricKey, sourceMetricKey);

            final Matcher matcher = this.hostExtractorPattern.matcher(datapoint.getMetricGroupName());
            if (!matcher.matches()) {
                return;
            }
            final List<Integer> workerIndices = this.lookupWorkersByHost(matcher.group("host"));
            for (Map.Entry<String, Double> gauge : datapoint.getGaugeData().getGauges().entrySet()) {
                if (!this.autoScaleMetricsConfig.isSourceJobDropMetric(datapoint.getMetricGroupName(), gauge.getKey())) {
                    continue;
                }
                for (Integer idx : workerIndices) {
                    // Share is computed inside the loop so nothing is evaluated when no worker matches the host.
                    this.workerOutlierForSourceJobMetrics.addDataPoint(
                            idx, gauge.getValue() / (double) workerIndices.size(), this.numStageWorkersFn.call(this.stage));
                }
            }
        }

        /**
         * Operator entry point: schedules the periodic aggregation for this stage and returns the
         * subscriber that ingests metric data.
         *
         * <p>The periodic action runs on a {@code Schedulers.computation()} worker every
         * {@code metricsIntervalSeconds} seconds (same initial delay) and is registered with
         * {@code child}, so it is cancelled when {@code child} is unsubscribed. Exceptions thrown by
         * an aggregation run are logged so that one failed tick does not end the periodic schedule.
         *
         * <p>NOTE(review): the returned subscriber is not added to {@code child}; confirm that upstream
         * does not need to be unsubscribed when {@code child} is.
         *
         * @param child downstream subscriber; nothing is emitted to it, it is only used for lifecycle
         * @return subscriber that routes each data point to this job's or the source job's metrics
         */
        @Override
        public Subscriber<? super MetricData> call(final Subscriber<? super Object> child) {
            child.add(Schedulers.computation().createWorker().schedulePeriodically(() -> {
                try {
                    this.publishStageAggregates();
                } catch (Exception e) {
                    // Task boundary: an exception escaping here would prevent all future ticks.
                    logger.error("Failed to publish autoscale metrics for job {} stage {}", WorkerMetricHandler.this.jobId, this.stage, e);
                }
            }, metricsIntervalSeconds, metricsIntervalSeconds, TimeUnit.SECONDS));

            return new Subscriber<MetricData>() {
                @Override
                public void onCompleted() {
                    child.unsubscribe();
                }

                @Override
                public void onError(Throwable e) {
                    logger.error("Unexpected error: {}", e.getMessage(), e);
                }

                @Override
                public void onNext(MetricData metricData) {
                    // Parameterized so the data point is only stringified when debug logging is on.
                    logger.debug("Got metric metricData for job {} stage {}, worker {}: {}",
                            WorkerMetricHandler.this.jobId, StageMetricDataOperator.this.stage, metricData.getWorkerNumber(), metricData);
                    if (WorkerMetricHandler.this.jobId.equals(metricData.getJobId())) {
                        StageMetricDataOperator.this.addDataPoint(metricData);
                    } else {
                        StageMetricDataOperator.this.addSourceJobDataPoint(metricData);
                    }
                }
            };
        }

        /**
         * One aggregation run: aggregates every worker's metrics, merges them into stage-level
         * aggregates and emits one autoscale event per available signal, in this order:
         * UserDefined, KafkaLag, KafkaProcessed, CPU, Network, Memory, JVMMemory, DataDrop, RPS,
         * SourceJobDrop.
         */
        private void publishStageAggregates() {
            final List<Map<String, GaugeData>> perWorkerAggregates = new ArrayList<>();
            for (WorkerMetrics metrics : this.workersMap.values()) {
                perWorkerAggregates.add(WorkerMetricHandler.this.metricAggregator.getAggregates(metrics.getGaugesByMetricGrp()));
            }
            final int numWorkers = this.numStageWorkersFn.call(this.stage);
            final Map<String, GaugeData> allWorkerAggregates = WorkerMetricHandler.this.getAggregates(perWorkerAggregates);
            logger.info("Job stage {} avgResUsage from {} workers: {}", this.stage, this.workersMap.size(), allWorkerAggregates);

            for (Map.Entry<String, Set<String>> entry : this.autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) {
                final GaugeData groupData = allWorkerAggregates.get(entry.getKey());
                for (String metric : entry.getValue()) {
                    if (groupData == null || !groupData.getGauges().containsKey(metric)) {
                        logger.debug("no gauge data found for UserDefined (metric={})", entry);
                        continue;
                    }
                    this.emitGauge(StageScalingPolicy.ScalingReason.UserDefined, groupData.getGauges(), metric, numWorkers);
                }
            }

            final GaugeData kafkaConsumer = allWorkerAggregates.get("consumer-fetch-manager-metrics");
            if (kafkaConsumer != null) {
                this.emitGauge(StageScalingPolicy.ScalingReason.KafkaLag, kafkaConsumer.getGauges(), "records-lag-max", numWorkers);
                this.emitGauge(StageScalingPolicy.ScalingReason.KafkaProcessed, kafkaConsumer.getGauges(), "records-consumed-rate", numWorkers);
            }

            final GaugeData resourceUsage = allWorkerAggregates.get("ResourceUsage");
            if (resourceUsage != null) {
                this.publishResourceUsage(resourceUsage.getGauges(), numWorkers);
            }

            final GaugeData dataDrop = allWorkerAggregates.get("DataDrop");
            if (dataDrop != null) {
                this.emitGauge(StageScalingPolicy.ScalingReason.DataDrop, dataDrop.getGauges(), "dropPercent", numWorkers);
            }

            final GaugeData stageInput = allWorkerAggregates.get("worker_stage_inner_input");
            if (stageInput != null) {
                final Double onNextGauge = stageInput.getGauges().get("onNextGauge");
                if (onNextGauge != null) {
                    // NOTE(review): divisor 6.0 is carried over unchanged; presumably a reporting-window
                    // normalization - confirm its meaning.
                    final double rps = onNextGauge / 6.0;
                    this.emit(StageScalingPolicy.ScalingReason.RPS, rps, rps, numWorkers);
                }
            }

            this.publishSourceJobDrops(numWorkers);
        }

        /**
         * Emits CPU, Network, Memory and JVMMemory events from the "ResourceUsage" gauges. Each event
         * is emitted only when both of its input gauges are present.
         */
        private void publishResourceUsage(Map<String, Double> gauges, int numWorkers) {
            if (this.hasGauges(gauges, "cpuPctUsageCurr", "cpuPctLimit")) {
                final double cpuUsageCurr = gauges.get("cpuPctUsageCurr") / 100.0;
                final double cpuUsageLimit = gauges.get("cpuPctLimit") / 100.0;
                final double cpuUsageEffectiveValue = 100.0 * cpuUsageCurr / cpuUsageLimit;
                this.emit(StageScalingPolicy.ScalingReason.CPU, cpuUsageCurr, cpuUsageEffectiveValue, numWorkers);
            }
            if (this.hasGauges(gauges, "nwBytesUsageCurr", "nwBytesLimit")) {
                final double nwBytesUsageCurr = gauges.get("nwBytesUsageCurr");
                final double nwBytesLimit = gauges.get("nwBytesLimit");
                final double nwBytesEffectiveValue = 100.0 * nwBytesUsageCurr / nwBytesLimit;
                this.emit(StageScalingPolicy.ScalingReason.Network, nwBytesUsageCurr, nwBytesEffectiveValue, numWorkers);
            }
            if (this.hasGauges(gauges, "jvmMemoryUsedBytes", "memLimit")) {
                // 1048576 = 1024 * 1024: bytes -> MB, so usage is comparable to memLimit.
                final double memoryUsageInMB = gauges.get("jvmMemoryUsedBytes") / 1048576.0;
                final double memoryLimitInMB = gauges.get("memLimit");
                final double effectiveValue = 100.0 * memoryUsageInMB / memoryLimitInMB;
                this.emit(StageScalingPolicy.ScalingReason.Memory, memoryUsageInMB, effectiveValue, numWorkers);
                this.emit(StageScalingPolicy.ScalingReason.JVMMemory, memoryUsageInMB, effectiveValue, numWorkers);
            }
        }

        /**
         * Sums all recently reported source-job drop gauges and, if any were found, emits a
         * SourceJobDrop event.
         */
        private void publishSourceJobDrops(int numWorkers) {
            double sourceJobDrops = 0.0;
            boolean hasSourceJobDropsMetric = false;
            final Map<String, String> sourceMetricsRecent = this.sourceJobMetricsRecent.asMap();
            for (Map.Entry<String, WorkerMetrics> worker : this.sourceJobWorkersMap.entrySet()) {
                final Map<String, GaugeData> metricGroups =
                        WorkerMetricHandler.this.metricAggregator.getAggregates(worker.getValue().getGaugesByMetricGrp());
                for (Map.Entry<String, GaugeData> group : metricGroups.entrySet()) {
                    final String metricKey = worker.getKey() + ":" + group.getKey();
                    // Only groups whose key has not yet expired from the recent-metrics cache count.
                    if (!sourceMetricsRecent.containsKey(metricKey)) {
                        continue;
                    }
                    for (Map.Entry<String, Double> gauge : group.getValue().getGauges().entrySet()) {
                        if (this.autoScaleMetricsConfig.isSourceJobDropMetric(group.getKey(), gauge.getKey())) {
                            sourceJobDrops += gauge.getValue();
                            hasSourceJobDropsMetric = true;
                        }
                    }
                }
            }
            if (hasSourceJobDropsMetric) {
                logger.info("Job stage {}, source job drop metrics: {}", this.stage, sourceJobDrops);
                // NOTE(review): numWorkers is -1 when the stage's worker count is unknown, which yields
                // a negative value here; behavior kept as-is - confirm.
                final double dropsPerWorker = sourceJobDrops / 6.0 / (double) numWorkers;
                this.emit(StageScalingPolicy.ScalingReason.SourceJobDrop, dropsPerWorker, dropsPerWorker, numWorkers);
            }
        }

        /** Emits an event whose value and effective value are both the named gauge, if it is present. */
        private void emitGauge(StageScalingPolicy.ScalingReason reason, Map<String, Double> gauges, String gaugeName, int numWorkers) {
            final Double value = gauges.get(gaugeName);
            if (value != null) {
                this.emit(reason, value, value, numWorkers);
            }
        }

        /** Returns true when every named gauge has a value; logs the first missing one at debug level. */
        private boolean hasGauges(Map<String, Double> gauges, String... gaugeNames) {
            for (String gaugeName : gaugeNames) {
                if (gauges.get(gaugeName) == null) {
                    logger.debug("no gauge data found for ResourceUsage (metric={})", gaugeName);
                    return false;
                }
            }
            return true;
        }

        /** Sends one autoscale event for this stage to the job autoscale observer. */
        private void emit(StageScalingPolicy.ScalingReason reason, double value, double effectiveValue, int numWorkers) {
            WorkerMetricHandler.this.jobAutoScaleObserver.onNext(
                    new JobAutoScaler.Event(reason, this.stage, value, effectiveValue, numWorkers));
        }
    }
}

