/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.k8s.mpi;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.SchedulerContext;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.IWorkerStatusUpdater;
import edu.iu.dsc.tws.common.logging.LoggingHelper;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.NodeInfoUtils;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.core.WorkerRuntime;
import edu.iu.dsc.tws.rsched.schedulers.k8s.K8sEnvVariables;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sPersistentVolume;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sVolatileVolume;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sWorkerUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import edu.iu.dsc.tws.rsched.worker.MPIWorkerManager;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import mpi.MPI;
import mpi.MPIException;

/**
 * Entry point that starts a worker inside a Kubernetes pod when the job runs with MPI.
 *
 * <p>The MPI rank obtained from {@code MPI.COMM_WORLD} is used as the worker ID and the MPI
 * communicator size as the number of workers. The class is not instantiable; all state is
 * kept in static fields that are populated by {@link #main(String[])}.
 */
public final class MPIWorkerStarter {
    private static final Logger LOG = Logger.getLogger(MPIWorkerStarter.class.getName());

    /** Directory from which the configuration and the job description file are loaded. */
    private static final String JOB_DIR = "/twister2-memory-dir/twister2-job";

    /** Directory handed to {@link K8sPersistentVolume} when a persistent volume is requested. */
    private static final String PERSISTENT_JOB_DIR = "/persistent";

    /** File whose first line is read as the IP address of the node hosting this pod. */
    private static final String HOST_IP_FILE = "hostip.txt";

    /** File whose first line is read as the encoded node-info list. */
    private static final String NODE_INFO_LIST_FILE = "node-info-list.txt";

    private static Config config = null;
    private static int workerID = -1;
    private static int numberOfWorkers = -1;
    private static JobMasterAPI.WorkerInfo workerInfo;
    private static String jobID = null;
    private static JobAPI.Job job = null;
    private static JobAPI.ComputeResource computeResource = null;

    private MPIWorkerStarter() {
    }

    /**
     * Initializes MPI, loads the job configuration, builds the worker info, registers the
     * worker with {@link WorkerRuntime}, runs the worker and reports its final status.
     *
     * @param args command line arguments, passed unchanged to {@code MPI.Init}
     * @throws RuntimeException if the job master IP or the job ID environment variable is
     *     missing, if MPI cannot be initialized, or if the local host cannot be resolved
     */
    public static void main(String[] args) {
        LoggingHelper.setLoggingFormat("[%1$tF %1$tT] [%4$s] [%7$s] %3$s: %5$s %6$s %n");

        String jobMasterIP = System.getenv(K8sEnvVariables.JOB_MASTER_IP.name());
        jobID = System.getenv(K8sEnvVariables.JOB_ID.name());
        boolean restoreJob =
            Boolean.parseBoolean(System.getenv(K8sEnvVariables.RESTORE_JOB.name()));
        if (jobMasterIP == null) {
            throw new RuntimeException("JobMasterIP address is null");
        }
        if (jobID == null) {
            throw new RuntimeException("jobID is null");
        }

        config = K8sWorkerUtils.loadConfig(JOB_DIR);

        // The MPI rank doubles as the worker ID, the communicator size as the worker count.
        try {
            MPI.Init(args);
            workerID = MPI.COMM_WORLD.getRank();
            numberOfWorkers = MPI.COMM_WORLD.getSize();
        } catch (MPIException e) {
            LOG.log(Level.SEVERE, "Could not get rank or size from MPI.COMM_WORLD", e);
            throw new RuntimeException(e);
        }

        K8sPersistentVolume pv = null;
        if (KubernetesContext.persistentVolumeRequested(config)) {
            pv = new K8sPersistentVolume(PERSISTENT_JOB_DIR, workerID);
        }
        K8sWorkerUtils.initWorkerLogger(workerID, pv, config);

        String jobDescFileName =
            JOB_DIR + "/" + SchedulerContext.createJobDescriptionFileName(jobID);
        job = JobUtils.readJobFile(jobDescFileName);
        LOG.info("Job description file is loaded: " + jobDescFileName);

        config = JobUtils.overrideConfigs(job, config);
        config = JobUtils.updateConfigs(job, config);
        config = Config.newBuilder()
            .putAll(config)
            .put("twister2.job.master.ip", (Object) jobMasterIP)
            .put("twister2.checkpointing.restore.job", (Object) restoreJob)
            .build();

        InetAddress localHost = resolveLocalHost();
        String podIP = localHost.getHostAddress();
        String podName = localHost.getHostName();
        // Keep only the part of the host name before the first dot.
        int firstDot = podName.indexOf(".");
        if (firstDot > 0) {
            podName = podName.substring(0, firstDot);
        }

        // Each worker takes one port plus the configured number of additional ports.
        int workerPort = KubernetesContext.workerBasePort(config)
            + workerID * (SchedulerContext.numberOfAdditionalPorts(config) + 1);

        JobMasterAPI.NodeInfo nodeInfo = resolveNodeInfo(podIP);
        LOG.info(String.format("PodName: %s, NodeInfo for this worker: %s", podName, nodeInfo));

        computeResource = K8sWorkerUtils.getComputeResource(job, podName);
        Map<String, Integer> additionalPorts =
            K8sWorkerUtils.generateAdditionalPorts(config, workerPort);
        workerInfo = WorkerInfoUtils.createWorkerInfo(
            workerID, podIP, workerPort, nodeInfo, computeResource, additionalPorts);
        LOG.info("Worker information summary: \nMPI Rank(workerID): " + workerID
            + "\nMPI Size(number of workers): " + numberOfWorkers
            + "\nPOD_IP: " + podIP
            + "\nHOSTNAME(podname): " + podName);

        int restartCount = K8sWorkerUtils.getAndInitRestartCount(config, jobID, workerInfo);
        WorkerRuntime.init(config, job, workerInfo, restartCount);
        IWorkerController workerController = WorkerRuntime.getWorkerController();
        IWorkerStatusUpdater workerStatusUpdater = WorkerRuntime.getWorkerStatusUpdater();

        // Restarted at least as many times as allowed: report the final failure and stop.
        if (restartCount >= FaultToleranceContext.maxMpiJobRestarts(config)) {
            workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.FULLY_FAILED);
            WorkerRuntime.close();
            return;
        }

        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            LOG.log(Level.SEVERE,
                "Uncaught exception in the thread " + thread + ". Worker FAILED...", throwable);
            // On the last permitted attempt the failure is reported as FULLY_FAILED.
            if (restartCount >= FaultToleranceContext.maxMpiJobRestarts(config) - 1) {
                workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.FULLY_FAILED);
            } else {
                workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.FAILED);
            }
            WorkerRuntime.close();
            System.exit(1);
        });

        boolean completed = startWorker(workerController, pv, podName);
        if (completed) {
            workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.COMPLETED);
        } else {
            workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.FULLY_FAILED);
        }

        try {
            MPI.Finalize();
        } catch (MPIException e) {
            // Best effort: the worker status is already reported, so only record the problem.
            LOG.log(Level.WARNING, "MPI.Finalize() failed.", e);
        }
        WorkerRuntime.close();
    }

    /**
     * Creates the worker from the job description and runs it through
     * {@link MPIWorkerManager}.
     *
     * <p>A {@link K8sVolatileVolume} is created only when the compute resource of this worker
     * reports a disk size greater than zero; otherwise {@code null} is passed on.
     *
     * @param workerController controller handed to the worker manager
     * @param pv persistent volume handed to the worker manager, may be {@code null}
     * @param podName name of the pod; not used by this method
     * @return the value returned by {@code MPIWorkerManager.execute}
     */
    public static boolean startWorker(IWorkerController workerController, IPersistentVolume pv, String podName) {
        IWorker worker = JobUtils.initializeIWorker(job);
        K8sVolatileVolume volatileVolume = null;
        if (computeResource.getDiskGigaBytes() > 0.0) {
            volatileVolume = new K8sVolatileVolume(jobID, workerID);
        }
        MPIWorkerManager mpiWorkerManager = new MPIWorkerManager();
        return mpiWorkerManager.execute(config, job, workerController, pv, volatileVolume, worker);
    }

    /**
     * Returns the local host address, failing with the original cause when it cannot be
     * resolved instead of letting a later dereference throw a {@code NullPointerException}.
     */
    private static InetAddress resolveLocalHost() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            LOG.log(Level.SEVERE, "Cannot get localHost.", e);
            throw new RuntimeException("Cannot get localHost.", e);
        }
    }

    /**
     * Builds the node info for this worker, falling back to a node info created from the
     * pod IP whenever the node IP or the node-info list cannot be read.
     */
    private static JobMasterAPI.NodeInfo resolveNodeInfo(String podIP) {
        String nodeIP = null;
        try {
            nodeIP = readFirstLine(HOST_IP_FILE);
        } catch (IOException e) {
            LOG.log(Level.WARNING, "Could not get host-ip from hostip.txt file.", e);
        }

        if (nodeIP == null) {
            LOG.warning("Could not get nodeIP for this pod. Using podIP as nodeIP.");
            return NodeInfoUtils.createNodeInfo(podIP, null, null);
        }
        if (KubernetesContext.nodeLocationsFromConfig(config)) {
            return KubernetesContext.getNodeInfo(config, nodeIP);
        }

        try {
            String encodedNodeInfos = readFirstLine(NODE_INFO_LIST_FILE);
            if (encodedNodeInfos != null) {
                return K8sWorkerUtils.getNodeInfoFromEncodedStr(encodedNodeInfos, nodeIP);
            }
            LOG.warning("File node-info-list.txt is empty. Will use podIP as nodeIP");
        } catch (IOException e) {
            LOG.log(Level.WARNING,
                "Could not get node-info list from file: node-info-list.txt. "
                    + "Will use podIP as nodeIP", e);
        }
        return NodeInfoUtils.createNodeInfo(podIP, null, null);
    }

    /**
     * Reads the first line of the given file, or returns {@code null} when the file has no
     * lines, so that an empty file is handled like a missing value rather than crashing.
     */
    private static String readFirstLine(String fileName) throws IOException {
        java.util.List<String> lines = Files.readAllLines(Paths.get(fileName));
        return lines.isEmpty() ? null : lines.get(0);
    }
}

