/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.core.graph.executiongraph;

import com.google.common.collect.Sets;
import io.ray.api.BaseActorHandle;
import io.ray.api.id.ActorId;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the {@link ExecutionJobVertex} instances of a job together with lookup tables derived
 * from them: execution vertex id to {@link ExecutionVertex}, actor id to {@link ExecutionVertex},
 * and channel id to the set of actors attached to that channel.
 *
 * <p>The actor-related lookup tables are only populated by {@link #generateActorMappings()};
 * the vertex maps are only populated through their setters. Methods that read a table which has
 * not been populated yet will fail with a {@link NullPointerException}.
 *
 * <p>NOTE(review): the class is {@link Serializable} and declares no explicit
 * {@code serialVersionUID}; field declarations and non-private method signatures are therefore
 * left untouched here so the default serialized form is not affected — confirm before changing
 * them.
 */
public class ExecutionGraph
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);

    /** Name of the job, fixed at construction time. */
    private final String jobName;

    /** Job configuration as supplied through {@link #setJobConfig(Map)}. */
    private Map<String, String> jobConfig;

    /** Execution job vertices keyed by an integer id; set via the setter. */
    private Map<Integer, ExecutionJobVertex> executionJobVertexMap;

    /** Execution vertices keyed by execution vertex id; set via the setter. */
    private Map<Integer, ExecutionVertex> executionVertexMap;

    /** Actor id to execution vertex; rebuilt by {@link #generateActorMappings()}. */
    private Map<ActorId, ExecutionVertex> actorIdExecutionVertexMap;

    /** Channel id to the actors registered for it; rebuilt by {@link #generateActorMappings()}. */
    private Map<String, Set<BaseActorHandle>> channelGroupedActors;

    private int maxParallelism;

    /** Wall-clock time (milliseconds since the epoch) at which this object was constructed. */
    private long buildTime;

    /** Source of execution vertex ids; starts at 0. */
    private AtomicInteger executionVertexIdGenerator = new AtomicInteger(0);

    /**
     * Creates an empty graph and records the current time as its build time.
     *
     * @param jobName name of the job this graph belongs to
     */
    public ExecutionGraph(String jobName) {
        this.jobName = jobName;
        this.buildTime = System.currentTimeMillis();
    }

    /** @return the job name given at construction time */
    public String getJobName() {
        return this.jobName;
    }

    /** @return a new list holding the current values of the execution job vertex map */
    public List<ExecutionJobVertex> getExecutionJobVertexList() {
        return new ArrayList<>(this.executionJobVertexMap.values());
    }

    /** @return the execution job vertex map itself (not a copy) */
    public Map<Integer, ExecutionJobVertex> getExecutionJobVertexMap() {
        return this.executionJobVertexMap;
    }

    /** @param executionJobVertexMap map to use as the execution job vertex map (stored as-is) */
    public void setExecutionJobVertexMap(Map<Integer, ExecutionJobVertex> executionJobVertexMap) {
        this.executionJobVertexMap = executionJobVertexMap;
    }

    /**
     * Rebuilds the actor lookup tables from the current execution vertices.
     *
     * <p>Both tables are replaced by fresh maps. Every execution vertex is registered under its
     * actor id, and for every input and output edge of a vertex the worker actor of the vertex on
     * the other end of the edge is added to the actor set of the channel connecting the two.
     */
    public void generateActorMappings() {
        LOG.info("Setup queue actors relation.");
        this.channelGroupedActors = new HashMap<>();
        this.actorIdExecutionVertexMap = new HashMap<>();

        for (ExecutionVertex curVertex : this.getAllExecutionVertices()) {
            this.actorIdExecutionVertexMap.put(curVertex.getActorId(), curVertex);

            // Upstream side: register the actor of each vertex feeding into curVertex.
            List<ExecutionEdge> inputEdges = curVertex.getInputEdges();
            for (ExecutionEdge inputEdge : inputEdges) {
                ExecutionVertex inputVertex = inputEdge.getSourceExecutionVertex();
                String channelId = curVertex.getChannelIdByPeerVertex(inputVertex);
                this.addActorToChannelGroupedActors(
                    this.channelGroupedActors, channelId, inputVertex.getWorkerActor());
            }

            // Downstream side: register the actor of each vertex curVertex feeds into.
            List<ExecutionEdge> outputEdges = curVertex.getOutputEdges();
            for (ExecutionEdge outputEdge : outputEdges) {
                ExecutionVertex outputVertex = outputEdge.getTargetExecutionVertex();
                String channelId = curVertex.getChannelIdByPeerVertex(outputVertex);
                this.addActorToChannelGroupedActors(
                    this.channelGroupedActors, channelId, outputVertex.getWorkerActor());
            }
        }
        LOG.debug("Channel grouped actors is: {}.", this.channelGroupedActors);
    }

    /**
     * Adds {@code actor} to the set stored under {@code queueName}, creating the set on first use.
     *
     * @param channelGroupedActors map to update
     * @param queueName key (channel id) under which the actor is grouped
     * @param actor actor handle to add
     */
    private void addActorToChannelGroupedActors(Map<String, Set<BaseActorHandle>> channelGroupedActors, String queueName, BaseActorHandle actor) {
        channelGroupedActors.computeIfAbsent(queueName, k -> new HashSet<>()).add(actor);
    }

    /** @param executionVertexMap map to use as the execution vertex map (stored as-is) */
    public void setExecutionVertexMap(Map<Integer, ExecutionVertex> executionVertexMap) {
        this.executionVertexMap = executionVertexMap;
    }

    /** @return the job configuration map itself (not a copy); may be {@code null} if never set */
    public Map<String, String> getJobConfig() {
        return this.jobConfig;
    }

    /** @param jobConfig job configuration to store (stored as-is) */
    public void setJobConfig(Map<String, String> jobConfig) {
        this.jobConfig = jobConfig;
    }

    /** @return the stored max parallelism; {@code 0} if never set */
    public int getMaxParallelism() {
        return this.maxParallelism;
    }

    /** @param maxParallelism value to store as max parallelism */
    public void setMaxParallelism(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    /** @return the {@link System#currentTimeMillis()} value captured in the constructor */
    public long getBuildTime() {
        return this.buildTime;
    }

    /** @return the next execution vertex id; successive calls yield 0, 1, 2, ... */
    public int generateExecutionVertexId() {
        return this.executionVertexIdGenerator.getAndIncrement();
    }

    /** @return the id generator itself (not a copy) */
    public AtomicInteger getExecutionVertexIdGenerator() {
        return this.executionVertexIdGenerator;
    }

    /** @return a new list with the execution vertices of every execution job vertex */
    public List<ExecutionVertex> getAllExecutionVertices() {
        return this.executionJobVertexMap.values().stream()
            .map(ExecutionJobVertex::getExecutionVertices)
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
    }

    /**
     * @return a new list with every execution vertex for which {@code is2Add()} is true
     *     (presumably vertices pending addition — confirm against {@link ExecutionVertex})
     */
    public List<ExecutionVertex> getAllAddedExecutionVertices() {
        return this.executionJobVertexMap.values().stream()
            .map(ExecutionJobVertex::getExecutionVertices)
            .flatMap(Collection::stream)
            .filter(ExecutionVertex::is2Add)
            .collect(Collectors.toList());
    }

    /**
     * Looks up an execution vertex by id in the execution vertex map.
     *
     * @param executionVertexId id to look up
     * @return the value mapped to {@code executionVertexId}
     * @throws RuntimeException if the map has no entry for {@code executionVertexId}
     */
    public ExecutionVertex getExecutionVertexByExecutionVertexId(int executionVertexId) {
        if (this.executionVertexMap.containsKey(executionVertexId)) {
            return this.executionVertexMap.get(executionVertexId);
        }
        throw new RuntimeException("Vertex " + executionVertexId + " does not exist!");
    }

    /**
     * @param actorId actor id to look up
     * @return the execution vertex registered for {@code actorId} by
     *     {@link #generateActorMappings()}, or {@code null} if there is none
     */
    public ExecutionVertex getExecutionVertexByActorId(ActorId actorId) {
        return this.actorIdExecutionVertexMap.get(actorId);
    }

    /**
     * @param actorId actor id to search for
     * @return the first actor from {@link #getAllActors()} whose id equals {@code actorId},
     *     or an empty {@link Optional} if none matches
     */
    public Optional<BaseActorHandle> getActorById(ActorId actorId) {
        return this.getAllActors().stream()
            .filter(actor -> actor.getId().equals(actorId))
            .findFirst();
    }

    /**
     * Finds an actor on the given channel other than {@code actor}.
     *
     * @param actor actor to exclude
     * @param channelName channel id whose actor set is searched
     * @return an actor registered on the channel that is not equal to {@code actor}; if several
     *     qualify, the last one in the set's iteration order; {@code null} if none qualifies
     */
    public BaseActorHandle getPeerActor(BaseActorHandle actor, String channelName) {
        BaseActorHandle peer = null;
        for (BaseActorHandle candidate : this.getActorsByChannelId(channelName)) {
            // No early exit: the last non-matching candidate wins.
            if (!candidate.equals(actor)) {
                peer = candidate;
            }
        }
        return peer;
    }

    /**
     * @param channelId channel id to look up
     * @return the actor set stored for {@code channelId} (the internal set, not a copy), or a
     *     new empty set if the channel is unknown
     */
    public Set<BaseActorHandle> getActorsByChannelId(String channelId) {
        return this.channelGroupedActors.getOrDefault(channelId, Sets.newHashSet());
    }

    /** @return the worker actors of all execution vertices of all execution job vertices */
    public List<BaseActorHandle> getAllActors() {
        return this.getActorsFromJobVertices(this.getExecutionJobVertexList());
    }

    /** @return the worker actors of the job vertices for which {@code isSourceVertex()} is true */
    public List<BaseActorHandle> getSourceActors() {
        List<ExecutionJobVertex> sourceJobVertices = this.getExecutionJobVertexList().stream()
            .filter(ExecutionJobVertex::isSourceVertex)
            .collect(Collectors.toList());
        return this.getActorsFromJobVertices(sourceJobVertices);
    }

    /**
     * @return the worker actors of the job vertices for which {@code isTransformationVertex()}
     *     or {@code isSinkVertex()} is true
     */
    public List<BaseActorHandle> getNonSourceActors() {
        List<ExecutionJobVertex> nonSourceJobVertices = this.getExecutionJobVertexList().stream()
            .filter(jobVertex -> jobVertex.isTransformationVertex() || jobVertex.isSinkVertex())
            .collect(Collectors.toList());
        return this.getActorsFromJobVertices(nonSourceJobVertices);
    }

    /** @return the worker actors of the job vertices for which {@code isSinkVertex()} is true */
    public List<BaseActorHandle> getSinkActors() {
        List<ExecutionJobVertex> sinkJobVertices = this.getExecutionJobVertexList().stream()
            .filter(ExecutionJobVertex::isSinkVertex)
            .collect(Collectors.toList());
        return this.getActorsFromJobVertices(sinkJobVertices);
    }

    /**
     * @param executionJobVertices job vertices whose actors are wanted
     * @return a new list with the worker actor of every execution vertex of the given job
     *     vertices, in encounter order
     */
    public List<BaseActorHandle> getActorsFromJobVertices(List<ExecutionJobVertex> executionJobVertices) {
        return executionJobVertices.stream()
            .map(ExecutionJobVertex::getExecutionVertices)
            .flatMap(Collection::stream)
            .map(ExecutionVertex::getWorkerActor)
            .collect(Collectors.toList());
    }

    /**
     * @param actorIds actor ids to resolve
     * @return the actor names of all execution vertices whose actor id is in {@code actorIds}
     */
    public Set<String> getActorName(Set<ActorId> actorIds) {
        return this.getAllExecutionVertices().stream()
            .filter(executionVertex -> actorIds.contains(executionVertex.getActorId()))
            .map(ExecutionVertex::getActorName)
            .collect(Collectors.toSet());
    }

    /**
     * @param actorId actor id to resolve
     * @return an actor name of an execution vertex with that actor id, or {@code null} if no
     *     execution vertex has it
     */
    public String getActorName(ActorId actorId) {
        // A HashSet (rather than an immutable singleton) so that a null actorId is tolerated.
        Set<ActorId> actorIds = new HashSet<>();
        actorIds.add(actorId);
        Set<String> names = this.getActorName(actorIds);
        if (names.isEmpty()) {
            return null;
        }
        return names.iterator().next();
    }

    /** @return the ids of all actors returned by {@link #getAllActors()}, in the same order */
    public List<ActorId> getAllActorsId() {
        return this.getAllActors().stream()
            .map(BaseActorHandle::getId)
            .collect(Collectors.toList());
    }
}

