/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.job;

import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.checkpointing.StateStore;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.Context;
import edu.iu.dsc.tws.checkpointing.util.CheckpointUtils;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingConfigurations;
import edu.iu.dsc.tws.common.config.ConfigLoader;
import edu.iu.dsc.tws.common.config.ConfigSerializer;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.io.IOException;
import java.util.logging.Logger;

/**
 * Static entry points for submitting and terminating Twister2 jobs.
 *
 * <p>The class is a non-instantiable utility holder: all functionality is exposed through
 * static methods, and the actual submission/termination is delegated to a
 * {@link ResourceAllocator}.
 */
public final class Twister2Submitter {
    private static final Logger LOG = Logger.getLogger(Twister2Submitter.class.getName());

    /** Cluster type values compared against {@code Context.clusterType(config)}. */
    private static final String CLUSTER_KUBERNETES = "kubernetes";
    private static final String CLUSTER_MESOS = "mesos";
    private static final String CLUSTER_NOMAD = "nomad";

    /** Config key from which the job id is read after {@code JobUtils.resolveJobId}. */
    private static final String JOB_ID_KEY = "twister2.job.id";

    private Twister2Submitter() {
    }

    /**
     * Submits the given job to the cluster described by {@code config}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Unless the job is starting from a checkpoint, the job name is adjusted for the
     *       target cluster type (Kubernetes naming rules, or a timestamp suffix for
     *       Mesos/Nomad).</li>
     *   <li>A config copy is obtained from {@code JobUtils.resolveJobId} and the job id is
     *       read from it.</li>
     *   <li>If checkpointing is enabled, the job metadata and config are either restored
     *       from the state store (restart) or saved to it (fresh submission).</li>
     *   <li>The job is handed to a new {@link ResourceAllocator}.</li>
     * </ol>
     *
     * @param twister2Job the job to submit; its name may be modified by this call
     * @param config the configuration to submit the job with
     * @throws RuntimeException if a restart is requested but the job is not present in the
     *     state store, or if reading/writing the state store fails with an
     *     {@link IOException} (preserved as the cause)
     */
    public static void submitJob(Twister2Job twister2Job, Config config) {
        boolean startingFromACheckpoint = CheckpointingConfigurations.startingFromACheckpoint(config);
        if (!startingFromACheckpoint) {
            // A restarted job must keep the name it was originally submitted with,
            // so the name is only adjusted for fresh submissions.
            adjustJobNameForCluster(twister2Job, config);
        }

        Config configCopy = JobUtils.resolveJobId(config, twister2Job.getJobName());
        JobAPI.Job job = twister2Job.serialize();
        String jobId = configCopy.getStringValue(JOB_ID_KEY);

        if (CheckpointingConfigurations.isCheckpointingEnabled(configCopy)) {
            LOG.info("Checkpointing is enabled for this job.");
            StateStore stateStore = CheckpointUtils.getStateStore(configCopy);
            stateStore.init(configCopy, new String[]{jobId});
            try {
                if (startingFromACheckpoint) {
                    if (!CheckpointUtils.containsJobInStore(jobId, stateStore)) {
                        throw new RuntimeException("Couldn't find job state in store to restart " + jobId);
                    }
                    LOG.info("Found job " + jobId + " in state store. Restoring...");
                    // Both the job definition and the config are replaced by the stored versions.
                    byte[] jobMetaBytes = CheckpointUtils.restoreJobMeta(jobId, stateStore);
                    job = JobAPI.Job.parseFrom(jobMetaBytes);
                    byte[] configBytes = CheckpointUtils.restoreJobConfig(jobId, stateStore);
                    configCopy = ConfigLoader.loadConfig(configBytes);
                } else {
                    LOG.info("Saving job config and metadata");
                    CheckpointUtils.saveJobConfigAndMeta(
                        jobId, job.toByteArray(), ConfigSerializer.serialize(configCopy), stateStore);
                }
            } catch (IOException e) {
                String message = "Failed to submit the checkpointing enabled job " + jobId;
                LOG.severe(message);
                throw new RuntimeException(message, e);
            }
        } else if (startingFromACheckpoint) {
            // Nothing is restored on this path; make that visible instead of proceeding silently.
            LOG.warning("Job " + jobId + " is set to start from a checkpoint but checkpointing is"
                + " not enabled. Submitting without restoring any state.");
        }

        LOG.info("The job to be submitted: \n" + JobUtils.toString(job));
        Config updatedConfig = JobUtils.updateConfigs(job, configCopy);
        ResourceAllocator resourceAllocator = new ResourceAllocator();
        resourceAllocator.submitJob(job, updatedConfig);
    }

    /**
     * Terminates the job with the given name.
     *
     * <p>On a Kubernetes cluster, a name that does not conform to the Kubernetes naming rules
     * is first converted with {@code KubernetesUtils.convertJobNameToK8sFormat}, mirroring the
     * conversion applied at submission time.
     *
     * @param jobName the name of the job to terminate
     * @param config the configuration identifying the cluster
     */
    public static void terminateJob(String jobName, Config config) {
        String effectiveJobName = jobName;
        // Constant-first equals: a missing cluster type must not fail with a NullPointerException here.
        if (CLUSTER_KUBERNETES.equals(Context.clusterType(config))
            && !KubernetesUtils.jobNameConformsToK8sNamingRules(effectiveJobName)) {
            effectiveJobName = convertToK8sJobName(effectiveJobName);
        }
        ResourceAllocator resourceAllocator = new ResourceAllocator();
        resourceAllocator.terminateJob(effectiveJobName, config);
    }

    /**
     * Ensures the name of the given job conforms to the Kubernetes naming rules.
     *
     * <p>A conforming name is left untouched. Otherwise the name is converted with
     * {@code KubernetesUtils.convertJobNameToK8sFormat} and written back to the job.
     *
     * @param twister2Job the job whose name is checked and, if necessary, replaced
     */
    public static void processJobNameForK8s(Twister2Job twister2Job) {
        String jobName = twister2Job.getJobName();
        if (KubernetesUtils.jobNameConformsToK8sNamingRules(jobName)) {
            return;
        }
        twister2Job.setJobName(convertToK8sJobName(jobName));
    }

    /** Applies the cluster-specific job name adjustment for a fresh (non-restart) submission. */
    private static void adjustJobNameForCluster(Twister2Job twister2Job, Config config) {
        String clusterType = Context.clusterType(config);
        if (CLUSTER_KUBERNETES.equals(clusterType)) {
            processJobNameForK8s(twister2Job);
        } else if (CLUSTER_MESOS.equals(clusterType) || CLUSTER_NOMAD.equals(clusterType)) {
            // Appends the submission time to the job name.
            twister2Job.setJobName(twister2Job.getJobName() + System.currentTimeMillis());
        }
    }

    /** Converts a non-conforming job name to the Kubernetes format, logging the old and new names. */
    private static String convertToK8sJobName(String jobName) {
        LOG.info("JobName does not conform to Kubernetes naming rules: [" + jobName
            + "] Only lower case alphanumeric characters and dashes(-) are allowed.");
        String convertedJobName = KubernetesUtils.convertJobNameToK8sFormat(jobName);
        LOG.info("****************** JobName modified. Following jobname will be used: " + convertedJobName);
        return convertedJobName;
    }
}

