/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.nomad;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.scheduler.SchedulerContext;
import edu.iu.dsc.tws.common.config.ConfigLoader;
import edu.iu.dsc.tws.common.logging.LoggingContext;
import edu.iu.dsc.tws.common.logging.LoggingHelper;
import edu.iu.dsc.tws.common.util.ReflectionUtils;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.worker.JMWorkerAgent;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.bootstrap.ZKJobMasterFinder;
import edu.iu.dsc.tws.rsched.schedulers.nomad.NomadContext;
import edu.iu.dsc.tws.rsched.schedulers.nomad.NomadController;
import edu.iu.dsc.tws.rsched.schedulers.nomad.NomadPersistentVolume;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.io.File;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

/**
 * Entry point of a worker process launched for a Nomad allocation.
 *
 * <p>The starter parses its command line, loads the configuration and the job
 * description file, registers the worker with the job master through a
 * {@link JMWorkerAgent}, and finally instantiates and executes the configured
 * {@link IWorker} class.
 *
 * <p>The worker index, ports and IP addresses are read from the
 * {@code NOMAD_ALLOC_INDEX}, {@code NOMAD_PORT_<name>} and {@code NOMAD_IP_<name>}
 * environment variables.
 */
public final class NomadWorkerStarter {
    private static final Logger LOG = Logger.getLogger(NomadWorkerStarter.class.getName());
    // NOTE(review): not referenced anywhere in this class; kept as-is.
    private static int startingPort = 30000;
    private NomadController controller;
    /** Agent connected to the job master; stays {@code null} until {@link #startWorker()} creates it. */
    private JMWorkerAgent masterClient;
    private Config config;
    private IWorkerController workerController;

    /**
     * Parses the command line, loads the configuration and initializes the Nomad controller.
     *
     * @param args command line arguments, see {@link #setupOptions()}
     * @throws RuntimeException if the command line cannot be parsed (usage is printed first)
     */
    private NomadWorkerStarter(String[] args) {
        // Built outside the try block so it is always available for the usage message.
        Options cmdOptions = this.setupOptions();
        try {
            DefaultParser parser = new DefaultParser();
            CommandLine cmd = parser.parse(cmdOptions, args);
            int rank = 0;
            this.config = this.loadConfigurations(cmd, rank);
            this.controller = new NomadController(true);
            this.controller.initialize(this.config);
        }
        catch (ParseException e) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp("NomadWorkerStarter", cmdOptions);
            throw new RuntimeException("Error parsing command line options: ", e);
        }
    }

    /**
     * Creates a starter from the command line and runs the worker.
     *
     * @param args command line arguments
     */
    public static void main(String[] args) {
        NomadWorkerStarter starter = new NomadWorkerStarter(args);
        starter.run();
    }

    /**
     * Starts the worker and always calls {@link #closeWorker()} afterwards,
     * whether the worker finished normally or failed.
     */
    public void run() {
        try {
            this.startWorker();
        }
        finally {
            this.closeWorker();
        }
    }

    /**
     * Declares the command line options. All five options are required.
     *
     * @return the option set understood by this starter
     */
    private Options setupOptions() {
        Options options = new Options();
        Option containerClass = Option.builder("c").desc("The class name of the container to launch").longOpt("container_class").hasArgs().argName("container class").required().build();
        Option configDirectory = Option.builder("d").desc("The configuration directory").longOpt("config_dir").hasArgs().argName("configuration directory").required().build();
        Option twister2Home = Option.builder("t").desc("The twister2 home directory").longOpt("twister2_home").hasArgs().argName("twister2 home").required().build();
        Option clusterType = Option.builder("n").desc("The cluster type").longOpt("cluster_type").hasArgs().argName("cluster type").required().build();
        Option jobName = Option.builder("j").desc("Job name").longOpt("job_name").hasArgs().argName("job name").required().build();
        options.addOption(twister2Home);
        options.addOption(containerClass);
        options.addOption(configDirectory);
        options.addOption(clusterType);
        options.addOption(jobName);
        return options;
    }

    /**
     * Loads the cluster configuration, reads the job description file and merges
     * the job's overrides with the values given on the command line.
     *
     * @param cmd parsed command line
     * @param id value stored under {@code twister2.container.id}
     * @return the configuration used by this worker process
     */
    private Config loadConfigurations(CommandLine cmd, int id) {
        String twister2Home = cmd.getOptionValue("twister2_home");
        String container = cmd.getOptionValue("container_class");
        String configDir = cmd.getOptionValue("config_dir");
        String clusterType = cmd.getOptionValue("cluster_type");
        String jobName = cmd.getOptionValue("job_name");
        LOG.log(Level.FINE, String.format("Initializing process with twister_home: %s container_class: %s config_dir: %s cluster_type: %s", twister2Home, container, configDir, clusterType));
        Config cfg = ConfigLoader.loadConfig(twister2Home, configDir, clusterType);
        // This intermediate config is only used to resolve the job description file path.
        Config workerConfig = Config.newBuilder().putAll(cfg).put(SchedulerContext.TWISTER2_HOME.getKey(), (Object)twister2Home).put("twister2.job.worker.class", (Object)container).put("twister2.container.id", (Object)id).put("twister2.cluster.type", (Object)clusterType).build();
        String jobDescFile = JobUtils.getJobDescriptionFilePath(jobName, workerConfig);
        JobAPI.Job job = JobUtils.readJobFile(null, jobDescFile);
        Config overridden = JobUtils.overrideConfigs(job, cfg);
        return Config.newBuilder().putAll(overridden).put(SchedulerContext.TWISTER2_HOME.getKey(), (Object)twister2Home).put("twister2.job.worker.class", (Object)container).put("twister2.container.id", (Object)id).put("twister2.cluster.type", (Object)clusterType).put("twister2.job.name", (Object)job.getJobName()).build();
    }

    /**
     * Creates the worker controller, waits for the list of all workers and then
     * instantiates and executes the configured worker class.
     *
     * <p>If fetching the worker list times out, the error is logged and the
     * method returns without executing the worker.
     *
     * @throws RuntimeException if the worker class cannot be loaded or is not an {@link IWorker}
     */
    private void startWorker() {
        LOG.log(Level.INFO, "A worker process is starting...");
        this.workerController = this.createWorkerController();
        JobMasterAPI.WorkerInfo workerNetworkInfo = this.workerController.getWorkerInfo();
        String workerClass = SchedulerContext.workerClass(this.config);
        try {
            LOG.log(Level.INFO, "Worker IP..:" + Inet4Address.getLocalHost().getHostAddress());
        }
        catch (UnknownHostException e) {
            // The address is only logged for information, so this is not fatal.
            LOG.log(Level.WARNING, "Could not determine the local host address", e);
        }
        try {
            // The result is not used; the call is made before the worker is executed.
            this.workerController.getAllWorkers();
        }
        catch (TimeoutException timeoutException) {
            LOG.log(Level.SEVERE, timeoutException.getMessage(), timeoutException);
            return;
        }
        try {
            Object object = ReflectionUtils.newInstance(workerClass);
            if (!(object instanceof IWorker)) {
                throw new RuntimeException("Job is not of type IWorker: " + object.getClass().getName());
            }
            IWorker container = (IWorker)object;
            container.execute(this.config, workerNetworkInfo.getWorkerID(), this.workerController, null, null);
            LOG.log(Level.FINE, "loaded worker class: " + workerClass);
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.log(Level.SEVERE, String.format("failed to load the worker class %s", workerClass), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds this worker's {@code WorkerInfo} from the environment and the job
     * description, locates the job master and connects to it.
     *
     * <p>When the job master does not run in the client, its address is looked up
     * through {@link ZKJobMasterFinder}, waiting up to 20000 (as passed to
     * {@code waitAndGetJobMasterIPandPort}) if it is not yet available.
     *
     * @return the worker controller provided by the job master agent
     * @throws IllegalStateException if a required environment variable is missing
     *     or not an integer, if no "worker" port is configured, or if the job
     *     master address cannot be obtained or has no {@code :port} suffix
     */
    private IWorkerController createWorkerController() {
        String idEnv = System.getenv("NOMAD_ALLOC_ID");
        int workerID = NomadWorkerStarter.readIntEnv("NOMAD_ALLOC_INDEX");
        this.initLogger(this.config, workerID);
        LOG.log(Level.INFO, String.format("Worker id = %s and index = %d", idEnv, workerID));
        Map<String, Integer> ports = this.getPorts(this.config);
        Map<String, String> localIps = this.getIPAddress(ports);
        String jobName = NomadContext.jobName(this.config);
        String jobDescFile = JobUtils.getJobDescriptionFilePath(jobName, this.config);
        JobAPI.Job job = JobUtils.readJobFile(null, jobDescFile);
        int numberOfWorkers = job.getNumberOfWorkers();
        LOG.info("Worker Count..: " + numberOfWorkers);
        JobAPI.ComputeResource computeResource = JobUtils.getComputeResource(job, 0);
        Integer workerPort = ports.get("worker");
        if (workerPort == null) {
            throw new IllegalStateException("No port named 'worker' is configured; available ports: " + ports.keySet());
        }
        int port = workerPort;
        String host = localIps.get("worker");
        JobMasterAPI.NodeInfo nodeInfo = NomadContext.getNodeInfo(this.config, host);
        JobMasterAPI.WorkerInfo workerInfo = WorkerInfoUtils.createWorkerInfo(workerID, host, port, nodeInfo, computeResource, ports);
        int jobMasterPort;
        String jobMasterIP;
        if (!JobMasterContext.jobMasterRunsInClient(this.config)) {
            String jobMasterIPandPort;
            ZKJobMasterFinder finder = new ZKJobMasterFinder(this.config);
            try {
                finder.initialize();
                jobMasterIPandPort = finder.getJobMasterIPandPort();
                if (jobMasterIPandPort == null) {
                    LOG.info("Job Master has not joined yet. Will wait and try to get the address ...");
                    jobMasterIPandPort = finder.waitAndGetJobMasterIPandPort(20000L);
                }
            }
            finally {
                // Release the finder even when the lookup fails.
                finder.close();
            }
            if (jobMasterIPandPort == null) {
                throw new IllegalStateException("Could not get the job master address");
            }
            LOG.info("Job Master address: " + jobMasterIPandPort);
            // Split on the last ':' so that everything before it is treated as the IP.
            int separator = jobMasterIPandPort.lastIndexOf(':');
            if (separator < 0) {
                throw new IllegalStateException("Job master address is not in ip:port form: " + jobMasterIPandPort);
            }
            jobMasterPort = Integer.parseInt(jobMasterIPandPort.substring(separator + 1));
            jobMasterIP = jobMasterIPandPort.substring(0, separator);
        } else {
            jobMasterIP = JobMasterContext.jobMasterIP(this.config);
            jobMasterPort = JobMasterContext.jobMasterPort(this.config);
        }
        this.config = JobUtils.overrideConfigs(job, this.config);
        this.config = JobUtils.updateConfigs(job, this.config);
        LOG.info("Worker Count..: " + numberOfWorkers);
        this.masterClient = this.createMasterAgent(this.config, jobMasterIP, jobMasterPort, workerInfo, numberOfWorkers);
        return this.masterClient.getJMWorkerController();
    }

    /**
     * Creates the job master agent, starts it and reports this worker as running.
     *
     * @param cfg worker configuration
     * @param masterHost job master host
     * @param masterPort job master port
     * @param workerInfo description of this worker
     * @param numberContainers number of workers in the job
     * @return the started agent
     */
    private JMWorkerAgent createMasterAgent(Config cfg, String masterHost, int masterPort, JobMasterAPI.WorkerInfo workerInfo, int numberContainers) {
        JMWorkerAgent jobMasterAgent = JMWorkerAgent.createJMWorkerAgent(cfg, workerInfo, masterHost, masterPort, numberContainers);
        LOG.log(Level.INFO, String.format("Connecting to job master..: %s:%d", masterHost, masterPort));
        jobMasterAgent.startThreaded(false);
        jobMasterAgent.sendWorkerRunningMessage();
        return jobMasterAgent;
    }

    /**
     * Resolves the port number of every configured port name from the
     * {@code NOMAD_PORT_<name>} environment variables.
     *
     * @param cfg configuration holding the comma separated port names
     * @return port numbers keyed by port name
     * @throws IllegalStateException if no port names are configured, or a port
     *     variable is missing or not an integer
     */
    private Map<String, Integer> getPorts(Config cfg) {
        String portNamesConfig = NomadContext.networkPortNames(cfg);
        if (portNamesConfig == null) {
            throw new IllegalStateException("No network port names are configured");
        }
        Map<String, Integer> ports = new HashMap<>();
        for (String pName : portNamesConfig.split(",")) {
            ports.put(pName, NomadWorkerStarter.readIntEnv("NOMAD_PORT_" + pName));
        }
        return ports;
    }

    /**
     * Reads an environment variable and parses it as an integer.
     *
     * @param name environment variable name
     * @return the parsed value
     * @throws IllegalStateException if the variable is not set or is not an integer
     */
    private static int readIntEnv(String name) {
        String value = System.getenv(name);
        if (value == null) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        try {
            return Integer.parseInt(value);
        }
        catch (NumberFormatException e) {
            throw new IllegalStateException("Environment variable " + name + " is not an integer: " + value, e);
        }
    }

    /**
     * Sets the logging format and level, and directs logging to the
     * {@code logs} directory under the persistent job directory.
     *
     * <p>When no persistent job directory is available, only the format and
     * level are configured.
     *
     * @param cfg worker configuration
     * @param workerID index of this worker, used in the log file name
     * @throws RuntimeException if the log directory cannot be created
     */
    private void initLogger(Config cfg, int workerID) {
        LoggingHelper.setLoggingFormat("[%1$tF %1$tT] [%4$s] [%7$s] %3$s: %5$s %6$s %n");
        LoggingHelper.setLogLevel(LoggingContext.loggingLevel(cfg));
        String jobName = NomadContext.jobName(cfg);
        NomadPersistentVolume pv = new NomadPersistentVolume(this.controller.createPersistentJobDirName(jobName), workerID);
        // getAbsolutePath() never returns null, so the absence of a persistent
        // directory has to be detected on the File itself.
        File jobDir = pv.getJobDir();
        if (jobDir == null) {
            return;
        }
        String logDir = jobDir.getAbsolutePath() + "/logs";
        LOG.log(Level.INFO, "LOG DIR is ......: " + logDir);
        File directory = new File(logDir);
        if (!directory.exists() && !directory.mkdirs()) {
            throw new RuntimeException("Failed to create log directory: " + logDir);
        }
        LoggingHelper.setupLogging(cfg, logDir, "worker-" + workerID);
    }

    /**
     * Returns the value of the {@code NOMAD_TASK_DIR} environment variable.
     *
     * @return the variable's value, or {@code null} if it is not set
     */
    private String getTaskDirectory() {
        return System.getenv("NOMAD_TASK_DIR");
    }

    /**
     * Looks up the {@code NOMAD_IP_<name>} environment variable for every port name.
     *
     * @param ports ports keyed by name; only the keys are used
     * @return IP addresses keyed by port name; a value is {@code null} when the
     *     corresponding variable is not set
     */
    private Map<String, String> getIPAddress(Map<String, Integer> ports) {
        Map<String, String> ips = new HashMap<>();
        for (String portName : ports.keySet()) {
            ips.put(portName, System.getenv("NOMAD_IP_" + portName));
        }
        return ips;
    }

    /**
     * Reports completion to the job master and closes the agent.
     *
     * <p>Does nothing except log a warning when the agent was never created,
     * so that a start-up failure is not masked by a secondary exception when
     * this method runs from the {@code finally} block of {@link #run()}.
     */
    public void closeWorker() {
        if (this.masterClient == null) {
            LOG.log(Level.WARNING, "Job master agent was never created; nothing to close");
            return;
        }
        try {
            this.masterClient.sendWorkerCompletedMessage();
        }
        finally {
            // Close the agent even if the completion message could not be sent.
            this.masterClient.close();
        }
    }
}

