/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.mesos;

import com.google.protobuf.ByteString;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.SchedulerContext;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.rsched.schedulers.mesos.MesosContext;
import edu.iu.dsc.tws.rsched.schedulers.mesos.MesosController;
import edu.iu.dsc.tws.rsched.schedulers.mesos.MesosPersistentVolume;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import org.apache.curator.shaded.com.google.common.primitives.Longs;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;

public class MesosScheduler
implements Scheduler {
    public static final Logger LOG = Logger.getLogger(MesosScheduler.class.getName());
    private final String jobID;
    private int taskIdCounter = 0;
    private Config config;
    private MesosController controller;
    private int completedTaskCounter = 0;
    private int totalTaskCount;
    private int workerCounter = 0;
    private int resourceIndex = 0;
    private int resourceInstanceCount = 0;
    private JobAPI.Job job;
    private int[] offerControl = new int[3];

    public MesosScheduler(MesosController controller, Config mconfig, JobAPI.Job myJob) {
        this.controller = controller;
        this.config = mconfig;
        this.totalTaskCount = MesosContext.numberOfContainers(this.config);
        this.job = myJob;
        this.jobID = myJob.getJobId();
    }

    public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID, Protos.MasterInfo masterInfo) {
        LOG.info("Registered" + frameworkID);
    }

    public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
        LOG.info("Re-registered");
    }

    public boolean contains(String[] nodes, Protos.Offer offer) {
        for (String node : nodes) {
            if (!offer.getHostname().equals(node)) continue;
            return true;
        }
        return false;
    }

    public JobAPI.ComputeResource getResource(JobAPI.Job myJob, int rIndex) {
        JobAPI.ComputeResource computeResource = JobUtils.getComputeResource(myJob, rIndex);
        if (computeResource == null) {
            LOG.severe("Something wrong with the job object. Can not get ComputeResource from job\n++++++++++++++++++ Aborting submission ++++++++++++++++++index...:" + rIndex);
            return null;
        }
        LOG.info("get instances....:" + computeResource.getInstances());
        return computeResource;
    }

    public void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> offers) {
        int index = 0;
        this.controller.setSchedulerDriver(schedulerDriver);
        String[] desiredNodes = MesosContext.getDesiredNodes(this.config).split(",");
        if (this.taskIdCounter < this.totalTaskCount) {
            for (Protos.Offer offer : offers) {
                JobAPI.ComputeResource computeResource = this.getResource(this.job, this.resourceIndex);
                if (computeResource == null) {
                    return;
                }
                ++this.resourceInstanceCount;
                if (this.resourceInstanceCount == computeResource.getInstances() + 1) {
                    ++this.resourceIndex;
                }
                if (!MesosContext.getDesiredNodes(this.config).equals("all") && !this.contains(desiredNodes, offer)) continue;
                LOG.info("Offer comes from host ...:" + offer.getHostname());
                if (this.controller.isResourceSatisfy(offer, computeResource)) {
                    MesosPersistentVolume pv = new MesosPersistentVolume(this.controller.createPersistentJobDirName(this.jobID), this.workerCounter);
                    String persistentVolumeDir = pv.getJobDir().getAbsolutePath();
                    Protos.Offer.Operation.Launch.Builder launch = Protos.Offer.Operation.Launch.newBuilder();
                    for (int i = 0; i < MesosContext.containerPerWorker(this.config); ++i) {
                        pv.getWorkerDir();
                        Protos.TaskID taskId = this.buildNewTaskID();
                        Protos.TaskInfo.Builder taskBuilder = Protos.TaskInfo.newBuilder().setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(this.buildResource("cpus", computeResource.getCpu())).addResources(this.buildResource("mem", computeResource.getRamMegaBytes())).addResources(this.buildResource("disk", computeResource.getDiskGigaBytes() * 1000.0)).setData(ByteString.copyFromUtf8((String)("" + taskId.getValue())));
                        if (MesosContext.getUseDockerContainer(this.config).equals("true")) {
                            Protos.Parameter jobNameParam = Protos.Parameter.newBuilder().setKey("env").setValue("JOB_NAME=" + this.jobID).build();
                            Protos.Parameter workerIdParam = Protos.Parameter.newBuilder().setKey("env").setValue("WORKER_ID=" + (this.workerCounter - 1)).build();
                            ++this.workerCounter;
                            Protos.Parameter frameworkIdParam = Protos.Parameter.newBuilder().setKey("env").setValue("FRAMEWORK_ID=" + offer.getFrameworkId().getValue()).build();
                            Protos.Parameter computeResourceParam = Protos.Parameter.newBuilder().setKey("env").setValue("COMPUTE_RESOURCE_INDEX=" + this.resourceIndex).build();
                            Protos.Parameter jobIdParam = Protos.Parameter.newBuilder().setKey("env").setValue("JOB_ID=" + this.jobID).build();
                            Protos.Parameter classNameParam = null;
                            Protos.Parameter downloadMethod = Protos.Parameter.newBuilder().setKey("env").setValue("DOWNLOAD_METHOD=" + SchedulerContext.downloadMethod((Config)this.config)).build();
                            if (taskId.getValue().equals("0")) {
                                taskBuilder.setName("Job Master");
                                classNameParam = Protos.Parameter.newBuilder().setKey("env").setValue("CLASS_NAME=edu.iu.dsc.tws.rsched.schedulers.mesos.master.MesosJobMasterStarter").build();
                            } else if (SchedulerContext.useOpenMPI((Config)this.config)) {
                                if (taskId.getValue().equals("1")) {
                                    taskBuilder.setName("MPI Master " + taskId);
                                    classNameParam = Protos.Parameter.newBuilder().setKey("env").setValue("CLASS_NAME=edu.iu.dsc.tws.rsched.schedulers.mesos.mpi.MesosMPIMasterStarter").build();
                                } else {
                                    taskBuilder.setName("task " + taskId);
                                    classNameParam = Protos.Parameter.newBuilder().setKey("env").setValue("CLASS_NAME=edu.iu.dsc.tws.rsched.schedulers.mesos.mpi.MesosMPISlaveStarter").build();
                                }
                            } else {
                                taskBuilder.setName("task " + taskId);
                                classNameParam = Protos.Parameter.newBuilder().setKey("env").setValue("CLASS_NAME=edu.iu.dsc.tws.rsched.schedulers.mesos.MesosDockerWorker").build();
                            }
                            Protos.ContainerInfo.DockerInfo.Builder dockerInfoBuilder = Protos.ContainerInfo.DockerInfo.newBuilder();
                            dockerInfoBuilder.setImage(MesosContext.getDockerImageName(this.config));
                            Protos.NetworkInfo netInfo = Protos.NetworkInfo.newBuilder().setName(MesosContext.getMesosOverlayNetworkName(this.config)).build();
                            dockerInfoBuilder.setNetwork(Protos.ContainerInfo.DockerInfo.Network.USER);
                            dockerInfoBuilder.addParameters(jobNameParam);
                            dockerInfoBuilder.addParameters(workerIdParam);
                            dockerInfoBuilder.addParameters(computeResourceParam);
                            dockerInfoBuilder.addParameters(jobIdParam);
                            dockerInfoBuilder.addParameters(classNameParam);
                            dockerInfoBuilder.addParameters(frameworkIdParam);
                            dockerInfoBuilder.addParameters(downloadMethod);
                            Protos.Volume volume = Protos.Volume.newBuilder().setContainerPath("/twister2/").setHostPath(".").setMode(Protos.Volume.Mode.RW).build();
                            Protos.Volume persistentVolume = Protos.Volume.newBuilder().setContainerPath("/persistent-volume/").setHostPath(persistentVolumeDir).setMode(Protos.Volume.Mode.RW).build();
                            Protos.Volume customJarsVolume = Protos.Volume.newBuilder().setContainerPath("/customJars/").setHostPath("/root/.twister2/repository/customJars").setMode(Protos.Volume.Mode.RW).build();
                            Protos.ContainerInfo.Builder containerInfoBuilder = Protos.ContainerInfo.newBuilder();
                            containerInfoBuilder.setType(Protos.ContainerInfo.Type.DOCKER);
                            containerInfoBuilder.addVolumes(volume);
                            containerInfoBuilder.addVolumes(persistentVolume);
                            containerInfoBuilder.addVolumes(customJarsVolume);
                            containerInfoBuilder.setDocker(dockerInfoBuilder.build());
                            containerInfoBuilder.addNetworkInfos(netInfo);
                            taskBuilder.setContainer(containerInfoBuilder);
                            taskBuilder.setCommand(Protos.CommandInfo.newBuilder().setShell(false));
                        } else {
                            Protos.ExecutorInfo executorInfo = this.controller.getExecutorInfo(this.jobID, "worker-" + this.workerCounter);
                            taskBuilder.setExecutor(Protos.ExecutorInfo.newBuilder((Protos.ExecutorInfo)executorInfo));
                        }
                        launch.addTaskInfos(taskBuilder.build());
                    }
                    ArrayList<Protos.OfferID> offerIds = new ArrayList<Protos.OfferID>();
                    offerIds.add(offer.getId());
                    ArrayList<Protos.Offer.Operation> operations = new ArrayList<Protos.Offer.Operation>();
                    Protos.Offer.Operation operation = Protos.Offer.Operation.newBuilder().setType(Protos.Offer.Operation.Type.LAUNCH).setLaunch(launch).build();
                    operations.add(operation);
                    Protos.Filters filters = Protos.Filters.newBuilder().setRefuseSeconds(1.0).build();
                    schedulerDriver.acceptOffers(offerIds, operations, filters);
                    int n = index;
                    this.offerControl[n] = this.offerControl[n] + 1;
                    LOG.info("Offer from host " + offer.getHostname() + "has been accepted.");
                }
                if (this.taskIdCounter < this.totalTaskCount) continue;
                LOG.info("taskIdCounter >= totalTaskCount");
                return;
            }
        }
    }

    private Protos.TaskID buildNewTaskID() {
        return Protos.TaskID.newBuilder().setValue(Integer.toString(this.taskIdCounter++)).build();
    }

    private Protos.Resource buildResource(String name, double value) {
        return Protos.Resource.newBuilder().setName(name).setType(Protos.Value.Type.SCALAR).setScalar(this.buildScalar(value)).build();
    }

    private Protos.Resource buildRangeResource(String name, int begin, int end) {
        Protos.Value.Range range = Protos.Value.Range.newBuilder().setBegin((long)begin).setEnd((long)end).build();
        Protos.Value.Ranges ranges = Protos.Value.Ranges.newBuilder().addRange(range).build();
        return Protos.Resource.newBuilder().setName(name).setType(Protos.Value.Type.RANGES).setRanges(ranges).build();
    }

    private Protos.Value.Scalar.Builder buildScalar(double value) {
        return Protos.Value.Scalar.newBuilder().setValue(value);
    }

    public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) {
        LOG.warning("This offer's been rescinded. Tough luck, cowboy.");
    }

    public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
        LOG.info("Status update: " + taskStatus.getState() + " from " + taskStatus.getTaskId().getValue());
        if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) {
            ++this.completedTaskCounter;
            LOG.info("Number of completed tasks: " + this.completedTaskCounter + "/" + this.totalTaskCount);
        } else if (taskStatus.getState() == Protos.TaskState.TASK_FAILED || taskStatus.getState() == Protos.TaskState.TASK_LOST || taskStatus.getState() == Protos.TaskState.TASK_KILLED) {
            LOG.severe("Aborting because task " + taskStatus.getTaskId().getValue() + " is in unexpected state " + taskStatus.getState().getValueDescriptor().getName() + " with reason '" + taskStatus.getReason().getValueDescriptor().getName() + "' from source '" + taskStatus.getSource().getValueDescriptor().getName() + "' with message '" + taskStatus.getMessage() + "'");
        }
        if (this.totalTaskCount == this.completedTaskCounter) {
            LOG.info("All tasks are finished. Stopping driver");
            schedulerDriver.stop();
        }
    }

    public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bytes) {
        LOG.info("Executor id:" + executorID.getValue() + " Time: " + Longs.fromByteArray((byte[])bytes));
    }

    public void disconnected(SchedulerDriver schedulerDriver) {
        LOG.info("We got disconnected ");
    }

    public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) {
        LOG.severe("Lost slave: " + slaveID);
    }

    public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, int i) {
        LOG.severe("Lost executor on slave " + slaveID);
    }

    public void error(SchedulerDriver schedulerDriver, String s) {
        LOG.severe("We've got errors : " + s);
    }
}

