/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.k8s.worker;

import com.google.gson.reflect.TypeToken;
import com.squareup.okhttp.Call;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.ControllerContext;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.scheduler.SchedulerContext;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.models.V1PodList;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Worker controller that discovers the workers of a job by querying the Kubernetes API server.
 *
 * <p>Each pod of the job runs {@code workersPerPod} worker containers. A worker ID is derived
 * from the pod index and the container index (see {@link #calculateWorkerID(String, int)}), and
 * a worker's port is the configured base port plus its container index.
 *
 * <p>The Kubernetes API client objects are held in static fields and are (re)created by
 * {@link #createApiInstances()}; no synchronization is performed around them.
 */
public class K8sWorkerController
implements IWorkerController {
    private static final Logger LOG = Logger.getLogger(K8sWorkerController.class.getName());

    /** Job configuration used to look up namespace, base port and time limits. */
    private edu.iu.dsc.tws.api.config.Config config;
    private String jobName;
    /** Number of pods, computed as {@code numberOfWorkers / workersPerPod} (integer division). */
    private int numberOfPods;
    private int numberOfWorkers;
    private int workersPerPod;
    private static CoreV1Api coreApi;
    private static ApiClient apiClient;
    /** Workers discovered by the most recent pod listing; rebuilt on every listing. */
    private ArrayList<JobMasterAPI.WorkerInfo> workerList;
    /** Description of the worker this controller belongs to. */
    private JobMasterAPI.WorkerInfo thisWorker;

    /**
     * Creates the controller for one worker and initializes the shared Kubernetes API client.
     *
     * @param config job configuration
     * @param podName name of the pod this worker runs in; its index is parsed from the name
     * @param podIpStr IP address of the pod as a string
     * @param containerName name of the container this worker runs in; its index is parsed
     *     from the name
     * @param jobName name of the job
     * @param workersPerPod number of worker containers in each pod; must be positive
     * @throws IllegalArgumentException if {@code workersPerPod} is not positive
     * @throws RuntimeException if {@code podIpStr} cannot be resolved or the API client cannot
     *     be created
     */
    public K8sWorkerController(edu.iu.dsc.tws.api.config.Config config, String podName, String podIpStr, String containerName, String jobName, int workersPerPod) {
        if (workersPerPod <= 0) {
            // Guards the division below and the worker ID arithmetic.
            throw new IllegalArgumentException("workersPerPod must be positive: " + workersPerPod);
        }
        this.config = config;
        this.numberOfWorkers = SchedulerContext.workerInstances(config);
        this.workersPerPod = workersPerPod;
        this.numberOfPods = this.numberOfWorkers / workersPerPod;
        this.workerList = new ArrayList<JobMasterAPI.WorkerInfo>();
        this.jobName = jobName;

        int containerIndex = KubernetesUtils.indexFromName(containerName);
        int workerID = this.calculateWorkerID(podName, containerIndex);
        int basePort = KubernetesContext.workerBasePort(config);

        // Called only for validation: fails fast when the pod IP string cannot be resolved.
        this.convertStringToIP(podIpStr);

        this.thisWorker = WorkerInfoUtils.createWorkerInfo(workerID, podIpStr, basePort + containerIndex, null);
        K8sWorkerController.createApiInstances();
    }

    /**
     * Creates the default API client with the read timeout set to zero, installs it as the
     * default client, and creates the {@link CoreV1Api} instance on top of it.
     *
     * @throws RuntimeException if the default client cannot be created
     */
    public static void createApiInstances() {
        try {
            apiClient = Config.defaultClient();
            apiClient.getHttpClient().setReadTimeout(0L, TimeUnit.MILLISECONDS);
        }
        catch (IOException e) {
            LOG.log(Level.SEVERE, "Exception when creating ApiClient: ", e);
            throw new RuntimeException(e);
        }
        Configuration.setDefaultApiClient(apiClient);
        coreApi = new CoreV1Api(apiClient);
    }

    /**
     * Returns the shared {@link CoreV1Api}, creating the API instances first if none exists yet.
     */
    public static CoreV1Api getCoreApi() {
        if (coreApi == null) {
            K8sWorkerController.createApiInstances();
        }
        return coreApi;
    }

    /** Returns the description of the worker this controller belongs to. */
    public JobMasterAPI.WorkerInfo getWorkerInfo() {
        return this.thisWorker;
    }

    /**
     * Looks up a worker in the current worker list.
     *
     * @param id worker ID to look for
     * @return the matching worker, or {@code null} when the list has no worker with that ID
     */
    public JobMasterAPI.WorkerInfo getWorkerInfoForID(int id) {
        for (JobMasterAPI.WorkerInfo info : this.workerList) {
            if (info.getWorkerID() == id) {
                return info;
            }
        }
        return null;
    }

    /** Returns the number of workers in the job as given by the configuration. */
    public int getNumberOfWorkers() {
        return this.numberOfWorkers;
    }

    /** Returns the internal list of workers found by the most recent pod listing. */
    public ArrayList<JobMasterAPI.WorkerInfo> getJoinedWorkers() {
        return this.workerList;
    }

    /**
     * Repeatedly lists the pods of the job until the worker list has as many entries as the
     * job has workers, or until the time limit is exceeded.
     *
     * <p>The pod list is polled every 300 ms. If the thread is interrupted while sleeping,
     * polling continues, and the interrupt status is restored before this method returns.
     *
     * @param timeLimit maximum time to keep polling, in milliseconds
     * @return {@code true} if all workers were found, {@code false} if the time limit was reached
     * @throws RuntimeException if listing the pods fails
     */
    public boolean buildWorkerListWaitForAll(long timeLimit) {
        final long startTime = System.currentTimeMillis();
        final long sleepInterval = 300L;
        final long logMessageInterval = 1000L;
        long waitTimeCountForLog = 0L;
        boolean interrupted = false;
        try {
            while (true) {
                this.buildWorkerList();
                if (this.numberOfWorkers == this.workerList.size()) {
                    LOG.info("Received data about all pods. ");
                    K8sWorkerController.printWorkers(this.workerList);
                    return true;
                }
                if (waitTimeCountForLog >= logMessageInterval) {
                    LOG.info("Data is not received for some pods. Number of received workers: " + this.workerList.size() + ". Will try again. Waiting " + logMessageInterval + "ms");
                    waitTimeCountForLog = 0L;
                }
                long duration = System.currentTimeMillis() - startTime;
                if (duration > timeLimit) {
                    LOG.log(Level.SEVERE, "Time limit has been reached when trying to build worker list. Given Time limit: " + timeLimit + "ms.");
                    return false;
                }
                try {
                    Thread.sleep(sleepInterval);
                    waitTimeCountForLog += sleepInterval;
                }
                catch (InterruptedException e) {
                    // Keep polling as before, but remember the interrupt so it is not lost.
                    interrupted = true;
                    LOG.log(Level.WARNING, "Thread sleep interrupted.", e);
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Resolves an IP string with {@link InetAddress#getByName(String)}.
     *
     * @throws RuntimeException wrapping the {@link UnknownHostException} when resolution fails
     */
    private InetAddress convertStringToIP(String ipStr) {
        try {
            return InetAddress.getByName(ipStr);
        }
        catch (UnknownHostException e) {
            LOG.log(Level.SEVERE, "Can not convert the pod IP to InetAddress: " + ipStr, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Lists the pods carrying the job's service label and rebuilds the worker list from them,
     * adding {@code workersPerPod} workers for every pod that already has an IP address.
     *
     * @throws RuntimeException if the pod listing fails or a pod IP cannot be resolved
     */
    private void buildWorkerList() {
        String namespace = KubernetesContext.namespace(this.config);
        String serviceLabel = KubernetesUtils.createServiceLabelWithKey(this.jobName);
        int basePort = KubernetesContext.workerBasePort(this.config);

        V1PodList list;
        try {
            list = coreApi.listNamespacedPod(namespace, null, null, null, null, serviceLabel, null, null, null, null);
        }
        catch (ApiException e) {
            String logMessage = "Exception when getting the pod list for the job: " + this.jobName + "\nexCode: " + e.getCode() + "\nresponseBody: " + e.getResponseBody();
            LOG.log(Level.SEVERE, logMessage, e);
            throw new RuntimeException(e);
        }

        this.workerList.clear();
        for (V1Pod pod : list.getItems()) {
            String podName = pod.getMetadata().getName();
            if (!podName.startsWith(this.jobName)) {
                LOG.warning("A pod received that does not belong to this job. PodName: " + podName);
                continue;
            }

            // A pod may be listed before an IP is assigned to it. Such a pod is skipped so the
            // worker count stays below the expected total and the caller polls again, instead
            // of registering workers with a null address.
            String podIpStr = pod.getStatus() == null ? null : pod.getStatus().getPodIP();
            if (podIpStr == null || podIpStr.isEmpty()) {
                LOG.fine("Pod has no IP address yet, skipping for now. PodName: " + podName);
                continue;
            }

            // Called only for validation: fails when the reported pod IP cannot be resolved.
            this.convertStringToIP(podIpStr);

            for (int containerIndex = 0; containerIndex < this.workersPerPod; containerIndex++) {
                int workerID = this.calculateWorkerID(podName, containerIndex);
                JobMasterAPI.WorkerInfo workerNetworkInfo = WorkerInfoUtils.createWorkerInfo(workerID, podIpStr, basePort + containerIndex, null);
                this.workerList.add(workerNetworkInfo);
            }
        }
    }

    /**
     * Computes a worker ID as {@code podIndex * workersPerPod + containerIndex}, where the pod
     * index is parsed from the pod name.
     */
    public int calculateWorkerID(String podName, int containerIndex) {
        int podNo = KubernetesUtils.indexFromName(podName);
        return podNo * this.workersPerPod + containerIndex;
    }

    /** Logs the number of workers followed by one line per worker (ID, IP and port). */
    public static void printWorkers(ArrayList<JobMasterAPI.WorkerInfo> workers) {
        StringBuilder buffer = new StringBuilder();
        buffer.append("Number of workers: ").append(workers.size()).append("\n");
        int i = 0;
        for (JobMasterAPI.WorkerInfo worker : workers) {
            buffer.append(String.format("%d: workerID[%d] %s:%d\n", i++, worker.getWorkerID(), worker.getWorkerIP(), worker.getPort()));
        }
        LOG.info(buffer.toString());
    }

    /**
     * Returns the workers of the job once all of them are listed and all pods are Running.
     *
     * <p>The configured maximum wait time covers both phases: building the worker list (only
     * done when the current list is incomplete) and waiting for the pods to reach the Running
     * phase with whatever time is left.
     *
     * @return the internal worker list
     * @throws TimeoutException if either phase does not complete within the time limit
     */
    public List<JobMasterAPI.WorkerInfo> getAllWorkers() throws TimeoutException {
        long timeLimitMilliSec = ControllerContext.maxWaitTimeForAllToJoin(this.config);
        long startTime = System.currentTimeMillis();

        if (this.workerList.size() < this.numberOfWorkers && !this.buildWorkerListWaitForAll(timeLimitMilliSec)) {
            throw new TimeoutException("All workers have not joined the job on the time limit: " + timeLimitMilliSec + "ms.");
        }

        ArrayList<String> podNameList = this.constructPodNameList();
        long duration = System.currentTimeMillis() - startTime;
        long remainingTimeLimit = timeLimitMilliSec - duration;
        if (this.waitUntilAllPodsRunning(podNameList, remainingTimeLimit)) {
            return this.workerList;
        }
        throw new TimeoutException("All workers have not joined the job on the specified time limit: " + timeLimitMilliSec + "ms.");
    }

    /** Builds the expected pod names as {@code jobName + "-" + index} for every pod index. */
    private ArrayList<String> constructPodNameList() {
        ArrayList<String> podNameList = new ArrayList<String>();
        for (int i = 0; i < this.numberOfPods; ++i) {
            podNameList.add(this.jobName + "-" + i);
        }
        return podNameList;
    }

    /**
     * Watches the pods carrying the job's service label until every pod named in
     * {@code podList} has been reported in the Running phase, or the watch ends.
     *
     * <p>Names are removed from {@code podList} as their pods are seen Running, so the list
     * passed in is modified. The watch is always closed before returning, also when iterating
     * over the events throws.
     *
     * @param podList names of the pods to wait for; modified by this method
     * @param timeoutMiliSec time to wait in milliseconds; converted to whole seconds for the
     *     watch request and raised to at least one second
     * @return {@code true} if all listed pods were seen Running, {@code false} otherwise
     * @throws RuntimeException if the watch cannot be created
     */
    public boolean waitUntilAllPodsRunning(ArrayList<String> podList, long timeoutMiliSec) {
        String phase = "Running";
        String namespace = KubernetesContext.namespace(this.config);
        String serviceLabel = KubernetesUtils.createServiceLabelWithKey(this.jobName);

        // Plain truncation would produce 0 or a negative value when less than a second is left.
        // NOTE(review): a zero timeoutSeconds is presumably not treated as "return immediately"
        // by the API server but as "use the server default" - confirm against the API docs.
        long wholeSeconds = Math.max(1L, timeoutMiliSec / 1000L);
        Integer timeoutSeconds = (int)Math.min((long)Integer.MAX_VALUE, wholeSeconds);

        Type watchType = new TypeToken<Watch.Response<V1Pod>>(){}.getType();
        Watch<V1Pod> watch;
        try {
            Call call = coreApi.listNamespacedPodCall(namespace, null, null, null, null, serviceLabel, null, null, timeoutSeconds, Boolean.TRUE, null, null);
            watch = Watch.createWatch(apiClient, call, watchType);
        }
        catch (ApiException e) {
            String logMessage = "Exception when watching the pods for the job: " + this.jobName + "\nexCode: " + e.getCode() + "\nresponseBody: " + e.getResponseBody();
            LOG.log(Level.SEVERE, logMessage, e);
            throw new RuntimeException(e);
        }

        boolean result = false;
        try {
            for (Watch.Response<V1Pod> item : watch) {
                V1Pod pod = item.object;
                if (pod == null || pod.getMetadata() == null || pod.getStatus() == null) {
                    continue;
                }
                String podName = pod.getMetadata().getName();
                if (!podList.contains(podName) || !phase.equals(pod.getStatus().getPhase())) {
                    continue;
                }
                podList.remove(podName);
                LOG.log(Level.INFO, "Received pod Running event for the pod: " + podName);
                if (podList.isEmpty()) {
                    result = true;
                    break;
                }
            }
        }
        finally {
            try {
                watch.close();
            }
            catch (IOException e) {
                LOG.log(Level.SEVERE, "Exception closing watcher.", e);
            }
        }
        return result;
    }

    /** Does nothing in this implementation. */
    public void waitOnBarrier() throws TimeoutException {
    }
}

