/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.ray.api.ActorHandle;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import io.ray.streaming.runtime.master.resourcemanager.ViewBuilder;
import io.ray.streaming.runtime.master.scheduler.JobScheduler;
import io.ray.streaming.runtime.master.scheduler.controller.WorkerLifecycleController;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JobScheduler} implementation that drives an {@link ExecutionGraph} through resource
 * assignment, worker creation, worker initiation and worker start-up.
 *
 * <p>Worker lifecycle operations are delegated to a {@link WorkerLifecycleController}; the
 * graph manager, resource manager and job configuration are obtained from the owning
 * {@link JobMaster}.
 */
public class JobSchedulerImpl implements JobScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);

    private final JobMaster jobMaster;
    private final ResourceManager resourceManager;
    private final GraphManager graphManager;
    private final WorkerLifecycleController workerLifecycleController;
    private final StreamingConfig jobConfig;

    /**
     * Creates a scheduler bound to the given job master.
     *
     * @param jobMaster the master supplying the graph manager, the resource manager and the
     *     runtime configuration
     */
    public JobSchedulerImpl(JobMaster jobMaster) {
        this.jobMaster = jobMaster;
        this.graphManager = jobMaster.getGraphManager();
        this.resourceManager = jobMaster.getResourceManager();
        this.workerLifecycleController = new WorkerLifecycleController();
        this.jobConfig = jobMaster.getRuntimeContext().getConf();
        LOG.info("Scheduler initiated.");
    }

    /**
     * Schedules the job described by the execution graph: assigns resources and creates
     * workers, generates the actor mappings of the graph, then initiates and starts the workers.
     *
     * @param executionGraph the graph to schedule
     * @return {@code true} once every scheduling step has completed
     * @throws IllegalStateException if creating, initiating or starting the workers fails
     */
    @Override
    public boolean scheduleJob(ExecutionGraph executionGraph) {
        LOG.info("Begin scheduling. Job: {}.", executionGraph.getJobName());
        prepareResourceAndCreateWorker(executionGraph);
        // The mappings are generated only after the workers have been created.
        executionGraph.generateActorMappings();
        initAndStart(executionGraph);
        return true;
    }

    /**
     * Assigns the registered containers to the execution graph and creates the workers.
     *
     * @param executionGraph the graph whose vertices need resources and workers
     * @throws IllegalStateException if worker creation reports a failure
     */
    protected void prepareResourceAndCreateWorker(ExecutionGraph executionGraph) {
        ImmutableList<Container> containers = resourceManager.getRegisteredContainers();
        resourceManager.assignResource(containers, executionGraph);
        LOG.info("Allocating map is: {}.", ViewBuilder.buildResourceAssignmentView(containers));
        // A failed creation must stop scheduling instead of being silently ignored.
        Preconditions.checkState(createWorkers(executionGraph), "Failed to create workers.");
    }

    /**
     * Builds the worker contexts, initiates the workers, initiates the master and finally starts
     * the workers from the last checkpoint id held by the master's runtime context.
     *
     * @param executionGraph the graph whose workers are initiated and started
     * @throws IllegalStateException if initiating or starting the workers fails
     */
    private void initAndStart(ExecutionGraph executionGraph) {
        Map<ExecutionVertex, JobWorkerContext> vertexToContextMap = buildWorkersContext(executionGraph);
        Preconditions.checkState(initWorkers(vertexToContextMap), "Failed to initiate workers.");
        initMaster();
        long checkpointId = jobMaster.getRuntimeContext().lastCheckpointId;
        // startWorkers reports failure through its return value, so it has to be checked here.
        Preconditions.checkState(startWorkers(executionGraph, checkpointId), "Failed to start workers.");
    }

    /**
     * Creates workers for all added execution vertices of the graph and logs the elapsed time.
     *
     * @param executionGraph the graph providing the added execution vertices
     * @return {@code true} if the lifecycle controller reports success, {@code false} otherwise
     */
    public boolean createWorkers(ExecutionGraph executionGraph) {
        LOG.info("Begin creating workers.");
        long startTs = System.currentTimeMillis();
        boolean created =
            workerLifecycleController.createWorkers(executionGraph.getAllAddedExecutionVertices());
        long costMs = System.currentTimeMillis() - startTs;
        if (created) {
            LOG.info("Finished creating workers. Cost {} ms.", costMs);
        } else {
            LOG.error("Failed to create workers. Cost {} ms.", costMs);
        }
        return created;
    }

    /**
     * Initiates the workers with their contexts, using the configured initiation timeout.
     *
     * @param vertexToContextMap the context to hand to the worker of each execution vertex
     * @return {@code true} if the lifecycle controller reports success, {@code false} otherwise
     */
    protected boolean initWorkers(Map<ExecutionVertex, JobWorkerContext> vertexToContextMap) {
        int timeoutMs = jobConfig.masterConfig.schedulerConfig.workerInitiationWaitTimeoutMs();
        boolean succeed = workerLifecycleController.initWorkers(vertexToContextMap, timeoutMs);
        if (!succeed) {
            LOG.error("Failed to initiate workers in {} milliseconds", timeoutMs);
        }
        return succeed;
    }

    /**
     * Starts the workers of the graph, using the configured starting timeout.
     *
     * @param executionGraph the graph whose workers are started
     * @param checkpointId the checkpoint id passed to the lifecycle controller
     * @return the result reported by the lifecycle controller, or {@code false} if it threw
     */
    public boolean startWorkers(ExecutionGraph executionGraph, long checkpointId) {
        try {
            return workerLifecycleController.startWorkers(
                executionGraph,
                checkpointId,
                jobConfig.masterConfig.schedulerConfig.workerStartingWaitTimeoutMs());
        } catch (Exception e) {
            LOG.error("Failed to start workers.", e);
            return false;
        }
    }

    /**
     * Builds one {@link JobWorkerContext} per execution vertex of the graph.
     *
     * @param executionGraph the graph providing the execution vertices
     * @return a new map from each execution vertex to its worker context
     */
    protected Map<ExecutionVertex, JobWorkerContext> buildWorkersContext(ExecutionGraph executionGraph) {
        ActorHandle<JobMaster> masterActor = jobMaster.getJobMasterActor();
        Map<ExecutionVertex, JobWorkerContext> vertexToContextMap = new HashMap<>();
        executionGraph.getAllExecutionVertices().forEach(vertex ->
            vertexToContextMap.put(vertex, buildJobWorkerContext(vertex, masterActor)));
        return vertexToContextMap;
    }

    /**
     * Creates the worker context for a single execution vertex.
     *
     * @param executionVertex the vertex the context belongs to
     * @param masterActor the actor handle of the job master
     * @return the newly created context
     */
    private JobWorkerContext buildJobWorkerContext(ExecutionVertex executionVertex, ActorHandle<JobMaster> masterActor) {
        return new JobWorkerContext(masterActor, executionVertex);
    }

    /**
     * Destroys the workers of the given execution vertices.
     *
     * @param executionVertices the vertices whose workers are destroyed
     * @return the result reported by the lifecycle controller, or {@code false} if it threw
     */
    public boolean destroyWorkers(List<ExecutionVertex> executionVertices) {
        try {
            return workerLifecycleController.destroyWorkers(executionVertices);
        } catch (Exception e) {
            LOG.error("Failed to destroy workers.", e);
            return false;
        }
    }

    /** Initiates the job master by calling {@code init(false)} on it. */
    private void initMaster() {
        jobMaster.init(false);
    }
}

