/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.k8s;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.driver.IScalerPerCluster;
import edu.iu.dsc.tws.api.exceptions.Twister2Exception;
import edu.iu.dsc.tws.api.scheduler.ILauncher;
import edu.iu.dsc.tws.api.scheduler.SchedulerContext;
import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
import edu.iu.dsc.tws.master.IJobTerminator;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.server.JobMaster;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.NodeInfoUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.JobSubmissionStatus;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesController;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.PodWatchUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.RequestObjectBuilder;
import edu.iu.dsc.tws.rsched.schedulers.k8s.driver.K8sScaler;
import edu.iu.dsc.tws.rsched.schedulers.k8s.master.JobMasterRequestObject;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class KubernetesLauncher
implements ILauncher,
IJobTerminator {
    private static final Logger LOG = Logger.getLogger(KubernetesLauncher.class.getName());
    private Config config;
    private KubernetesController controller = new KubernetesController();
    private String namespace;
    private JobSubmissionStatus jobSubmissionStatus = new JobSubmissionStatus();

    public void initialize(Config conf) {
        this.config = conf;
        this.namespace = KubernetesContext.namespace(this.config);
        this.controller.init(this.namespace);
    }

    public Twister2JobState launch(JobAPI.Job job) {
        boolean jobMasterCompleted;
        boolean volumesSetup;
        Twister2JobState state = new Twister2JobState(false);
        if (!this.configParametersOK(job)) {
            return state;
        }
        String jobID = job.getJobId();
        String jobPackageFile = SchedulerContext.temporaryPackagesPath((Config)this.config) + "/" + SchedulerContext.jobPackageFileName((Config)this.config);
        File jobFile = new File(jobPackageFile);
        if (!jobFile.exists()) {
            LOG.log(Level.SEVERE, "Can not access job package file: " + jobPackageFile + "\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
            return state;
        }
        long jobFileSize = jobFile.length();
        boolean allEntitiesOK = this.checkEntitiesForJob(job);
        if (!allEntitiesOK) {
            return state;
        }
        RequestObjectBuilder.init(this.config, job.getJobId(), jobFileSize);
        JobMasterRequestObject.init(this.config, job.getJobId(), jobFileSize);
        boolean servicesCreated = this.initServices(jobID);
        if (!servicesCreated) {
            this.clearupWhenSubmissionFails(jobID);
            return state;
        }
        if (SchedulerContext.persistentVolumeRequested((Config)this.config) && !(volumesSetup = this.initPersistentVolumeClaim(job))) {
            this.clearupWhenSubmissionFails(jobID);
            return state;
        }
        boolean statefulSetInitialized = this.initStatefulSets(job);
        if (!statefulSetInitialized) {
            this.clearupWhenSubmissionFails(jobID);
            return state;
        }
        if (JobMasterContext.jobMasterRunsInClient((Config)this.config) && !(jobMasterCompleted = this.startJobMasterOnClient(job))) {
            LOG.log(Level.SEVERE, "JobMaster can not be started. \n++++++++++++++++++ Aborting submission ++++++++++++++++++");
            this.clearupWhenSubmissionFails(jobID);
            return state;
        }
        state.setRequestGranted(true);
        return state;
    }

    private String getJobMasterIP(JobAPI.Job job) {
        if (JobMasterContext.jobMasterRunsInClient((Config)this.config)) {
            try {
                return InetAddress.getLocalHost().getHostAddress();
            }
            catch (UnknownHostException e) {
                throw new RuntimeException("Exception when getting local host address: ", e);
            }
        }
        String jobMasterIP = PodWatchUtils.getJobMasterIpByWatchingPodToRunning(KubernetesContext.namespace(this.config), job.getJobId(), 100);
        if (jobMasterIP == null) {
            throw new RuntimeException("Job master is running in a separate pod, but this worker can not get the job master IP address from Kubernetes master.\nJob master address: " + jobMasterIP);
        }
        LOG.info("Job master address: " + jobMasterIP);
        return jobMasterIP;
    }

    private boolean startJobMasterOnClient(JobAPI.Job job) {
        String dashAddress = JobMasterContext.dashboardHost((Config)this.config);
        if (dashAddress.endsWith("svc.cluster.local")) {
            String dashIP = this.getDashboardIP(dashAddress);
            String dashURL = "http://" + dashIP;
            if (dashIP == null) {
                LOG.warning("Could not get Dashboard server IP address from dashboard service name: " + dashAddress + " will not connect to Dashboard. *****");
                dashURL = null;
            }
            this.config = JobMasterContext.updateDashboardHost((Config)this.config, (String)dashURL);
            LOG.info("Dashboard server HTTP URL: " + dashURL);
        }
        String hostAdress = RequestObjectBuilder.getJobMasterIP();
        JobMasterAPI.NodeInfo nodeInfo = NodeInfoUtils.createNodeInfo((String)hostAdress, null, null);
        K8sScaler k8sScaler = new K8sScaler(this.config, job, this.controller);
        JobMasterAPI.JobMasterState initialState = JobMasterAPI.JobMasterState.JM_STARTED;
        JobMaster jobMaster = new JobMaster(this.config, hostAdress, (IJobTerminator)this, job, nodeInfo, (IScalerPerCluster)k8sScaler, initialState);
        jobMaster.addShutdownHook(true);
        try {
            jobMaster.startJobMasterBlocking();
        }
        catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
            return false;
        }
        return true;
    }

    private String getDashboardIP(String dashAddress) {
        String dashServiceName = dashAddress;
        int prefixIndex = dashAddress.indexOf("://");
        if (prefixIndex != -1) {
            int serviceNameStartIndex = prefixIndex + 3;
            dashServiceName = dashAddress.substring(serviceNameStartIndex);
        }
        int dotIndex = dashServiceName.indexOf(".");
        dashServiceName = dashServiceName.substring(0, dotIndex);
        return this.controller.getServiceIP(dashServiceName);
    }

    private boolean checkEntitiesForJob(JobAPI.Job job) {
        String secretName;
        boolean secretExists;
        boolean statefulSetExists;
        String pvcName;
        String existingService;
        String jobID = job.getJobId();
        String workersServiceName = KubernetesUtils.createServiceName(job.getJobId());
        ArrayList<String> serviceNames = new ArrayList<String>();
        serviceNames.add(workersServiceName);
        String jobMasterServiceName = KubernetesUtils.createJobMasterServiceName(jobID);
        if (!JobMasterContext.jobMasterRunsInClient((Config)this.config)) {
            serviceNames.add(jobMasterServiceName);
        }
        if ((existingService = this.controller.existServices(serviceNames)) != null) {
            LOG.severe("There is already a service with the name: " + existingService + "\nAnother job might be running. \nFirst terminate that job or create a job with a different name.\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
            return false;
        }
        if (SchedulerContext.persistentVolumeRequested((Config)this.config) && this.controller.existPersistentVolumeClaim(pvcName = KubernetesUtils.createPersistentVolumeClaimName(jobID))) {
            LOG.severe("Another job might be running. \nFirst terminate that job or create a job with a different name.\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
            return false;
        }
        ArrayList<String> statefulSetNames = new ArrayList<String>();
        for (int i = 0; i < job.getComputeResourceList().size(); ++i) {
            statefulSetNames.add(KubernetesUtils.createWorkersStatefulSetName(jobID, i));
        }
        if (!JobMasterContext.jobMasterRunsInClient((Config)this.config)) {
            String jobMasterStatefulSetName = KubernetesUtils.createJobMasterStatefulSetName(jobID);
            statefulSetNames.add(jobMasterStatefulSetName);
        }
        if (statefulSetExists = this.controller.existStatefulSets(statefulSetNames)) {
            LOG.severe("First terminate the previously running job with the same name. \nOr submit the job with a different job name\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
            return false;
        }
        if (SchedulerContext.useOpenMPI((Config)this.config) && !(secretExists = this.controller.existSecret(secretName = KubernetesContext.secretName(this.config)))) {
            LOG.severe("No Secret object is available in the cluster with the name: " + secretName + "\nFirst create this object or make that object created by your cluster admin.\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
            return false;
        }
        return true;
    }

    private boolean initServices(String jobID) {
        String workersServiceName = KubernetesUtils.createServiceName(jobID);
        String jobMasterServiceName = KubernetesUtils.createJobMasterServiceName(jobID);
        V1Service serviceForWorkers = null;
        serviceForWorkers = KubernetesContext.nodePortServiceRequested(this.config) ? RequestObjectBuilder.createNodePortServiceObject() : RequestObjectBuilder.createJobServiceObject();
        boolean serviceCreated = this.controller.createService(serviceForWorkers);
        if (!serviceCreated) {
            LOG.severe("Following service could not be created: " + workersServiceName + "\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
            return false;
        }
        this.jobSubmissionStatus.setServiceForWorkersCreated(true);
        if (!JobMasterContext.jobMasterRunsInClient((Config)this.config)) {
            V1Service serviceForJobMaster = JobMasterRequestObject.createJobMasterHeadlessServiceObject();
            serviceCreated = this.controller.createService(serviceForJobMaster);
            if (serviceCreated) {
                this.jobSubmissionStatus.setServiceForJobMasterCreated(true);
            } else {
                LOG.severe("Following service could not be created: " + jobMasterServiceName + "\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
                return false;
            }
        }
        return true;
    }

    private boolean initPersistentVolumeClaim(JobAPI.Job job) {
        String pvcName = KubernetesUtils.createPersistentVolumeClaimName(job.getJobId());
        V1PersistentVolumeClaim pvc = RequestObjectBuilder.createPersistentVolumeClaimObject(pvcName, job.getNumberOfWorkers());
        boolean claimCreated = this.controller.createPersistentVolumeClaim(pvc);
        if (!claimCreated) {
            LOG.log(Level.SEVERE, "PersistentVolumeClaim could not be created. \n++++++++++++++++++ Aborting submission ++++++++++++++++++");
            return false;
        }
        this.jobSubmissionStatus.setPvcCreated(true);
        return true;
    }

    private boolean initStatefulSets(JobAPI.Job job) {
        String encodedNodeInfoList = null;
        if (!KubernetesContext.nodeLocationsFromConfig(this.config)) {
            String rackLabelKey = KubernetesContext.rackLabelKeyForK8s(this.config);
            String dcLabelKey = KubernetesContext.datacenterLabelKeyForK8s(this.config);
            ArrayList<JobMasterAPI.NodeInfo> nodeInfoList = this.controller.getNodeInfo(rackLabelKey, dcLabelKey);
            encodedNodeInfoList = NodeInfoUtils.encodeNodeInfoList(nodeInfoList);
            LOG.fine("NodeInfo objects: size " + nodeInfoList.size() + "\n" + NodeInfoUtils.listToString(nodeInfoList));
        }
        if (!JobMasterContext.jobMasterRunsInClient((Config)this.config)) {
            V1StatefulSet jobMasterStatefulSet = JobMasterRequestObject.createStatefulSetObject(encodedNodeInfoList);
            if (jobMasterStatefulSet == null) {
                return false;
            }
            boolean statefulSetCreated = this.controller.createStatefulSet(jobMasterStatefulSet);
            if (statefulSetCreated) {
                this.jobSubmissionStatus.addCreatedStatefulSetName(jobMasterStatefulSet.getMetadata().getName());
            } else {
                LOG.severe("Please run terminate job to clear up any artifacts from previous jobs.\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
                return false;
            }
        }
        for (int i = 0; i < job.getComputeResourceList().size(); ++i) {
            JobAPI.ComputeResource computeResource = JobUtils.getComputeResource(job, i);
            if (computeResource == null) {
                LOG.severe("Something wrong with the job object. Can not get ComputeResource from job\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
                return false;
            }
            V1StatefulSet statefulSet = RequestObjectBuilder.createStatefulSetForWorkers(computeResource, encodedNodeInfoList);
            if (statefulSet == null) {
                return false;
            }
            boolean statefulSetCreated = this.controller.createStatefulSet(statefulSet);
            if (!statefulSetCreated) {
                LOG.severe("Please run terminate job to clear up any artifacts from previous jobs.\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
                return false;
            }
            this.jobSubmissionStatus.addCreatedStatefulSetName(statefulSet.getMetadata().getName());
        }
        return true;
    }

    private boolean configParametersOK(JobAPI.Job job) {
        List<String> uploaders = Arrays.asList("edu.iu.dsc.tws.rsched.uploaders.k8s.K8sUploader", "edu.iu.dsc.tws.rsched.uploaders.s3.S3Uploader");
        if (!uploaders.contains(SchedulerContext.uploaderClass((Config)this.config))) {
            LOG.log(Level.SEVERE, String.format("Provided uploader is not supported: " + SchedulerContext.uploaderClass((Config)this.config) + ". \nSupporter uploaders: " + uploaders + "\n++++++++++++++++++ Aborting submission ++++++++++++++++++", new Object[0]));
            return false;
        }
        if (SchedulerContext.useOpenMPI((Config)this.config)) {
            int workersPerPod = job.getComputeResource(0).getWorkersPerPod();
            for (int i = 1; i < job.getComputeResourceList().size(); ++i) {
                if (workersPerPod == job.getComputeResource(i).getWorkersPerPod()) continue;
                LOG.log(Level.SEVERE, String.format("When OpenMPI is used, all workersPerPod values in ComputeResources have to be the same. \n++++++++++++++++++ Aborting submission ++++++++++++++++++", new Object[0]));
                return false;
            }
        }
        if (KubernetesContext.bindWorkerToCPU(this.config)) {
            for (JobAPI.ComputeResource computeResource : job.getComputeResourceList()) {
                double cpus = computeResource.getCpu();
                if (cpus % 1.0 == 0.0) continue;
                LOG.log(Level.SEVERE, String.format("When %s is true, the value of cpu has to be an int cpu= " + cpus + "\n++++++++++++++++++ Aborting submission ++++++++++++++++++", "kubernetes.bind.worker.to.cpu"));
                return false;
            }
        }
        if (KubernetesContext.workerToNodeMapping(this.config)) {
            String operator = KubernetesContext.workerMappingOperator(this.config);
            List<String> values = KubernetesContext.workerMappingValues(this.config);
            if (("Exists".equalsIgnoreCase(operator) || "DoesNotExist".equalsIgnoreCase(operator)) && values != null && values.size() != 0) {
                LOG.log(Level.SEVERE, String.format("When the value of %s is either Exists or DoesNotExist\n%s list must be empty. Current content of this list: " + values + "\n++++++++++++++++++ Aborting submission ++++++++++++++++++", "twister2.resource.kubernetes.worker.mapping.operator", "twister2.resource.kubernetes.worker.mapping.values"));
                return false;
            }
        }
        if (KubernetesContext.nodePortServiceRequested(this.config)) {
            for (JobAPI.ComputeResource computeResource : job.getComputeResourceList()) {
                if (computeResource.getWorkersPerPod() == 1) continue;
                LOG.log(Level.SEVERE, "workersPerPod value must be 1, when starting NodePort service. Please change the config value and resubmit the job\n++++++++++++++++++ Aborting submission ++++++++++++++++++");
                return false;
            }
        }
        return true;
    }

    public void close() {
    }

    private void clearupWhenSubmissionFails(String jobID) {
        LOG.info("Will clear up any resources created during the job submission process.");
        if (this.jobSubmissionStatus.isServiceForWorkersCreated()) {
            String serviceName = KubernetesUtils.createServiceName(jobID);
            this.controller.deleteService(serviceName);
        }
        if (this.jobSubmissionStatus.isServiceForJobMasterCreated()) {
            String jobMasterServiceName = KubernetesUtils.createJobMasterServiceName(jobID);
            this.controller.deleteService(jobMasterServiceName);
        }
        ArrayList<String> ssNameLists = this.jobSubmissionStatus.getCreatedStatefulSetNames();
        for (String ssName : ssNameLists) {
            this.controller.deleteStatefulSet(ssName);
        }
        if (this.jobSubmissionStatus.isPvcCreated()) {
            String pvcName = KubernetesUtils.createPersistentVolumeClaimName(jobID);
            boolean bl = this.controller.deletePersistentVolumeClaim(pvcName);
        }
    }

    public boolean terminateJob(String jobID) {
        String jobMasterStatefulSetName = KubernetesUtils.createJobMasterStatefulSetName(jobID);
        boolean deleted = this.controller.deleteStatefulSet(jobMasterStatefulSetName);
        ArrayList<String> ssNameLists = this.controller.getStatefulSetsForJobWorkers(jobID);
        for (String ssName : ssNameLists) {
            this.controller.deleteStatefulSet(ssName);
        }
        String serviceName = KubernetesUtils.createServiceName(jobID);
        deleted = this.controller.deleteService(serviceName);
        String jobMasterServiceName = KubernetesUtils.createJobMasterServiceName(jobID);
        this.controller.deleteService(jobMasterServiceName);
        String pvcName = KubernetesUtils.createPersistentVolumeClaimName(jobID);
        boolean claimDeleted = this.controller.deletePersistentVolumeClaim(pvcName);
        return true;
    }
}

