/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.resourcemanager;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.ray.api.Ray;
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.streaming.runtime.config.StreamingMasterConfig;
import io.ray.streaming.runtime.config.master.ResourceConfig;
import io.ray.streaming.runtime.config.types.ResourceAssignStrategyType;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.core.resource.Resources;
import io.ray.streaming.runtime.master.JobRuntimeContext;
import io.ray.streaming.runtime.master.resourcemanager.ResourceAssignmentView;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategy;
import io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategyFactory;
import io.ray.streaming.runtime.util.RayUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceManagerImpl
implements ResourceManager {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerImpl.class);
    private static final String CONTAINER_ENGAGED_KEY = "CONTAINER_ENGAGED_KEY";
    private JobRuntimeContext runtimeContext;
    private ResourceConfig resourceConfig;
    private ResourceAssignStrategy resourceAssignStrategy;
    private final Resources resources;
    private int actorNumPerContainer;
    private final ScheduledExecutorService resourceUpdater = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("resource-update-thread").build());

    public ResourceManagerImpl(JobRuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
        StreamingMasterConfig masterConfig = runtimeContext.getConf().masterConfig;
        this.resourceConfig = masterConfig.resourceConfig;
        this.resources = new Resources();
        LOG.info("ResourceManagerImpl begin init, conf is {}, resources are {}.", (Object)this.resourceConfig, (Object)this.resources);
        this.actorNumPerContainer = this.resourceConfig.actorNumPerContainer();
        ResourceAssignStrategyType resourceAssignStrategyType = ResourceAssignStrategyType.PIPELINE_FIRST_STRATEGY;
        this.resourceAssignStrategy = ResourceAssignStrategyFactory.getStrategy(resourceAssignStrategyType);
        LOG.info("Slot assign strategy: {}.", (Object)this.resourceAssignStrategy.getName());
        this.initResource();
        this.checkAndUpdateResourcePeriodically();
        LOG.info("ResourceManagerImpl init success.");
    }

    @Override
    public ResourceAssignmentView assignResource(List<Container> containers, ExecutionGraph executionGraph) {
        return this.resourceAssignStrategy.assignResource(containers, executionGraph);
    }

    @Override
    public String getName() {
        return this.resourceAssignStrategy.getName();
    }

    @Override
    public ImmutableList<Container> getRegisteredContainers() {
        LOG.info("Current resource detail: {}.", (Object)this.resources.toString());
        return this.resources.getRegisteredContainers();
    }

    private void checkAndUpdateResource() {
        Map<UniqueId, NodeInfo> latestNodeInfos = RayUtils.getAliveNodeInfoMap();
        List addNodes = latestNodeInfos.keySet().stream().filter(this::isAddedNode).collect(Collectors.toList());
        List<UniqueId> deleteNodes = this.resources.getRegisteredContainerMap().keySet().stream().filter(nodeId -> !latestNodeInfos.containsKey(nodeId)).collect(Collectors.toList());
        LOG.info("Latest node infos: {}, current containers: {}, add nodes: {}, delete nodes: {}.", new Object[]{latestNodeInfos, this.resources.getRegisteredContainers(), addNodes, deleteNodes});
        if (!addNodes.isEmpty() || !deleteNodes.isEmpty()) {
            LOG.info("Latest node infos from GCS: {}", latestNodeInfos);
            LOG.info("Resource details: {}.", (Object)this.resources.toString());
            LOG.info("Get add nodes info: {}, del nodes info: {}.", addNodes, deleteNodes);
            this.unregisterDeletedContainer(deleteNodes);
            this.registerNewContainers(addNodes.stream().map(latestNodeInfos::get).collect(Collectors.toList()));
        }
    }

    private void registerNewContainers(List<NodeInfo> nodeInfos) {
        LOG.info("Start to register containers. new add node infos are: {}.", nodeInfos);
        if (nodeInfos == null || nodeInfos.isEmpty()) {
            LOG.info("NodeInfos is null or empty, skip registry.");
            return;
        }
        for (NodeInfo nodeInfo : nodeInfos) {
            this.registerContainer(nodeInfo);
        }
    }

    private void registerContainer(NodeInfo nodeInfo) {
        LOG.info("Register container {}.", (Object)nodeInfo);
        Container container = Container.from(nodeInfo);
        double availableCapacity = this.actorNumPerContainer - container.getAllocatedActorNum();
        Ray.setResource((UniqueId)container.getNodeId(), (String)container.getName(), (double)availableCapacity);
        Ray.setResource((UniqueId)container.getNodeId(), (String)CONTAINER_ENGAGED_KEY, (double)1.0);
        container.getAvailableResources().put(container.getName(), availableCapacity);
        this.resources.registerContainer(container);
    }

    private void unregisterDeletedContainer(List<UniqueId> deletedIds) {
        LOG.info("Unregister container, deleted node ids are: {}.", deletedIds);
        if (null == deletedIds || deletedIds.isEmpty()) {
            return;
        }
        this.resources.unRegisterContainer(deletedIds);
    }

    private void initResource() {
        LOG.info("Init resource.");
        this.checkAndUpdateResource();
    }

    private void checkAndUpdateResourcePeriodically() {
        long intervalSecond = this.resourceConfig.resourceCheckIntervalSecond();
        this.resourceUpdater.scheduleAtFixedRate(Ray.wrapRunnable(this::checkAndUpdateResource), 0L, intervalSecond, TimeUnit.SECONDS);
    }

    private boolean isAddedNode(UniqueId uniqueId) {
        return !this.resources.getRegisteredContainerMap().containsKey((Object)uniqueId);
    }
}

