/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.k8s.mpi;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.scheduler.SchedulerContext;
import edu.iu.dsc.tws.common.logging.LoggingHelper;
import edu.iu.dsc.tws.common.util.ReflectionUtils;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.worker.JMWorkerAgent;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.NodeInfoUtils;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.PodWatchUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.mpi.MPIMasterStarter;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sPersistentVolume;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sVolatileVolume;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sWorkerUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import mpi.MPI;
import mpi.MPIException;

/**
 * Command-line entry point that boots a single worker inside an MPI process on Kubernetes.
 *
 * <p>{@link #main(String[])} initializes MPI, uses the MPI rank as the worker ID, loads the
 * configuration and the job description from the job directory, registers the worker with the
 * job master through a {@link JMWorkerAgent}, and finally runs the configured {@link IWorker}.
 *
 * <p>All state is kept in static fields that are populated by {@code main}; {@link #startWorker}
 * and {@link #closeWorker()} read those fields and are therefore only meaningful after
 * {@code main} has initialized them.
 */
public final class MPIWorkerStarter {
    private static final Logger LOG = Logger.getLogger(MPIWorkerStarter.class.getName());

    /** Directory from which the configuration and the job description file are read. */
    private static final String JOB_DIR = "/twister2-memory-dir/twister2-job";

    /** Directory handed to {@link K8sPersistentVolume} when a persistent volume is requested. */
    private static final String PERSISTENT_JOB_DIR = "/persistent";

    /** Number of positional command-line arguments that {@code main} reads. */
    private static final int REQUIRED_ARG_COUNT = 3;

    private static Config config = null;
    /** MPI rank of this process in {@code MPI.COMM_WORLD}; -1 until MPI is initialized. */
    private static int workerID = -1;
    /** Size of {@code MPI.COMM_WORLD}; -1 until MPI is initialized. */
    private static int numberOfWorkers = -1;
    private static JobMasterAPI.WorkerInfo workerInfo;
    private static JMWorkerAgent jobMasterAgent;
    private static String jobName = null;
    private static JobAPI.Job job = null;
    private static JobAPI.ComputeResource computeResource = null;

    private MPIWorkerStarter() {
    }

    /**
     * Boots the worker.
     *
     * @param args {@code args[0]} is the job master IP command-line argument (decoded by
     *             {@code MPIMasterStarter.getJobMasterIPCommandLineArgumentValue}),
     *             {@code args[1]} is the job name, and {@code args[2]} is the encoded node info
     *             list (single quotes are stripped before use)
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     * @throws RuntimeException if the job master IP or job name is null, if MPI cannot be
     *                          initialized, or if the local host address cannot be resolved
     */
    public static void main(String[] args) {
        LoggingHelper.setLoggingFormat("[%1$tF %1$tT] [%4$s] [%7$s] %3$s: %5$s %6$s %n");

        // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
        if (args == null || args.length < REQUIRED_ARG_COUNT) {
            int received = args == null ? 0 : args.length;
            throw new IllegalArgumentException(
                "Expected " + REQUIRED_ARG_COUNT
                    + " arguments: <jobMasterIP argument> <jobName> <encodedNodeInfoList>, but got "
                    + received);
        }

        String jobMasterIP = MPIMasterStarter.getJobMasterIPCommandLineArgumentValue(args[0]);
        jobName = args[1];
        if (jobMasterIP == null) {
            throw new RuntimeException("JobMasterIP address is null");
        }
        if (jobName == null) {
            throw new RuntimeException("jobName is null");
        }
        // Literal replacement; no regular expression is needed to drop the quotes.
        String encodedNodeInfoList = args[2].replace("'", "");

        config = K8sWorkerUtils.loadConfig(JOB_DIR);

        try {
            MPI.Init(args);
            workerID = MPI.COMM_WORLD.getRank();
            numberOfWorkers = MPI.COMM_WORLD.getSize();
        } catch (MPIException e) {
            LOG.log(Level.SEVERE, "Could not get rank or size from MPI.COMM_WORLD", e);
            throw new RuntimeException(e);
        }

        K8sPersistentVolume pv = null;
        if (KubernetesContext.persistentVolumeRequested(config)) {
            pv = new K8sPersistentVolume(PERSISTENT_JOB_DIR, workerID);
        }
        K8sWorkerUtils.initWorkerLogger(workerID, pv, config);

        String jobDescFileName =
            JOB_DIR + "/" + SchedulerContext.createJobDescriptionFileName(jobName);
        job = JobUtils.readJobFile(null, jobDescFileName);
        LOG.info("Job description file is loaded: " + jobDescFileName);

        config = JobUtils.overrideConfigs(job, config);
        config = JobUtils.updateConfigs(job, config);
        config = K8sWorkerUtils.unsetWorkerIDAssigment(config);

        InetAddress localHost = resolveLocalHost();
        String podIP = localHost.getHostAddress();
        String podName = localHost.getHostName();

        // Each worker owns one base port plus the configured number of additional ports,
        // so consecutive worker IDs are spaced (additionalPorts + 1) apart.
        int workerPort = KubernetesContext.workerBasePort(config)
            + workerID * (SchedulerContext.numberOfAdditionalPorts(config) + 1);

        JobMasterAPI.NodeInfo nodeInfo = resolveNodeInfo(podIP, encodedNodeInfoList);
        LOG.info("NodeInfoUtils for this worker: " + nodeInfo);

        computeResource = K8sWorkerUtils.getComputeResource(job, podName);
        Map<String, Integer> additionalPorts =
            K8sWorkerUtils.generateAdditionalPorts(config, workerPort);
        workerInfo = WorkerInfoUtils.createWorkerInfo(
            workerID, podIP, workerPort, nodeInfo, computeResource, additionalPorts);

        LOG.info("Worker information summary: \nMPI Rank(workerID): " + workerID
            + "\nMPI Size(number of workers): " + numberOfWorkers
            + "\nPOD_IP: " + podIP
            + "\nHOSTNAME(podname): " + podName);

        jobMasterAgent = JMWorkerAgent.createJMWorkerAgent(
            config, workerInfo, jobMasterIP,
            (int) JobMasterContext.jobMasterPort(config), (int) job.getNumberOfWorkers());
        jobMasterAgent.startThreaded(false);
        jobMasterAgent.sendWorkerRunningMessage();

        MPIWorkerStarter.startWorker(jobMasterAgent, (IPersistentVolume) pv, podName);

        finalizeMPI();
        MPIWorkerStarter.closeWorker();
    }

    /**
     * Instantiates the configured worker class and executes it.
     *
     * <p>Reads the static {@code config}, {@code computeResource}, {@code jobName} and
     * {@code workerID} fields, so it must be called after {@code main} has populated them.
     * A {@link K8sVolatileVolume} is created only when the compute resource reports more than
     * zero disk gigabytes; otherwise {@code null} is passed to the worker.
     *
     * @param jmWorkerAgent agent whose worker controller is handed to the worker
     * @param pv persistent volume passed through to the worker; may be {@code null}
     * @param podName not used by this method
     * @throws RuntimeException if the worker class cannot be loaded or instantiated
     */
    public static void startWorker(JMWorkerAgent jmWorkerAgent, IPersistentVolume pv, String podName) {
        String workerClass = SchedulerContext.workerClass(config);
        IWorker worker;
        try {
            Object instance = ReflectionUtils.newInstance(workerClass);
            worker = (IWorker) instance;
            LOG.info("loaded worker class: " + workerClass);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            // Attach the exception so the log carries the stack trace, not just the class name.
            LOG.log(Level.SEVERE,
                String.format("failed to load the worker class %s", workerClass), e);
            throw new RuntimeException(e);
        }

        K8sVolatileVolume volatileVolume = null;
        if (computeResource.getDiskGigaBytes() > 0.0) {
            volatileVolume = new K8sVolatileVolume(jobName, workerID);
        }

        worker.execute(config, workerID,
            (IWorkerController) jmWorkerAgent.getJMWorkerController(),
            pv, (IVolatileVolume) volatileVolume);
    }

    /**
     * Sends the worker-completed message through the job master agent and then closes the agent.
     *
     * <p>Uses the static agent created in {@code main}.
     */
    public static void closeWorker() {
        jobMasterAgent.sendWorkerCompletedMessage();
        jobMasterAgent.close();
    }

    /**
     * Returns the local host address, failing fast when it cannot be resolved.
     *
     * @throws RuntimeException wrapping the {@link UnknownHostException}; without the address
     *                          neither the pod IP nor the pod name can be determined
     */
    private static InetAddress resolveLocalHost() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            LOG.log(Level.SEVERE, "Cannot get localHost.", e);
            throw new RuntimeException("Cannot get localHost.", e);
        }
    }

    /**
     * Builds the node info for this worker. When no node IP can be found for the pod, the pod IP
     * is used as the node IP; otherwise the node info comes either from the configuration or
     * from the encoded node info list, depending on {@code nodeLocationsFromConfig}.
     */
    private static JobMasterAPI.NodeInfo resolveNodeInfo(String podIP, String encodedNodeInfoList) {
        String nodeIP = PodWatchUtils.getNodeIP(KubernetesContext.namespace(config), jobName, podIP);
        if (nodeIP == null) {
            LOG.warning("Could not get nodeIP for this pod. Using podIP as nodeIP.");
            return NodeInfoUtils.createNodeInfo(podIP, null, null);
        }
        if (KubernetesContext.nodeLocationsFromConfig(config)) {
            return KubernetesContext.getNodeInfo(config, nodeIP);
        }
        return K8sWorkerUtils.getNodeInfoFromEncodedStr(encodedNodeInfoList, nodeIP);
    }

    /**
     * Finalizes MPI on a best-effort basis: a failure is logged but does not prevent the worker
     * from reporting completion afterwards.
     */
    private static void finalizeMPI() {
        try {
            MPI.Finalize();
        } catch (MPIException e) {
            LOG.log(Level.WARNING, "MPI.Finalize() failed; continuing with worker shutdown.", e);
        }
    }
}

