/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.k8s.master;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.SchedulerContext;
import edu.iu.dsc.tws.api.driver.IScalerPerCluster;
import edu.iu.dsc.tws.api.exceptions.Twister2Exception;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingContext;
import edu.iu.dsc.tws.common.logging.LoggingHelper;
import edu.iu.dsc.tws.common.zk.JobZNodeManager;
import edu.iu.dsc.tws.common.zk.ZKBarrierManager;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.common.zk.ZKEphemStateManager;
import edu.iu.dsc.tws.common.zk.ZKEventsManager;
import edu.iu.dsc.tws.common.zk.ZKPersStateManager;
import edu.iu.dsc.tws.common.zk.ZKUtils;
import edu.iu.dsc.tws.master.IJobTerminator;
import edu.iu.dsc.tws.master.server.JobMaster;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.rsched.schedulers.k8s.K8sEnvVariables;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesController;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.driver.K8sScaler;
import edu.iu.dsc.tws.rsched.schedulers.k8s.master.JobKillWatcher;
import edu.iu.dsc.tws.rsched.schedulers.k8s.master.JobTerminator;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sWorkerUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;

public final class JobMasterStarter {
    private static final Logger LOG = Logger.getLogger(JobMasterStarter.class.getName());
    public static JobAPI.Job job;

    private JobMasterStarter() {
    }

    public static void main(String[] args) {
        boolean zkInitialized;
        LoggingHelper.setLoggingFormat((String)"[%1$tF %1$tT] [%4$s] [%7$s] %3$s: %5$s %6$s %n");
        String jobID = System.getenv(K8sEnvVariables.JOB_ID.name());
        String encodedNodeInfoList = System.getenv(K8sEnvVariables.ENCODED_NODE_INFO_LIST.name());
        String hostIP = System.getenv(K8sEnvVariables.HOST_IP.name());
        boolean restoreJob = Boolean.parseBoolean(System.getenv(K8sEnvVariables.RESTORE_JOB.name()));
        String configDir = "/twister2-memory-dir" + File.separator + "twister2-job";
        Config config = K8sWorkerUtils.loadConfig(configDir);
        String jobDescFileName = SchedulerContext.createJobDescriptionFileName((String)jobID);
        jobDescFileName = "/twister2-memory-dir" + File.separator + "twister2-job" + File.separator + jobDescFileName;
        job = JobUtils.readJobFile(jobDescFileName);
        LOG.info("Job description file is loaded: " + jobDescFileName);
        config = JobUtils.overrideConfigs(job, config);
        config = JobUtils.updateConfigs(job, config);
        config = Config.newBuilder().putAll(config).put("twister2.checkpointing.restore.job", (Object)restoreJob).build();
        K8sWorkerUtils.initLogger(config, "jobMaster");
        LOG.info("JobMaster is starting. Current time: " + System.currentTimeMillis());
        LOG.info("Number of configuration parameters: " + config.size());
        InetAddress localHost = null;
        try {
            localHost = InetAddress.getLocalHost();
        }
        catch (UnknownHostException e) {
            throw new RuntimeException("Cannot get localHost.", e);
        }
        String podIP = localHost.getHostAddress();
        JobMasterAPI.NodeInfo nodeInfo = KubernetesContext.nodeLocationsFromConfig(config) ? KubernetesContext.getNodeInfo((Config)config, (String)hostIP) : K8sWorkerUtils.getNodeInfoFromEncodedStr(encodedNodeInfoList, hostIP);
        LOG.info("NodeInfo for JobMaster: " + nodeInfo);
        KubernetesController controller = KubernetesController.init(KubernetesContext.namespace(config));
        JobTerminator jobTerminator = new JobTerminator(config, controller);
        K8sScaler k8sScaler = new K8sScaler(config, job, controller);
        String keyName = KubernetesUtils.createRestartJobMasterKey();
        int restartCount = K8sWorkerUtils.initRestartFromCM(controller, jobID, keyName);
        JobMasterAPI.JobMasterState initialState = JobMasterAPI.JobMasterState.JM_STARTED;
        if (restartCount > 0) {
            initialState = JobMasterAPI.JobMasterState.JM_RESTARTED;
            if (!ZKContext.isZooKeeperServerUsed((Config)config)) {
                jobTerminator.terminateJob(jobID, JobAPI.JobState.FAILED);
                return;
            }
        }
        if (ZKContext.isZooKeeperServerUsed((Config)config) && !(zkInitialized = JobMasterStarter.initializeZooKeeper(config, jobID, podIP, initialState))) {
            jobTerminator.terminateJob(jobID, JobAPI.JobState.FAILED);
            return;
        }
        JobMaster jobMaster = new JobMaster(config, podIP, (IJobTerminator)jobTerminator, job, nodeInfo, (IScalerPerCluster)k8sScaler, initialState);
        JobKillWatcher jkWatcher = new JobKillWatcher(KubernetesContext.namespace(config), jobID, controller, jobMaster);
        jkWatcher.start();
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            LOG.log(Level.SEVERE, "Uncaught exception in the thread " + thread + ". Job Master FAILED...", throwable);
            jobMaster.jmFailed();
            jkWatcher.close();
            controller.close();
            throw new RuntimeException("Worker failed with the exception", throwable);
        });
        try {
            jobMaster.startJobMasterBlocking();
        }
        catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
        controller.close();
    }

    public static boolean initializeZooKeeper(Config config, String jobID, String jmAddress, JobMasterAPI.JobMasterState initialState) {
        String zkServerAddresses = ZKContext.serverAddresses((Config)config);
        int sessionTimeoutMs = FaultToleranceContext.sessionTimeout((Config)config);
        CuratorFramework client = ZKUtils.connectToServer((String)zkServerAddresses, (int)sessionTimeoutMs);
        String rootPath = ZKContext.rootNode((Config)config);
        boolean jobZNodeExists = JobZNodeManager.isThereJobZNode((CuratorFramework)client, (String)rootPath, (String)jobID);
        if (initialState == JobMasterAPI.JobMasterState.JM_RESTARTED) {
            if (!jobZNodeExists) {
                LOG.severe("Job is restarting but job znode does not exists at ZK server at: " + ZKUtils.jobDir((String)rootPath, (String)jobID));
                return false;
            }
            ZKPersStateManager.updateJobMasterStatus((CuratorFramework)client, (String)rootPath, (String)jobID, (String)jmAddress, (JobMasterAPI.JobMasterState)JobMasterAPI.JobMasterState.JM_RESTARTED);
            ZKEventsManager.initEventCounter((CuratorFramework)client, (String)rootPath, (String)jobID);
            job = JobZNodeManager.readJobZNode((CuratorFramework)client, (String)rootPath, (String)jobID).getJob();
            return true;
        }
        if (!CheckpointingContext.startingFromACheckpoint((Config)config) && jobZNodeExists) {
            LOG.severe("Job is starting for the first time, but there is an existing znode at ZK server: " + ZKUtils.jobDir((String)rootPath, (String)jobID));
            return false;
        }
        if (CheckpointingContext.startingFromACheckpoint((Config)config) && jobZNodeExists) {
            JobZNodeManager.deleteJobZNode((CuratorFramework)client, (String)rootPath, (String)jobID);
        }
        JobZNodeManager.createJobZNode((CuratorFramework)client, (String)rootPath, (JobAPI.Job)job);
        ZKEphemStateManager.createEphemDir((CuratorFramework)client, (String)rootPath, (String)job.getJobId());
        ZKEventsManager.createEventsZNode((CuratorFramework)client, (String)rootPath, (String)job.getJobId());
        ZKBarrierManager.createDefaultBarrierDir((CuratorFramework)client, (String)rootPath, (String)job.getJobId());
        ZKBarrierManager.createInitBarrierDir((CuratorFramework)client, (String)rootPath, (String)job.getJobId());
        ZKPersStateManager.createPersStateDir((CuratorFramework)client, (String)rootPath, (String)job.getJobId());
        long jsTime = Long.parseLong(System.getenv(K8sEnvVariables.JOB_SUBMISSION_TIME.name()));
        JobZNodeManager.createJstZNode((CuratorFramework)client, (String)rootPath, (String)jobID, (long)jsTime);
        ZKPersStateManager.createJobMasterPersState((CuratorFramework)client, (String)rootPath, (String)jobID, (String)jmAddress);
        return true;
    }
}

