/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker.mesos;

import io.mantisrx.runtime.loader.config.MetricsCollector;
import io.mantisrx.runtime.loader.config.Usage;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.shaded.com.google.common.base.Strings;
import io.reactivx.mantis.operators.OperatorOnErrorResumeNextViaFunction;
import java.nio.charset.Charset;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.protocol.http.client.HttpClient;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

public class MesosMetricsCollector
implements MetricsCollector {
    private static final String MESOS_TASK_EXECUTOR_ID_KEY = "MESOS_EXECUTOR_ID";
    private static final Logger logger = LoggerFactory.getLogger(MesosMetricsCollector.class);
    private static final long GET_TIMEOUT_SECS = 5L;
    private static final int MAX_REDIRECTS = 10;
    private final int slavePort;
    private final String taskId;
    private final Func1<Observable<? extends Throwable>, Observable<?>> retryLogic = attempts -> attempts.zipWith(Observable.range((int)1, (int)3), (t1, integer) -> integer).flatMap(integer -> {
        long delay = 2L;
        logger.info(": retrying conx after sleeping for " + delay + " secs");
        return Observable.timer((long)delay, (TimeUnit)TimeUnit.SECONDS);
    });

    public static MesosMetricsCollector valueOf(Properties properties) {
        int slavePort = Integer.parseInt(properties.getProperty("mantis.agent.mesos.slave.port", "5051"));
        String taskId = System.getenv(MESOS_TASK_EXECUTOR_ID_KEY);
        return new MesosMetricsCollector(slavePort, taskId);
    }

    public static MesosMetricsCollector valueOf(WorkerConfiguration workerConfiguration) {
        int slavePort = workerConfiguration.getMesosSlavePort();
        String taskId = System.getenv(MESOS_TASK_EXECUTOR_ID_KEY);
        return new MesosMetricsCollector(slavePort, taskId);
    }

    MesosMetricsCollector(int slavePort, String taskId) {
        logger.info("Creating MesosMetricsCollector to port {} of taskId: {}", (Object)slavePort, (Object)taskId);
        if (Strings.isNullOrEmpty((String)taskId)) {
            logger.error("Invalid task id for MesosMetricsCollector");
        }
        this.slavePort = slavePort;
        this.taskId = taskId;
    }

    private String getUsageJson() {
        String usageEndpoint = "monitor/statistics.json";
        String url = "http://localhost:" + this.slavePort + "/" + usageEndpoint;
        return (String)RxNetty.createHttpRequest((HttpClientRequest)HttpClientRequest.createGet((String)url), (HttpClient.HttpClientConfig)((HttpClient.HttpClientConfig)new HttpClient.HttpClientConfig.Builder().setFollowRedirect(true).followRedirect(10).build())).lift((Observable.Operator)new OperatorOnErrorResumeNextViaFunction(t -> Observable.error((Throwable)t))).timeout(5L, TimeUnit.SECONDS).retryWhen(this.retryLogic).flatMap(r -> r.getContent()).map(o -> o.toString(Charset.defaultCharset())).doOnError(throwable -> logger.warn("Can't get resource usage from mesos slave endpoint (" + url + ") - " + throwable.getMessage(), throwable)).toBlocking().firstOrDefault((Object)"");
    }

    public Usage get() {
        return MesosMetricsCollector.getCurentUsage(this.taskId, this.getUsageJson());
    }

    static Usage getCurentUsage(String taskId, String usageJson) {
        double network_write_bytes;
        double network_read_bytes;
        double mem_limit;
        double mem_anon_bytes;
        double mem_rss_bytes;
        double cpus_user_time_secs;
        double cpus_system_time_secs;
        if (usageJson == null || usageJson.isEmpty()) {
            logger.warn("Empty usage on task {}", (Object)taskId);
            return null;
        }
        JSONArray array = new JSONArray(usageJson);
        if (array.length() == 0) {
            return null;
        }
        JSONObject obj = null;
        for (int i = 0; i < array.length(); ++i) {
            String id;
            JSONObject executor = array.getJSONObject(i);
            if (executor == null || (id = executor.optString("executor_id")) == null || !id.equals(taskId)) continue;
            obj = executor.getJSONObject("statistics");
            break;
        }
        if (obj == null) {
            return null;
        }
        double cpus_limit = obj.optDouble("cpus_limit");
        if (Double.isNaN(cpus_limit)) {
            cpus_limit = 0.0;
        }
        if (Double.isNaN(cpus_system_time_secs = obj.optDouble("cpus_system_time_secs"))) {
            logger.warn("Didn't get cpus_system_time_secs from mesos stats");
            cpus_system_time_secs = 0.0;
        }
        if (Double.isNaN(cpus_user_time_secs = obj.optDouble("cpus_user_time_secs"))) {
            logger.warn("Didn't get cpus_user_time_secs from mesos stats");
            cpus_user_time_secs = 0.0;
        }
        if (Double.isNaN(mem_rss_bytes = obj.optDouble("mem_rss_bytes"))) {
            logger.warn("Couldn't get mem_rss_bytes from mesos stats");
            mem_rss_bytes = 0.0;
        }
        if (Double.isNaN(mem_anon_bytes = obj.optDouble("mem_anon_bytes"))) {
            mem_anon_bytes = mem_rss_bytes;
        }
        if (Double.isNaN(mem_limit = obj.optDouble("mem_limit_bytes"))) {
            mem_limit = 0.0;
        }
        if (Double.isNaN(network_read_bytes = obj.optDouble("net_rx_bytes"))) {
            network_read_bytes = 0.0;
        }
        if (Double.isNaN(network_write_bytes = obj.optDouble("net_tx_bytes"))) {
            network_write_bytes = 0.0;
        }
        return new Usage(cpus_limit, cpus_system_time_secs, cpus_user_time_secs, mem_limit, mem_rss_bytes, mem_anon_bytes, network_read_bytes, network_write_bytes);
    }
}

