/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.graphmanager;

import io.ray.api.BaseActorHandle;
import io.ray.streaming.jobgraph.JobEdge;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.jobgraph.JobVertex;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.master.context.JobMasterRuntimeContext;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GraphManagerImpl
implements GraphManager {
    private static final Logger LOG = LoggerFactory.getLogger(GraphManagerImpl.class);
    protected final JobMasterRuntimeContext runtimeContext;

    public GraphManagerImpl(JobMasterRuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    @Override
    public ExecutionGraph buildExecutionGraph(JobGraph jobGraph) {
        LOG.info("Begin build execution graph with job graph {}.", (Object)jobGraph);
        ExecutionGraph executionGraph = this.setupStructure(jobGraph);
        int maxParallelism = jobGraph.getJobVertices().stream().map(JobVertex::getParallelism).max(Integer::compareTo).get();
        executionGraph.setMaxParallelism(maxParallelism);
        executionGraph.setJobConfig(jobGraph.getJobConfig());
        LOG.info("Build execution graph success.");
        return executionGraph;
    }

    private ExecutionGraph setupStructure(JobGraph jobGraph) {
        ExecutionGraph executionGraph = new ExecutionGraph(jobGraph.getJobName());
        Map jobConfig = jobGraph.getJobConfig();
        LinkedHashMap<Integer, ExecutionJobVertex> exeJobVertexMap = new LinkedHashMap<Integer, ExecutionJobVertex>();
        HashMap<Integer, ExecutionVertex> executionVertexMap = new HashMap<Integer, ExecutionVertex>();
        long buildTime = executionGraph.getBuildTime();
        for (JobVertex jobVertex : jobGraph.getJobVertices()) {
            int jobVertexId = jobVertex.getVertexId();
            exeJobVertexMap.put(jobVertexId, new ExecutionJobVertex(jobVertex, jobConfig, executionGraph.getExecutionVertexIdGenerator(), buildTime));
        }
        jobGraph.getJobEdges().forEach(jobEdge -> {
            ExecutionJobVertex source = (ExecutionJobVertex)exeJobVertexMap.get(jobEdge.getSrcVertexId());
            ExecutionJobVertex target = (ExecutionJobVertex)exeJobVertexMap.get(jobEdge.getTargetVertexId());
            ExecutionJobEdge executionJobEdge = new ExecutionJobEdge(source, target, (JobEdge)jobEdge);
            source.getOutputEdges().add(executionJobEdge);
            target.getInputEdges().add(executionJobEdge);
            source.getExecutionVertices().forEach(sourceExeVertex -> target.getExecutionVertices().forEach(targetExeVertex -> {
                executionVertexMap.put(targetExeVertex.getExecutionVertexId(), (ExecutionVertex)targetExeVertex);
                executionVertexMap.put(sourceExeVertex.getExecutionVertexId(), (ExecutionVertex)sourceExeVertex);
                ExecutionEdge executionEdge = new ExecutionEdge((ExecutionVertex)sourceExeVertex, (ExecutionVertex)targetExeVertex, executionJobEdge);
                sourceExeVertex.getOutputEdges().add(executionEdge);
                targetExeVertex.getInputEdges().add(executionEdge);
            }));
        });
        executionGraph.setExecutionJobVertexMap(exeJobVertexMap);
        executionGraph.setExecutionVertexMap(executionVertexMap);
        return executionGraph;
    }

    private void addActorToChannelGroupedActors(Map<String, Set<BaseActorHandle>> channelGroupedActors, String channelId, BaseActorHandle actor) {
        Set actorSet = channelGroupedActors.computeIfAbsent(channelId, k -> new HashSet());
        actorSet.add(actor);
    }

    @Override
    public JobGraph getJobGraph() {
        return this.runtimeContext.getJobGraph();
    }

    @Override
    public ExecutionGraph getExecutionGraph() {
        return this.runtimeContext.getExecutionGraph();
    }
}

