/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.k8s.worker;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.SchedulerContext;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.IWorkerStatusUpdater;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingContext;
import edu.iu.dsc.tws.common.logging.LoggingHelper;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.core.WorkerRuntime;
import edu.iu.dsc.tws.rsched.schedulers.k8s.K8sEnvVariables;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.PodWatchUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sPersistentVolume;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sVolatileVolume;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sWorkerUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import edu.iu.dsc.tws.rsched.worker.WorkerManager;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Entry point of a worker process started inside a Kubernetes pod.
 *
 * <p>{@link #main(String[])} reads the pod environment variables, loads the configuration and
 * the job description file, builds the {@code WorkerInfo} of this worker, initializes
 * {@link WorkerRuntime}, runs the user worker through a {@link WorkerManager} and reports the
 * final worker state through the {@link IWorkerStatusUpdater}.
 *
 * <p>All state is kept in static fields that are populated by {@code main}; the other public
 * methods read those fields and are therefore only meaningful after {@code main} has set them.
 */
public final class K8sWorkerStarter {
    private static final Logger LOG = Logger.getLogger(K8sWorkerStarter.class.getName());

    /** Directory the configuration files and the job description file are loaded from. */
    private static final String JOB_FILES_DIR =
        "/twister2-memory-dir" + File.separator + "twister2-job";

    /** Directory passed to {@link K8sPersistentVolume} when a persistent volume is requested. */
    private static final String PERSISTENT_JOB_DIR = "/persistent";

    private static Config config = null;
    private static int workerID = -1;
    private static JobMasterAPI.WorkerInfo workerInfo;
    private static String jobID = null;
    private static JobAPI.Job job = null;
    private static JobAPI.ComputeResource computeResource = null;

    /**
     * Stays {@code true} until this class itself finishes the worker (completed, failed or
     * fully failed). The shutdown hook reports {@code KILLED} only while it is still true.
     *
     * <p>Declared {@code volatile} because it is written by the main thread or by the thread
     * running the uncaught-exception handler, and read by the shutdown hook thread.
     */
    private static volatile boolean externallyKilled = true;

    private K8sWorkerStarter() {
    }

    /**
     * Starts the worker.
     *
     * @param args command line arguments; not used
     * @throws RuntimeException if the worker port or the job ID environment variable is missing
     *     or invalid, if the local host address cannot be resolved, or if the job master IP
     *     cannot be determined when it is needed
     */
    public static void main(String[] args) {
        LoggingHelper.setLoggingFormat("[%1$tF %1$tT] [%4$s] [%7$s] %3$s: %5$s %6$s %n");

        // Values handed to the container through environment variables.
        int workerPort = readWorkerPort();
        String containerName = System.getenv(K8sEnvVariables.CONTAINER_NAME.name());
        String podName = System.getenv(K8sEnvVariables.POD_NAME.name());
        String hostIP = System.getenv(K8sEnvVariables.HOST_IP.name());
        String hostName = System.getenv(K8sEnvVariables.HOST_NAME.name());
        String jobMasterIP = System.getenv(K8sEnvVariables.JOB_MASTER_IP.name());
        String encodedNodeInfoList = System.getenv(K8sEnvVariables.ENCODED_NODE_INFO_LIST.name());
        jobID = System.getenv(K8sEnvVariables.JOB_ID.name());
        boolean restoreJob =
            Boolean.parseBoolean(System.getenv(K8sEnvVariables.RESTORE_JOB.name()));
        if (jobID == null) {
            throw new RuntimeException("JobID is null");
        }

        // Load the base config, then layer the job's own settings and the restore flag on top.
        config = K8sWorkerUtils.loadConfig(JOB_FILES_DIR);
        String jobDescFileName =
            JOB_FILES_DIR + File.separator + SchedulerContext.createJobDescriptionFileName(jobID);
        job = JobUtils.readJobFile(jobDescFileName);
        LOG.info("Job description file is loaded: " + jobDescFileName);
        config = JobUtils.overrideConfigs(job, config);
        config = JobUtils.updateConfigs(job, config);
        config = Config.newBuilder()
            .putAll(config)
            .put("twister2.checkpointing.restore.job", restoreJob)
            .build();

        // The job master address is resolved only when a driver is configured, ZooKeeper is
        // not used, or checkpointing is enabled. The resolved IP is stored in config.
        boolean jobMasterNeeded = !job.getDriverClassName().isEmpty()
            || !ZKContext.isZooKeeperServerUsed(config)
            || CheckpointingContext.isCheckpointingEnabled(config);
        if (jobMasterNeeded) {
            updateJobMasterIp(jobMasterIP);
        }

        String podIP = resolveLocalHostAddress();

        JobMasterAPI.NodeInfo nodeInfo;
        if (KubernetesContext.nodeLocationsFromConfig(config)) {
            nodeInfo = KubernetesContext.getNodeInfo(config, hostIP);
        } else {
            nodeInfo = K8sWorkerUtils.getNodeInfoFromEncodedStr(encodedNodeInfoList, hostIP);
        }
        LOG.info("PodName: " + podName + ", NodeInfo: " + nodeInfo);

        workerID = K8sWorkerUtils.calculateWorkerID(job, podName, containerName);
        computeResource = K8sWorkerUtils.getComputeResource(job, podName);
        Map<String, Integer> additionalPorts =
            K8sWorkerUtils.generateAdditionalPorts(config, workerPort);
        workerInfo = WorkerInfoUtils.createWorkerInfo(
            workerID, podIP, workerPort, nodeInfo, computeResource, additionalPorts);

        K8sPersistentVolume pv = null;
        if (KubernetesContext.persistentVolumeRequested(config)) {
            pv = new K8sPersistentVolume(PERSISTENT_JOB_DIR, workerID);
        }

        K8sWorkerUtils.initWorkerLogger(workerID, pv, config);
        LOG.info("Worker information summary: \nworkerID: " + workerID
            + "\nPOD_IP: " + podIP
            + "\nHOSTNAME(podname): " + podName
            + "\nworkerPort: " + workerPort
            + "\nhostName(nodeName): " + hostName
            + "\nhostIP(nodeIP): " + hostIP + "\n");

        int restartCount = K8sWorkerUtils.getAndInitRestartCount(config, jobID, workerInfo);
        WorkerRuntime.init(config, job, workerInfo, restartCount);
        IWorkerController workerController = WorkerRuntime.getWorkerController();
        IWorkerStatusUpdater workerStatusUpdater = WorkerRuntime.getWorkerStatusUpdater();

        // The restart budget is already used up: report FULLY_FAILED without running the worker.
        if (restartCount >= FaultToleranceContext.maxWorkerRestarts(config)) {
            workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.FULLY_FAILED);
            WorkerRuntime.close();
            externallyKilled = false;
            return;
        }

        addShutdownHook(workerStatusUpdater);
        installUncaughtExceptionHandler(workerStatusUpdater, restartCount);

        boolean completed = startWorker(workerController, pv);
        JobMasterAPI.WorkerState finalState = completed
            ? JobMasterAPI.WorkerState.COMPLETED
            : JobMasterAPI.WorkerState.FULLY_FAILED;
        workerStatusUpdater.updateWorkerStatus(finalState);
        WorkerRuntime.close();
        externallyKilled = false;
    }

    /**
     * Reads and parses the worker port from the environment.
     *
     * @return the worker port
     * @throws RuntimeException if the variable is not set or is not an integer, naming the
     *     variable so that the failure can be diagnosed from the message alone
     */
    private static int readWorkerPort() {
        String variableName = K8sEnvVariables.WORKER_PORT.name();
        String rawPort = System.getenv(variableName);
        if (rawPort == null) {
            throw new RuntimeException("Environment variable " + variableName + " is not set.");
        }
        try {
            return Integer.parseInt(rawPort);
        } catch (NumberFormatException e) {
            throw new RuntimeException("Environment variable " + variableName
                + " is not a valid port number: " + rawPort, e);
        }
    }

    /**
     * Returns the address of the local host, which is used as the pod IP of this worker.
     *
     * @throws RuntimeException if the local host cannot be resolved
     */
    private static String resolveLocalHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            throw new RuntimeException("Cannot get localHost.", e);
        }
    }

    /**
     * Installs the default uncaught-exception handler: it logs the exception, reports the
     * worker as FAILED (or FULLY_FAILED when {@code restartCount} has reached the maximum
     * restart count minus one), closes {@link WorkerRuntime} and exits with status 1.
     */
    private static void installUncaughtExceptionHandler(
        IWorkerStatusUpdater workerStatusUpdater, int restartCount) {

        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            LOG.log(Level.SEVERE,
                "Uncaught exception in the thread " + thread + ". Worker FAILED...", throwable);
            try {
                if (restartCount >= FaultToleranceContext.maxWorkerRestarts(config) - 1) {
                    workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.FULLY_FAILED);
                } else {
                    workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.FAILED);
                }
                WorkerRuntime.close();
            } catch (RuntimeException e) {
                // An exception escaping the handler is ignored by the JVM, so log it here.
                LOG.log(Level.SEVERE, "Could not report the worker failure cleanly.", e);
            } finally {
                // Always mark the exit as self-inflicted and terminate, even when reporting
                // failed; otherwise the shutdown hook could later report KILLED.
                externallyKilled = false;
                System.exit(1);
            }
        });
    }

    /**
     * Determines the job master IP and stores it in the config under
     * {@code "twister2.job.master.ip"}.
     *
     * <p>When the job master runs in the client, the given value is validated and used as is.
     * Otherwise the IP is looked up through {@link K8sWorkerUtils#getJobMasterServiceIP} and,
     * if that yields {@code null}, through
     * {@link PodWatchUtils#getJobMasterIpByWatchingPodToRunning}.
     *
     * <p>Reads the static {@code config} and {@code jobID} fields, which {@code main} sets.
     *
     * @param jobMasterIP the job master IP taken from the environment; may be {@code null}
     *     when the job master does not run in the client
     * @return the job master IP that was stored in the config
     * @throws RuntimeException if the job master runs in the client and {@code jobMasterIP} is
     *     null or blank, or if it runs elsewhere and no IP could be obtained
     */
    public static String updateJobMasterIp(String jobMasterIP) {
        if (JobMasterContext.jobMasterRunsInClient(config)) {
            if (jobMasterIP == null || jobMasterIP.trim().isEmpty()) {
                throw new RuntimeException("Job master running in the client, but this worker got job master IP as empty from environment variables.");
            }
        } else {
            String namespace = KubernetesContext.namespace(config);
            jobMasterIP = K8sWorkerUtils.getJobMasterServiceIP(namespace, jobID);
            if (jobMasterIP == null) {
                // NOTE(review): 100 is presumably a timeout for the pod watch; confirm the unit
                // against PodWatchUtils.
                jobMasterIP =
                    PodWatchUtils.getJobMasterIpByWatchingPodToRunning(namespace, jobID, 100);
                PodWatchUtils.close();
            }
            if (jobMasterIP == null) {
                // The address is null in this branch, so report what was searched for instead.
                throw new RuntimeException("Job master is running in a separate pod, but this worker can not get the job master IP address from Kubernetes master.\nNamespace: " + namespace + ", jobID: " + jobID);
            }
            LOG.info("Job master address: " + jobMasterIP);
        }

        config = Config.newBuilder()
            .putAll(config)
            .put("twister2.job.master.ip", jobMasterIP)
            .build();
        return jobMasterIP;
    }

    /**
     * Creates the {@link IWorker} of the job and executes it through a {@link WorkerManager}.
     *
     * <p>A {@link K8sVolatileVolume} is created only when the compute resource of this worker
     * declares more than zero gigabytes of disk. Reads the static fields set by {@code main}.
     *
     * @param workerController the controller passed on to the {@link WorkerManager}
     * @param pv the persistent volume, or {@code null} if none was requested
     * @return the value returned by {@link WorkerManager#execute()}; {@code main} treats
     *     {@code true} as COMPLETED and {@code false} as FULLY_FAILED
     */
    public static boolean startWorker(IWorkerController workerController, IPersistentVolume pv) {
        IWorker worker = JobUtils.initializeIWorker(job);

        K8sVolatileVolume volatileVolume = null;
        if (computeResource.getDiskGigaBytes() > 0.0) {
            volatileVolume = new K8sVolatileVolume(jobID, workerID);
        }

        WorkerManager workerManager =
            new WorkerManager(config, job, workerController, pv, volatileVolume, worker);
        return workerManager.execute();
    }

    /**
     * Registers a JVM shutdown hook that reports the worker as KILLED and closes
     * {@link WorkerRuntime}, unless this class has already finished the worker itself.
     *
     * @param workerStatusUpdater the updater used to report the KILLED state
     */
    public static void addShutdownHook(final IWorkerStatusUpdater workerStatusUpdater) {
        Thread hookThread = new Thread(() -> {
            // The flag is cleared on every exit path this class controls; if it is still set,
            // the JVM is going down for a reason outside of this class.
            if (externallyKilled) {
                workerStatusUpdater.updateWorkerStatus(JobMasterAPI.WorkerState.KILLED);
                WorkerRuntime.close();
            }
        });
        Runtime.getRuntime().addShutdownHook(hookThread);
    }
}

