/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.api.id.ActorId;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.config.StreamingMasterConfig;
import io.ray.streaming.runtime.context.ContextBackend;
import io.ray.streaming.runtime.context.ContextBackendFactory;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.generated.RemoteCall;
import io.ray.streaming.runtime.master.context.JobMasterRuntimeContext;
import io.ray.streaming.runtime.master.coordinator.CheckpointCoordinator;
import io.ray.streaming.runtime.master.coordinator.FailoverCoordinator;
import io.ray.streaming.runtime.master.coordinator.command.WorkerCommitReport;
import io.ray.streaming.runtime.master.coordinator.command.WorkerRollbackRequest;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import io.ray.streaming.runtime.master.graphmanager.GraphManagerImpl;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManagerImpl;
import io.ray.streaming.runtime.master.scheduler.JobSchedulerImpl;
import io.ray.streaming.runtime.util.CheckpointStateUtil;
import io.ray.streaming.runtime.util.ResourceUtil;
import io.ray.streaming.runtime.util.Serializer;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Master of a streaming job: builds and schedules the execution graph, persists its own
 * runtime context to a {@link ContextBackend}, and forwards worker commit reports and rollback
 * requests to the checkpoint and failover coordinators.
 *
 * <p>Life cycle as visible in this class:
 * <ol>
 *   <li>The constructor builds the configuration, context backend and a fresh runtime context.
 *       If the current actor was restarted (and Ray is not in single-process mode) it tries to
 *       restore the runtime context from the context backend and re-initializes itself.</li>
 *   <li>{@link #submitJob(ActorHandle, JobGraph)} creates the managers, builds the execution
 *       graph and schedules it.</li>
 *   <li>{@link #init(boolean)} starts the coordinators and saves the context.</li>
 * </ol>
 */
public class JobMaster {
    private static final Logger LOG = LoggerFactory.getLogger(JobMaster.class);

    /** Runtime state of the master; replaced wholesale when a checkpoint is restored. */
    private JobMasterRuntimeContext runtimeContext;
    private ResourceManager resourceManager;
    private JobSchedulerImpl scheduler;
    private GraphManager graphManager;
    private final StreamingMasterConfig conf;
    private final ContextBackend contextBackend;
    /** Handle passed to {@link #submitJob}; stays {@code null} until a job is submitted. */
    private ActorHandle<JobMaster> jobMasterActor;
    private CheckpointCoordinator checkpointCoordinator;
    private FailoverCoordinator failoverCoordinator;

    /**
     * Creates the job master from a flat configuration map.
     *
     * <p>When Ray reports that the current actor was restarted (and Ray is not running in
     * single-process mode), the previously saved runtime context is loaded from the context
     * backend.
     *
     * @param confMap job configuration used to build the {@link StreamingConfig}
     */
    public JobMaster(Map<String, String> confMap) {
        LOG.info("Creating job master with conf: {}.", confMap);
        StreamingConfig streamingConfig = new StreamingConfig(confMap);
        this.conf = streamingConfig.masterConfig;
        this.contextBackend = ContextBackendFactory.getContextBackend(this.conf);
        this.runtimeContext = new JobMasterRuntimeContext(streamingConfig);
        if (!Ray.getRuntimeContext().isSingleProcess() && Ray.getRuntimeContext().wasCurrentActorRestarted()) {
            this.loadMasterCheckpoint();
        }
        LOG.info("Finished creating job master.");
    }

    /**
     * Builds the context-backend key under which the master's runtime context is stored.
     *
     * @param conf master configuration
     * @return the configured job-master checkpoint prefix key followed by the job name
     */
    public static String getJobMasterRuntimeContextKey(StreamingMasterConfig conf) {
        return conf.checkpointConfig.jobMasterContextCpPrefixKey() + conf.commonConfig.jobName();
    }

    /**
     * Restores the runtime context from the context backend and re-initializes the master in
     * recovery mode. If nothing is stored, checkpoint id {@code 0} is added to the fresh
     * context's checkpoint ids and loading is skipped.
     */
    private void loadMasterCheckpoint() {
        LOG.info("Start to load JobMaster's checkpoint.");
        byte[] bytes = CheckpointStateUtil.get(this.contextBackend, JobMaster.getJobMasterRuntimeContextKey(this.getConf()));
        if (bytes == null) {
            LOG.warn("JobMaster got empty checkpoint from state backend. Skip loading checkpoint.");
            this.runtimeContext.checkpointIds.add(0L);
            return;
        }
        this.runtimeContext = (JobMasterRuntimeContext) Serializer.decode(bytes);
        LOG.info("JobMaster recover runtime context[{}] from state backend.", this.runtimeContext);
        this.init(true);
    }

    /**
     * Starts the checkpoint and failover coordinators and persists the runtime context.
     *
     * <p>On the recovery path this method is reached from the constructor, before
     * {@link #submitJob} could have created the managers, so they are created here from the
     * current (restored) runtime context when absent.
     *
     * @param isRecover {@code true} when called while recovering from a saved context
     * @return {@code false} if the runtime context holds no execution graph, {@code true}
     *     once initialization finished
     * @throws IllegalArgumentException if the graph manager yields no execution graph
     */
    public Boolean init(boolean isRecover) {
        LOG.info("Initializing job master, isRecover={}.", isRecover);
        if (this.runtimeContext.getExecutionGraph() == null) {
            LOG.error("Init job master failed. Job graphs is null.");
            return false;
        }
        // Without this, recovery dereferenced a null graphManager. The managers are built the
        // same way submitJob builds them.
        // NOTE(review): assumes GraphManagerImpl serves the graph held by the runtime context
        // and that constructing ResourceManagerImpl on recovery is acceptable - confirm.
        if (this.graphManager == null) {
            this.graphManager = new GraphManagerImpl(this.runtimeContext);
        }
        if (this.resourceManager == null) {
            this.resourceManager = new ResourceManagerImpl(this.runtimeContext);
        }
        ExecutionGraph executionGraph = this.graphManager.getExecutionGraph();
        Preconditions.checkArgument(executionGraph != null, "no execution graph");
        this.checkpointCoordinator = new CheckpointCoordinator(this);
        this.checkpointCoordinator.start();
        this.failoverCoordinator = new FailoverCoordinator(this, isRecover);
        this.failoverCoordinator.start();
        this.saveContext();
        LOG.info("Finished initializing job master.");
        return true;
    }

    /**
     * Builds the execution graph for {@code jobGraph}, stores both graphs in the runtime
     * context and schedules the job.
     *
     * @param jobMasterActor actor handle of this job master, kept for {@link #getJobMasterActor()}
     * @param jobGraph logical plan of the job
     * @return {@code true} if scheduling completed without an exception, {@code false} otherwise
     */
    public boolean submitJob(ActorHandle<JobMaster> jobMasterActor, JobGraph jobGraph) {
        LOG.info("Begin submitting job using logical plan: {}.", jobGraph);
        this.jobMasterActor = jobMasterActor;
        this.graphManager = new GraphManagerImpl(this.runtimeContext);
        this.resourceManager = new ResourceManagerImpl(this.runtimeContext);
        ExecutionGraph executionGraph = this.graphManager.buildExecutionGraph(jobGraph);
        this.runtimeContext.setJobGraph(jobGraph);
        this.runtimeContext.setExecutionGraph(executionGraph);
        try {
            this.scheduler = new JobSchedulerImpl(this);
            this.scheduler.scheduleJob(this.graphManager.getExecutionGraph());
        } catch (Exception e) {
            // First argument fills the placeholder; the trailing throwable makes the logger
            // emit the stack trace, so no separate printStackTrace() is needed.
            LOG.error("Failed to submit job {}.", e, e);
            return false;
        }
        return true;
    }

    /**
     * Serializes the runtime context and stores it in the context backend under
     * {@link #getJobMasterRuntimeContextKey(StreamingMasterConfig)}. Does nothing when the
     * runtime context or the configuration is {@code null}.
     */
    public synchronized void saveContext() {
        if (this.runtimeContext == null || this.getConf() == null) {
            return;
        }
        LOG.debug("Save JobMaster context.");
        byte[] contextBytes = Serializer.encode(this.runtimeContext);
        CheckpointStateUtil.put(this.contextBackend, JobMaster.getJobMasterRuntimeContextKey(this.getConf()), contextBytes);
    }

    /**
     * Handles a worker's checkpoint-commit report and passes it to the checkpoint coordinator.
     *
     * @param reportBytes serialized {@code RemoteCall.BaseWorkerCmd} whose detail is a
     *     {@code RemoteCall.WorkerCommitReport}
     * @return serialized {@code RemoteCall.BoolResult}; {@code false} when the message could
     *     not be parsed, otherwise the coordinator's answer
     */
    public byte[] reportJobWorkerCommit(byte[] reportBytes) {
        boolean ret = false;
        try {
            RemoteCall.BaseWorkerCmd reportPb = RemoteCall.BaseWorkerCmd.parseFrom(reportBytes);
            ActorId actorId = ActorId.fromBytes(reportPb.getActorId().toByteArray());
            long remoteCallCost = System.currentTimeMillis() - reportPb.getTimestamp();
            LOG.info("Vertex {}, request job worker commit cost {}ms, actorId={}.",
                    this.getExecutionVertex(actorId), remoteCallCost, actorId);
            RemoteCall.WorkerCommitReport commit = reportPb.getDetail().unpack(RemoteCall.WorkerCommitReport.class);
            WorkerCommitReport report = new WorkerCommitReport(actorId, commit.getCommitCheckpointId());
            ret = this.checkpointCoordinator.reportJobWorkerCommit(report);
        } catch (InvalidProtocolBufferException e) {
            LOG.error("Parse job worker commit has exception.", e);
        }
        return RemoteCall.BoolResult.newBuilder().setBoolRes(ret).build().toByteArray();
    }

    /**
     * Handles a worker's rollback request and passes it to the failover coordinator.
     *
     * <p>Requests from actors that are not part of the execution graph are rejected. For known
     * actors the reported worker pid is recorded on the execution vertex, and the hostname of
     * the vertex's container (empty string if the container is not registered) is attached to
     * the request.
     *
     * @param requestBytes serialized {@code RemoteCall.BaseWorkerCmd} whose detail value is a
     *     {@code RemoteCall.WorkerRollbackRequest}
     * @return serialized {@code RemoteCall.BoolResult}; {@code false} for unknown actors or
     *     when any error occurred, otherwise the coordinator's answer
     */
    public byte[] requestJobWorkerRollback(byte[] requestBytes) {
        boolean ret = false;
        try {
            RemoteCall.BaseWorkerCmd requestPb = RemoteCall.BaseWorkerCmd.parseFrom(requestBytes);
            ActorId actorId = ActorId.fromBytes(requestPb.getActorId().toByteArray());
            long remoteCallCost = System.currentTimeMillis() - requestPb.getTimestamp();
            ExecutionGraph executionGraph = this.graphManager.getExecutionGraph();
            Optional<BaseActorHandle> rayActor = executionGraph.getActorById(actorId);
            if (!rayActor.isPresent()) {
                LOG.warn("Skip this invalid rollback, actor id {} is not found.", actorId);
                return RemoteCall.BoolResult.newBuilder().setBoolRes(false).build().toByteArray();
            }
            ExecutionVertex exeVertex = this.getExecutionVertex(actorId);
            LOG.info("Vertex {}, request job worker rollback cost {}ms, actorId={}.",
                    exeVertex, remoteCallCost, actorId);
            RemoteCall.WorkerRollbackRequest rollbackPb = RemoteCall.WorkerRollbackRequest.parseFrom(requestPb.getDetail().getValue());
            exeVertex.setPid(rollbackPb.getWorkerPid());
            Optional<Container> container = ResourceUtil.getContainerById(this.resourceManager.getRegisteredContainers(), exeVertex.getContainerId());
            String hostname = container.map(Container::getHostname).orElse("");
            WorkerRollbackRequest request = new WorkerRollbackRequest(actorId, rollbackPb.getExceptionMsg(), hostname, exeVertex.getPid());
            ret = this.failoverCoordinator.requestJobWorkerRollback(request);
            LOG.info("Vertex {} request rollback, exception msg : {}.", exeVertex, rollbackPb.getExceptionMsg());
        } catch (Throwable e) {
            // Deliberately broad: any failure is logged and answered with a false result.
            LOG.error("Parse job worker rollback has exception.", e);
        }
        return RemoteCall.BoolResult.newBuilder().setBoolRes(ret).build().toByteArray();
    }

    /** Looks up the execution vertex bound to the given actor id in the current execution graph. */
    private ExecutionVertex getExecutionVertex(ActorId id) {
        return this.graphManager.getExecutionGraph().getExecutionVertexByActorId(id);
    }

    /** @return the actor handle given to {@link #submitJob}, or {@code null} if none was submitted */
    public ActorHandle<JobMaster> getJobMasterActor() {
        return this.jobMasterActor;
    }

    /** @return the current runtime context (the restored one after a successful recovery) */
    public JobMasterRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    /** @return the resource manager, or {@code null} before {@link #submitJob} or {@link #init} created it */
    public ResourceManager getResourceManager() {
        return this.resourceManager;
    }

    /** @return the graph manager, or {@code null} before {@link #submitJob} or {@link #init} created it */
    public GraphManager getGraphManager() {
        return this.graphManager;
    }

    /** @return the master configuration taken from the {@link StreamingConfig} built at construction */
    public StreamingMasterConfig getConf() {
        return this.conf;
    }
}

