/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.scheduler.controller;

import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.api.WaitResult;
import io.ray.api.call.ActorCreator;
import io.ray.api.call.PyActorCreator;
import io.ray.api.function.PyActorClass;
import io.ray.streaming.api.Language;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.generated.RemoteCall;
import io.ray.streaming.runtime.python.GraphPbBuilder;
import io.ray.streaming.runtime.rpc.RemoteCallWorker;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerLifecycleController {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerLifecycleController.class);

    public boolean createWorkers(List<ExecutionVertex> executionVertices) {
        return this.asyncBatchExecute(this::createWorker, executionVertices);
    }

    private boolean createWorker(ExecutionVertex executionVertex) {
        ActorHandle actor;
        LOG.info("Start to create worker actor for vertex: {} with resource: {}, workeConfig: {}.", new Object[]{executionVertex.getExecutionVertexName(), executionVertex.getResource(), executionVertex.getWorkerConfig()});
        Language language = executionVertex.getLanguage();
        if (Language.JAVA == language) {
            actor = ((ActorCreator)((ActorCreator)Ray.actor(JobWorker::new, (Object)executionVertex).setResources(executionVertex.getResource())).setMaxRestarts(-1)).remote();
        } else {
            RemoteCall.ExecutionVertexContext.ExecutionVertex vertexPb = new GraphPbBuilder().buildVertex(executionVertex);
            actor = ((PyActorCreator)((PyActorCreator)Ray.actor((PyActorClass)PyActorClass.of((String)"ray.streaming.runtime.worker", (String)"JobWorker"), (Object)vertexPb.toByteArray()).setResources(executionVertex.getResource())).setMaxRestarts(-1)).remote();
        }
        if (null == actor) {
            LOG.error("Create worker actor failed.");
            return false;
        }
        executionVertex.setWorkerActor((BaseActorHandle)actor);
        LOG.info("Worker actor created, actor: {}, vertex: {}.", (Object)executionVertex.getWorkerActorId(), (Object)executionVertex.getExecutionVertexName());
        return true;
    }

    public boolean initWorkers(Map<ExecutionVertex, JobWorkerContext> vertexToContextMap, int timeout) {
        LOG.info("Begin initiating workers: {}.", vertexToContextMap);
        long startTime = System.currentTimeMillis();
        HashMap rayObjects = new HashMap();
        vertexToContextMap.entrySet().forEach(entry -> {
            ExecutionVertex vertex = (ExecutionVertex)entry.getKey();
            rayObjects.put(RemoteCallWorker.initWorker(vertex.getWorkerActor(), (JobWorkerContext)entry.getValue()), vertex.getWorkerActorId());
        });
        ArrayList objectRefList = new ArrayList(rayObjects.keySet());
        LOG.info("Waiting for workers' initialization.");
        WaitResult result = Ray.wait(objectRefList, (int)objectRefList.size(), (int)timeout);
        if (result.getReady().size() != objectRefList.size()) {
            LOG.error("Initializing workers timeout[{} ms].", (Object)timeout);
            return false;
        }
        LOG.info("Finished waiting workers' initialization.");
        LOG.info("Workers initialized. Cost {} ms.", (Object)(System.currentTimeMillis() - startTime));
        return true;
    }

    public boolean startWorkers(ExecutionGraph executionGraph, long lastCheckpointId, int timeout) {
        LOG.info("Begin starting workers.");
        long startTime = System.currentTimeMillis();
        ArrayList objectRefs = new ArrayList();
        executionGraph.getSourceActors().forEach(actor -> objectRefs.add(RemoteCallWorker.rollback(actor, lastCheckpointId)));
        executionGraph.getNonSourceActors().forEach(actor -> objectRefs.add(RemoteCallWorker.rollback(actor, lastCheckpointId)));
        WaitResult result = Ray.wait(objectRefs, (int)objectRefs.size(), (int)timeout);
        if (result.getReady().size() != objectRefs.size()) {
            LOG.error("Starting workers timeout[{} ms].", (Object)timeout);
            return false;
        }
        LOG.info("Workers started. Cost {} ms.", (Object)(System.currentTimeMillis() - startTime));
        return true;
    }

    public boolean destroyWorkers(List<ExecutionVertex> executionVertices) {
        return this.asyncBatchExecute(this::destroyWorker, executionVertices);
    }

    private boolean destroyWorker(ExecutionVertex executionVertex) {
        BaseActorHandle rayActor = executionVertex.getWorkerActor();
        LOG.info("Begin destroying worker[vertex={}, actor={}].", (Object)executionVertex.getExecutionVertexName(), (Object)rayActor.getId());
        boolean destroyResult = RemoteCallWorker.shutdownWithoutReconstruction(rayActor);
        if (!destroyResult) {
            LOG.error("Failed to destroy JobWorker[{}]'s actor: {}.", (Object)executionVertex.getExecutionVertexName(), (Object)rayActor);
            return false;
        }
        LOG.info("Worker destroyed, actor: {}.", (Object)rayActor);
        return true;
    }

    private boolean asyncBatchExecute(Function<ExecutionVertex, Boolean> operation, List<ExecutionVertex> executionVertices) {
        Object asyncContext = Ray.getAsyncContext();
        List futureResults = executionVertices.stream().map(vertex -> CompletableFuture.supplyAsync(() -> {
            Ray.setAsyncContext((Object)asyncContext);
            return (Boolean)operation.apply((ExecutionVertex)vertex);
        })).collect(Collectors.toList());
        List succeeded = futureResults.stream().map(CompletableFuture::join).collect(Collectors.toList());
        if (succeeded.stream().anyMatch(x -> x == false)) {
            LOG.error("Not all futures return true, check ResourceManager'log the detail.");
            return false;
        }
        return true;
    }
}

