/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.resourcemanager.strategy.impl;

import io.ray.streaming.runtime.config.types.ResourceAssignStrategyType;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.core.resource.ResourceType;
import io.ray.streaming.runtime.master.resourcemanager.ResourceAssignmentView;
import io.ray.streaming.runtime.master.resourcemanager.ViewBuilder;
import io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategy;
import io.ray.streaming.runtime.master.scheduler.ScheduleException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelineFirstStrategy
implements ResourceAssignStrategy {
    public static final Logger LOG = LoggerFactory.getLogger(PipelineFirstStrategy.class);
    private int currentContainerIndex = 0;

    @Override
    public ResourceAssignmentView assignResource(List<Container> containers, ExecutionGraph executionGraph) {
        Map<Integer, ExecutionJobVertex> vertices = executionGraph.getExecutionJobVertexMap();
        HashMap vertexRemainingNum = new HashMap();
        vertices.forEach((k, v) -> {
            int size = v.getExecutionVertices().size();
            vertexRemainingNum.put(k, size);
        });
        int totalExecutionVerticesNum = vertexRemainingNum.values().stream().mapToInt(Integer::intValue).sum();
        int containerNum = containers.size();
        int capacityPerContainer = Math.max(totalExecutionVerticesNum / containerNum, 1);
        this.updateContainerCapacity(containers, capacityPerContainer);
        int enlargeCapacityThreshold = 0;
        boolean enlarged = false;
        if (capacityPerContainer * containerNum < totalExecutionVerticesNum) {
            enlargeCapacityThreshold = capacityPerContainer * containerNum;
            LOG.info("Need to enlarge capacity per container, threshold: {}.", (Object)enlargeCapacityThreshold);
        }
        LOG.info("Total execution vertices num: {}, container num: {}, capacity per container: {}.", new Object[]{totalExecutionVerticesNum, containerNum, capacityPerContainer});
        int maxParallelism = executionGraph.getMaxParallelism();
        int allocatedVertexCount = 0;
        for (int i = 0; i < maxParallelism; ++i) {
            for (ExecutionJobVertex jobVertex : vertices.values()) {
                List<ExecutionVertex> exeVertices = jobVertex.getExecutionVertices();
                if (exeVertices.size() <= i) continue;
                ExecutionVertex executionVertex = exeVertices.get(i);
                Map<String, Double> requiredResource = executionVertex.getResource();
                if (requiredResource.containsKey(ResourceType.CPU.getValue())) {
                    LOG.info("Required resource contain {} value : {}, no limitation by default.", (Object)ResourceType.CPU, (Object)requiredResource.get(ResourceType.CPU.getValue()));
                    requiredResource.remove(ResourceType.CPU.getValue());
                }
                Container targetContainer = this.findMatchedContainer(requiredResource, containers);
                targetContainer.allocateActor(executionVertex);
                if (enlarged || enlargeCapacityThreshold <= 0 || ++allocatedVertexCount < enlargeCapacityThreshold) continue;
                this.updateContainerCapacity(containers, capacityPerContainer + 1);
                enlarged = true;
                LOG.info("Enlarge capacity per container to: {}.", (Object)containers.get(0).getCapacity());
            }
        }
        ResourceAssignmentView allocatingView = ViewBuilder.buildResourceAssignmentView(containers);
        LOG.info("Assigning resource finished, allocating map: {}.", (Object)allocatingView);
        return allocatingView;
    }

    @Override
    public String getName() {
        return ResourceAssignStrategyType.PIPELINE_FIRST_STRATEGY.getName();
    }

    private void updateContainerCapacity(List<Container> containers, int capacity) {
        containers.forEach(c -> c.updateCapacity(capacity));
    }

    private Container findMatchedContainer(Map<String, Double> requiredResource, List<Container> containers) {
        LOG.info("Check resource, required: {}.", requiredResource);
        int checkedNum = 0;
        while (!this.hasEnoughResource(requiredResource, this.getCurrentContainer(containers))) {
            this.forwardToNextContainer(containers);
            if (++checkedNum < containers.size()) continue;
            throw new ScheduleException(String.format("No enough resource left, required resource: %s, available resource: %s.", requiredResource, containers));
        }
        return this.getCurrentContainer(containers);
    }

    private boolean hasEnoughResource(Map<String, Double> requiredResource, Container container) {
        LOG.info("Check resource for index: {}, container: {}", (Object)this.currentContainerIndex, (Object)container);
        if (null == requiredResource) {
            return true;
        }
        if (container.isFull()) {
            LOG.info("Container {} is full.", (Object)container);
            return false;
        }
        Map<String, Double> availableResource = container.getAvailableResources();
        for (Map.Entry<String, Double> entry : requiredResource.entrySet()) {
            if (availableResource.containsKey(entry.getKey())) {
                if (!(availableResource.get(entry.getKey()) < entry.getValue())) continue;
                LOG.warn("No enough resource for container {}. required: {}, available: {}.", new Object[]{container.getAddress(), requiredResource, availableResource});
                return false;
            }
            LOG.warn("No enough resource for container {}. required: {}, available: {}.", new Object[]{container.getAddress(), requiredResource, availableResource});
            return false;
        }
        return true;
    }

    private Container forwardToNextContainer(List<Container> containers) {
        this.currentContainerIndex = (this.currentContainerIndex + 1) % containers.size();
        return this.getCurrentContainer(containers);
    }

    private Container getCurrentContainer(List<Container> containers) {
        return containers.get(this.currentContainerIndex);
    }
}

