package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.class */
public class DeclarativeSlotManager implements SlotManager {
    private static final Logger LOG = LoggerFactory.getLogger(DeclarativeSlotManager.class);

    /** Slot bookkeeping; this manager reports allocation starts/completions, frees and slot reports to it. */
    private final SlotTracker slotTracker;

    /** Per-job bookkeeping of declared requirements and of acquired/missing resources. */
    private final ResourceTracker resourceTracker;

    /** Builds the {@link TaskExecutorManager} from the executor and resource actions handed to {@code start}. */
    private final BiFunction<Executor, ResourceActions, TaskExecutorManager> taskExecutorManagerFactory;

    /** Created in {@code start}, reset to {@code null} in {@code suspend}. */
    @Nullable
    private TaskExecutorManager taskExecutorManager;

    /** Timeout passed to task executor RPCs issued by this manager. */
    private final Time taskManagerRequestTimeout;

    /** Strategy used to pick a free slot for a required resource profile. */
    private final SlotMatchingStrategy slotMatchingStrategy;

    private final SlotManagerMetricGroup slotManagerMetricGroup;

    /** Allocation ID of the in-flight slot request per slot; used to discard stale responses. */
    private final Map<SlotID, AllocationID> pendingSlotAllocations;

    /** Set in {@code start}, reset to {@code null} in {@code suspend}. */
    @Nullable
    private ResourceManagerId resourceManagerId;

    /** Executor on which slot request responses are handled; set in {@code start}. */
    @Nullable
    private Executor mainThreadExecutor;

    /** Set in {@code start}, reset to {@code null} in {@code suspend}. */
    @Nullable
    private ResourceActions resourceActions;

    /** True between {@code start} and {@code suspend}. */
    private boolean started;

    /** Target address of the job master per job, as received with its resource requirements. */
    private final Map<JobID, String> jobMasterTargetAddresses = new HashMap<>();

    /** Whether jobs are notified when their requirements cannot be fulfilled. */
    private boolean sendNotEnoughResourceNotifications = true;

    /**
     * Outcome of trying to cover one required slot with the currently available resources:
     * a success flag plus the resource counter to continue with afterwards.
     */
    public static class MatchingResult {
        private final ResourceCounter remainingResources;
        private final boolean successful;

        private MatchingResult(boolean successful, ResourceCounter remainingResources) {
            this.remainingResources = Preconditions.checkNotNull(remainingResources);
            this.successful = successful;
        }

        /** Returns the resource counter that results from the matching attempt. */
        public ResourceCounter getNewAvailableResources() {
            return remainingResources;
        }

        /** Returns whether the matching attempt found a slot. */
        public boolean isSuccessfulMatching() {
            return successful;
        }
    }

    /**
     * Outcome of trying to allocate a new worker for one required slot: a success flag plus
     * the resource counter to continue with afterwards.
     */
    public static class WorkerAllocationResult {
        private final ResourceCounter remainingResources;
        private final boolean successful;

        private WorkerAllocationResult(boolean successful, ResourceCounter remainingResources) {
            this.remainingResources = Preconditions.checkNotNull(remainingResources);
            this.successful = successful;
        }

        /** Returns the resource counter that results from the allocation attempt. */
        public ResourceCounter getNewAvailableResources() {
            return remainingResources;
        }

        /** Returns whether a worker could be allocated. */
        public boolean isSuccessfulAllocating() {
            return successful;
        }
    }

    /**
     * Creates a slot manager in the not-started state.
     *
     * <p>A slot status listener is registered on the given slot tracker right away; the
     * {@link TaskExecutorManager} itself is only created once {@code start} is called.
     *
     * @param scheduledExecutor executor handed to the task executor manager when it is created
     * @param slotManagerConfiguration source of timeouts, matching strategy and worker settings; must not be null
     * @param slotManagerMetricGroup metric group used for the slot gauges; must not be null
     * @param resourceTracker tracker of job requirements and acquired resources; must not be null
     * @param slotTracker tracker of slot states; must not be null
     */
    public DeclarativeSlotManager(
            ScheduledExecutor scheduledExecutor,
            SlotManagerConfiguration slotManagerConfiguration,
            SlotManagerMetricGroup slotManagerMetricGroup,
            ResourceTracker resourceTracker,
            SlotTracker slotTracker) {
        Preconditions.checkNotNull(slotManagerConfiguration);
        this.taskManagerRequestTimeout = slotManagerConfiguration.getTaskManagerRequestTimeout();
        this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
        this.resourceTracker = Preconditions.checkNotNull(resourceTracker);
        this.pendingSlotAllocations = new HashMap<>(16);
        this.slotTracker = Preconditions.checkNotNull(slotTracker);
        slotTracker.registerSlotStatusUpdateListener(createSlotStatusUpdateListener());
        this.slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
        // The factory defers creation until the main-thread executor and resource actions are known.
        this.taskExecutorManagerFactory =
                (executor, actions) ->
                        new TaskExecutorManager(
                                slotManagerConfiguration.getDefaultWorkerResourceSpec(),
                                slotManagerConfiguration.getNumSlotsPerWorker(),
                                slotManagerConfiguration.getMaxSlotNum(),
                                slotManagerConfiguration.isWaitResultConsumedBeforeRelease(),
                                slotManagerConfiguration.getRedundantTaskManagerNum(),
                                slotManagerConfiguration.getTaskManagerTimeout(),
                                scheduledExecutor,
                                executor,
                                actions);
        this.resourceManagerId = null;
        this.resourceActions = null;
        this.mainThreadExecutor = null;
        this.taskExecutorManager = null;
        this.started = false;
    }

    /**
     * Builds the listener that keeps the pending-allocation map, the resource tracker and the
     * task executor manager in sync with slot state transitions.
     */
    private SlotStatusUpdateListener createSlotStatusUpdateListener() {
        return (slot, previous, current, jobId) -> {
            // A slot leaving PENDING no longer has an in-flight allocation request.
            if (previous == SlotState.PENDING) {
                pendingSlotAllocations.remove(slot.getSlotId());
            }

            if (current == SlotState.PENDING) {
                resourceTracker.notifyAcquiredResource(jobId, slot.getResourceProfile());
            } else if (current == SlotState.FREE) {
                resourceTracker.notifyLostResource(jobId, slot.getResourceProfile());
                if (previous == SlotState.ALLOCATED) {
                    taskExecutorManager.freeSlot(slot.getInstanceId());
                }
            } else if (current == SlotState.ALLOCATED) {
                taskExecutorManager.occupySlot(slot.getInstanceId());
            }
        };
    }

    /**
     * Enables or disables "not enough resources" notifications; enabling them re-checks the
     * outstanding resource requirements immediately.
     *
     * @param z whether notifications should be sent
     */
    @Override
    public void setFailUnfulfillableRequest(boolean z) {
        sendNotEnoughResourceNotifications = z;
        if (!z) {
            return;
        }
        checkResourceRequirements();
    }

    /**
     * Starts the slot manager: stores the given components, creates the task executor manager,
     * marks the manager as started and registers the slot gauges.
     *
     * @param resourceManagerId id passed along with slot requests; must not be null
     * @param executor executor used for handling slot request responses; must not be null
     * @param resourceActions callbacks towards the resource manager; must not be null
     */
    @Override
    public void start(ResourceManagerId resourceManagerId, Executor executor, ResourceActions resourceActions) {
        LOG.debug("Starting the slot manager.");

        this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(executor);
        this.resourceActions = Preconditions.checkNotNull(resourceActions);
        taskExecutorManager = taskExecutorManagerFactory.apply(executor, resourceActions);

        started = true;
        registerSlotManagerMetrics();
    }

    /**
     * Registers gauges for the number of free and the number of registered slots.
     *
     * <p>The gauges read the live values on every poll; the {@code int} counts are widened to
     * {@code long} so the gauge value type is {@code Long}.
     */
    private void registerSlotManagerMetrics() {
        this.slotManagerMetricGroup.gauge(
                MetricNames.TASK_SLOTS_AVAILABLE, () -> (long) getNumberFreeSlots());
        this.slotManagerMetricGroup.gauge(
                MetricNames.TASK_SLOTS_TOTAL, () -> (long) getNumberRegisteredSlots());
    }

    /**
     * Suspends the slot manager if it is running: clears the resource tracker, closes the task
     * executor manager, unregisters every task executor it knows and resets the started state.
     * Does nothing when the manager is not started.
     */
    @Override
    public void suspend() {
        if (!started) {
            return;
        }

        LOG.info("Suspending the slot manager.");
        resourceTracker.clear();

        final TaskExecutorManager manager = taskExecutorManager;
        if (manager != null) {
            manager.close();
            for (InstanceID registeredInstance : manager.getTaskExecutors()) {
                unregisterTaskManager(registeredInstance, new SlotManagerException("The slot manager is being suspended."));
            }
        }

        taskExecutorManager = null;
        resourceManagerId = null;
        resourceActions = null;
        started = false;
    }

    /**
     * Closes the slot manager: suspends it and then closes its metric group.
     *
     * @throws Exception declared by {@link AutoCloseable#close()}
     */
    @Override
    public void close() throws Exception {
        LOG.info("Closing the slot manager.");

        suspend();
        slotManagerMetricGroup.close();
    }

    /**
     * Drops all resource requirements of the given job: asks task executors to free inactive
     * slots of the job, forgets the job master address and reports empty requirements to the
     * resource tracker.
     *
     * @param jobID job whose requirements are cleared
     * @throws IllegalStateException if the slot manager has not been started
     */
    @Override
    public void clearResourceRequirements(JobID jobID) {
        checkInit();

        maybeReclaimInactiveSlots(jobID);
        jobMasterTargetAddresses.remove(jobID);
        resourceTracker.notifyResourceRequirements(jobID, Collections.emptyList());
    }

    /**
     * Records the resource requirements declared by a job and tries to fulfill them.
     *
     * <p>The job master address is only stored for non-empty requirements; empty requirements
     * leave a previously stored address untouched.
     *
     * @param resourceRequirements requirements declared by the job
     * @throws IllegalStateException if the slot manager has not been started
     */
    @Override
    public void processResourceRequirements(ResourceRequirements resourceRequirements) {
        checkInit();

        final JobID jobId = resourceRequirements.getJobId();
        if (resourceRequirements.getResourceRequirements().isEmpty()) {
            LOG.info("Clearing resource requirements of job {}", jobId);
        } else {
            LOG.info("Received resource requirements from job {}: {}", jobId, resourceRequirements.getResourceRequirements());
            jobMasterTargetAddresses.put(jobId, resourceRequirements.getTargetAddress());
        }

        resourceTracker.notifyResourceRequirements(jobId, resourceRequirements.getResourceRequirements());
        checkResourceRequirements();
    }

    /**
     * Asks every task executor holding allocated slots for the job to free its inactive slots,
     * provided the job has acquired any resources at all.
     */
    private void maybeReclaimInactiveSlots(JobID jobID) {
        final boolean jobHoldsResources = !resourceTracker.getAcquiredResources(jobID).isEmpty();
        if (jobHoldsResources) {
            for (TaskExecutorConnection connection : slotTracker.getTaskExecutorsWithAllocatedSlotsForJob(jobID)) {
                final TaskExecutorGateway gateway = connection.getTaskExecutorGateway();
                gateway.freeInactiveSlots(jobID, taskManagerRequestTimeout);
            }
        }
    }

    /**
     * Registers a task executor and its slots.
     *
     * <p>For an already registered task executor the slot report is processed as a regular
     * status report and {@code false} is returned.
     *
     * @param taskExecutorConnection connection to the task executor
     * @param slotReport initial slot report of the task executor
     * @param resourceProfile passed through to the task executor manager
     * @param resourceProfile2 passed through to the task executor manager
     * @return {@code true} if the task executor was newly registered, {@code false} otherwise
     * @throws IllegalStateException if the slot manager has not been started
     */
    @Override
    public boolean registerTaskManager(TaskExecutorConnection taskExecutorConnection, SlotReport slotReport, ResourceProfile resourceProfile, ResourceProfile resourceProfile2) {
        checkInit();
        LOG.debug("Registering task executor {} under {} at the slot manager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());

        final InstanceID instanceId = taskExecutorConnection.getInstanceID();
        if (taskExecutorManager.isTaskManagerRegistered(instanceId)) {
            LOG.debug("Task executor {} was already registered.", taskExecutorConnection.getResourceID());
            reportSlotStatus(instanceId, slotReport);
            return false;
        }

        final boolean registered = taskExecutorManager.registerTaskManager(taskExecutorConnection, slotReport, resourceProfile, resourceProfile2);
        if (!registered) {
            LOG.debug("Task executor {} could not be registered.", taskExecutorConnection.getResourceID());
            return false;
        }

        for (SlotStatus status : slotReport) {
            slotTracker.addSlot(status.getSlotID(), status.getResourceProfile(), taskExecutorConnection, status.getJobID());
        }
        checkResourceRequirements();
        return true;
    }

    /**
     * Unregisters a task executor: removes its slots from the slot tracker, unregisters it from
     * the task executor manager and re-checks the resource requirements.
     *
     * @param instanceID instance id of the task executor
     * @param exc cause of the unregistration; not used by this implementation
     * @return {@code true} if the task executor was registered, {@code false} otherwise
     * @throws IllegalStateException if the slot manager has not been started
     */
    @Override
    public boolean unregisterTaskManager(InstanceID instanceID, Exception exc) {
        checkInit();
        LOG.debug("Unregistering task executor {} from the slot manager.", instanceID);

        if (taskExecutorManager.isTaskManagerRegistered(instanceID)) {
            slotTracker.removeSlots(taskExecutorManager.getSlotsOf(instanceID));
            taskExecutorManager.unregisterTaskExecutor(instanceID);
            checkResourceRequirements();
            return true;
        }

        LOG.debug("There is no task executor registered with instance ID {}. Ignoring this message.", instanceID);
        return false;
    }

    /**
     * Processes a slot report of a registered task executor; the resource requirements are
     * re-checked only when the slot tracker signals that the report changed something.
     *
     * @param instanceID instance id of the reporting task executor
     * @param slotReport the reported slot statuses
     * @return {@code true} if the task executor is registered, {@code false} if the report was ignored
     * @throws IllegalStateException if the slot manager has not been started
     */
    @Override
    public boolean reportSlotStatus(InstanceID instanceID, SlotReport slotReport) {
        checkInit();
        LOG.debug("Received slot report from instance {}: {}.", instanceID, slotReport);

        if (!taskExecutorManager.isTaskManagerRegistered(instanceID)) {
            LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", instanceID);
            return false;
        }

        if (slotTracker.notifySlotStatus(slotReport)) {
            checkResourceRequirements();
        }
        return true;
    }

    /**
     * Marks the given slot as free in the slot tracker and re-checks the resource requirements.
     *
     * @param slotID slot to free
     * @param allocationID not used by this implementation
     * @throws IllegalStateException if the slot manager has not been started
     */
    @Override
    public void freeSlot(SlotID slotID, AllocationID allocationID) {
        checkInit();
        LOG.debug("Freeing slot {}.", slotID);

        slotTracker.notifyFree(slotID);
        checkResourceRequirements();
    }

    /**
     * Tries to fulfill all currently missing resources.
     *
     * <p>Free slots are allocated first, job by job. Whatever remains unfulfilled is then matched
     * against pending task manager slots, allocating new workers where needed. The pending-slot
     * counter is threaded through the jobs in iteration order so a pending slot is not counted
     * for more than one requirement.
     */
    private void checkResourceRequirements() {
        final Map<JobID, Collection<ResourceRequirement>> missingResources = this.resourceTracker.getMissingResources();
        if (missingResources.isEmpty()) {
            return;
        }

        // LinkedHashMap keeps the jobs in the order in which the missing resources were iterated.
        final Map<JobID, ResourceCounter> unfulfilledRequirements = new LinkedHashMap<>();
        for (Map.Entry<JobID, Collection<ResourceRequirement>> jobRequirements : missingResources.entrySet()) {
            final JobID jobId = jobRequirements.getKey();
            final ResourceCounter unfulfilledJobRequirements = tryAllocateSlotsForJob(jobId, jobRequirements.getValue());
            if (!unfulfilledJobRequirements.isEmpty()) {
                unfulfilledRequirements.put(jobId, unfulfilledJobRequirements);
            }
        }
        if (unfulfilledRequirements.isEmpty()) {
            return;
        }

        // Count the pending task manager slots per resource profile.
        final Map<ResourceProfile, Integer> pendingSlotCounts =
                this.taskExecutorManager.getPendingTaskManagerSlots().stream()
                        .collect(
                                Collectors.groupingBy(
                                        pendingSlot -> pendingSlot.getResourceProfile(),
                                        Collectors.summingInt(pendingSlot -> 1)));
        ResourceCounter pendingSlots = ResourceCounter.withResources(pendingSlotCounts);

        for (Map.Entry<JobID, ResourceCounter> unfulfilled : unfulfilledRequirements.entrySet()) {
            pendingSlots =
                    tryFulfillRequirementsWithPendingSlots(
                            unfulfilled.getKey(),
                            unfulfilled.getValue().getResourcesWithCount(),
                            pendingSlots);
        }
    }

    /**
     * Tries to allocate free slots for every missing requirement of the job.
     *
     * @return counter of the slots, per resource profile, that could not be allocated
     */
    private ResourceCounter tryAllocateSlotsForJob(JobID jobID, Collection<ResourceRequirement> collection) {
        ResourceCounter unfulfilled = ResourceCounter.empty();
        for (ResourceRequirement requirement : collection) {
            final String targetAddress = jobMasterTargetAddresses.get(jobID);
            final int numMissing = internalTryAllocateSlots(jobID, targetAddress, requirement);
            if (numMissing <= 0) {
                continue;
            }
            unfulfilled = unfulfilled.add(requirement.getResourceProfile(), numMissing);
        }
        return unfulfilled;
    }

    /**
     * Allocates free slots for a single resource requirement until it is satisfied or no
     * matching free slot is left.
     *
     * @param jobID job the slots are allocated for
     * @param str target address passed along with each slot request
     * @param resourceRequirement required profile and number of slots
     * @return number of required slots that could not be allocated; 0 if all were allocated
     */
    private int internalTryAllocateSlots(JobID jobID, String str, ResourceRequirement resourceRequirement) {
        final ResourceProfile requiredProfile = resourceRequirement.getResourceProfile();
        final Collection<TaskManagerSlotInformation> freeSlots = this.slotTracker.getFreeSlots();

        for (int numAllocated = 0; numAllocated < resourceRequirement.getNumberOfRequiredSlots(); numAllocated++) {
            final Optional<TaskManagerSlotInformation> matchingSlot =
                    this.slotMatchingStrategy.findMatchingSlot(requiredProfile, freeSlots, this::getNumberRegisteredSlotsOf);
            if (!matchingSlot.isPresent()) {
                // Stop early: everything still outstanding for this requirement is unfulfilled.
                return resourceRequirement.getNumberOfRequiredSlots() - numAllocated;
            }
            allocateSlot(matchingSlot.get(), jobID, str, requiredProfile);
        }
        return 0;
    }

    /**
     * Starts the allocation of the given slot for a job by sending a slot request to the owning
     * task executor and handling the response on the main-thread executor.
     *
     * @param taskManagerSlotInformation the slot to allocate
     * @param jobID job the slot is allocated for
     * @param str target address passed along with the slot request
     * @param resourceProfile resource profile passed along with the slot request
     * @throws IllegalStateException if the task executor owning the slot is not registered
     */
    private void allocateSlot(TaskManagerSlotInformation taskManagerSlotInformation, JobID jobID, String str, ResourceProfile resourceProfile) {
        SlotID slotId = taskManagerSlotInformation.getSlotId();
        LOG.debug("Starting allocation of slot {} for job {} with resource profile {}.", slotId, jobID, resourceProfile);
        InstanceID instanceId = taskManagerSlotInformation.getInstanceId();
        if (!this.taskExecutorManager.isTaskManagerRegistered(instanceId)) {
            throw new IllegalStateException("Could not find a registered task manager for instance id " + instanceId + '.');
        }
        TaskExecutorGateway taskExecutorGateway = taskManagerSlotInformation.getTaskManagerConnection().getTaskExecutorGateway();
        AllocationID allocationID = new AllocationID();
        // Update the bookkeeping before the request is sent; the allocation id stored here is what
        // the response handler below compares against to detect stale responses.
        this.slotTracker.notifyAllocationStart(slotId, jobID);
        this.taskExecutorManager.markUsed(instanceId);
        this.pendingSlotAllocations.put(slotId, allocationID);
        // NOTE(review): assertNoException presumably escalates unexpected failures of the handler — confirm.
        FutureUtils.assertNoException(taskExecutorGateway.requestSlot(slotId, jobID, allocationID, resourceProfile, str, this.resourceManagerId, this.taskManagerRequestTimeout).handleAsync((acknowledge, th) -> {
            // The pending entry is gone or belongs to a newer request: this response is outdated.
            AllocationID allocationID2 = this.pendingSlotAllocations.get(slotId);
            if (allocationID2 == null || !allocationID2.equals(allocationID)) {
                LOG.debug("Ignoring slot allocation update from task executor {} for slot {} and job {}, because the allocation was already completed or cancelled.", instanceId, slotId, jobID);
                return null;
            }
            // A non-null acknowledge means the request succeeded.
            if (acknowledge != null) {
                LOG.trace("Completed allocation of slot {} for job {}.", slotId, jobID);
                this.slotTracker.notifyAllocationComplete(slotId, jobID);
                return null;
            }
            if (th instanceof SlotOccupiedException) {
                // The slot is already taken: feed the actual owner into the slot tracker as a status update.
                SlotOccupiedException slotOccupiedException = (SlotOccupiedException) th;
                LOG.debug("Tried allocating slot {} for job {}, but it was already allocated for job {}.", slotId, jobID, slotOccupiedException.getJobId());
                this.slotTracker.notifySlotStatus(Collections.singleton(new SlotStatus(slotId, taskManagerSlotInformation.getResourceProfile(), slotOccupiedException.getJobId(), slotOccupiedException.getAllocationId())));
            } else {
                // Any other failure: mark the slot free again.
                LOG.warn("Slot allocation for slot {} for job {} failed.", slotId, jobID, th);
                this.slotTracker.notifyFree(slotId);
            }
            // The failed allocation may have left requirements unfulfilled; try again.
            checkResourceRequirements();
            return null;
        }, this.mainThreadExecutor));
    }

    /**
     * Tries to cover the job's unfulfilled slots with the available pending slots, allocating a
     * new worker whenever no pending slot matches.
     *
     * <p>If a worker cannot be allocated and notifications are enabled, the job is told that not
     * enough resources are available and processing of this job stops.
     *
     * @param jobID job whose requirements are processed
     * @param collection unfulfilled slot counts per resource profile
     * @param resourceCounter pending slots available before processing this job
     * @return pending slots still available after processing this job
     */
    private ResourceCounter tryFulfillRequirementsWithPendingSlots(JobID jobID, Collection<Map.Entry<ResourceProfile, Integer>> collection, ResourceCounter resourceCounter) {
        ResourceCounter available = resourceCounter;
        for (Map.Entry<ResourceProfile, Integer> requirement : collection) {
            final ResourceProfile profile = requirement.getKey();
            for (int slot = 0; slot < requirement.getValue(); slot++) {
                final MatchingResult match = tryFulfillWithPendingSlots(profile, available);
                available = match.getNewAvailableResources();
                if (match.isSuccessfulMatching()) {
                    continue;
                }

                final WorkerAllocationResult allocation = tryAllocateWorkerAndReserveSlot(profile, available);
                available = allocation.getNewAvailableResources();
                final boolean giveUp = !allocation.isSuccessfulAllocating() && sendNotEnoughResourceNotifications;
                if (giveUp) {
                    LOG.warn("Could not fulfill resource requirements of job {}.", jobID);
                    resourceActions.notifyNotEnoughResourcesAvailable(jobID, resourceTracker.getAcquiredResources(jobID));
                    return available;
                }
            }
        }
        return available;
    }

    /**
     * Tries to reserve one of the available slots for the required profile, preferring an
     * identical profile over one that merely matches.
     *
     * @return the matching outcome together with the counter reduced by the reserved slot, or
     *     the unchanged counter when nothing matched
     */
    private MatchingResult tryFulfillWithPendingSlots(ResourceProfile resourceProfile, ResourceCounter resourceCounter) {
        final Set<ResourceProfile> availableProfiles = resourceCounter.getResources();

        if (!availableProfiles.contains(resourceProfile)) {
            for (ResourceProfile candidate : availableProfiles) {
                if (candidate.isMatching(resourceProfile)) {
                    return new MatchingResult(true, resourceCounter.subtract(candidate, 1));
                }
            }
            return new MatchingResult(false, resourceCounter);
        }

        return new MatchingResult(true, resourceCounter.subtract(resourceProfile, 1));
    }

    /**
     * Tries to allocate a new worker for the required profile. On success one of the worker's
     * slots is considered reserved, so only the remaining slots are added to the counter.
     *
     * @return the allocation outcome together with the resulting counter
     */
    private WorkerAllocationResult tryAllocateWorkerAndReserveSlot(ResourceProfile resourceProfile, ResourceCounter resourceCounter) {
        final Optional<ResourceRequirement> newSlots = taskExecutorManager.allocateWorker(resourceProfile);
        if (newSlots.isPresent()) {
            final ResourceRequirement offered = newSlots.get();
            final int spareSlots = offered.getNumberOfRequiredSlots() - 1;
            final ResourceCounter updated = spareSlots > 0
                    ? resourceCounter.add(offered.getResourceProfile(), spareSlots)
                    : resourceCounter;
            return new WorkerAllocationResult(true, updated);
        }
        return new WorkerAllocationResult(false, resourceCounter);
    }

    /** Returns the number of registered slots as reported by the task executor manager. */
    @Override
    public int getNumberRegisteredSlots() {
        return taskExecutorManager.getNumberRegisteredSlots();
    }

    /** Returns the number of slots registered for the given task executor instance. */
    @Override
    public int getNumberRegisteredSlotsOf(InstanceID instanceID) {
        return taskExecutorManager.getNumberRegisteredSlotsOf(instanceID);
    }

    /** Returns the number of free slots as reported by the task executor manager. */
    @Override
    public int getNumberFreeSlots() {
        return taskExecutorManager.getNumberFreeSlots();
    }

    /** Returns the number of free slots of the given task executor instance. */
    @Override
    public int getNumberFreeSlotsOf(InstanceID instanceID) {
        return taskExecutorManager.getNumberFreeSlotsOf(instanceID);
    }

    /** Returns the required workers per worker resource spec, as reported by the task executor manager. */
    @Override
    public Map<WorkerResourceSpec, Integer> getRequiredResources() {
        return taskExecutorManager.getRequiredWorkers();
    }

    /** Returns the total registered resources as reported by the task executor manager. */
    @Override
    public ResourceProfile getRegisteredResource() {
        return taskExecutorManager.getTotalRegisteredResources();
    }

    /** Returns the total registered resources of the given task executor instance. */
    @Override
    public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
        return taskExecutorManager.getTotalRegisteredResourcesOf(instanceID);
    }

    /** Returns the total free resources as reported by the task executor manager. */
    @Override
    public ResourceProfile getFreeResource() {
        return taskExecutorManager.getTotalFreeResources();
    }

    /** Returns the total free resources of the given task executor instance. */
    @Override
    public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
        return taskExecutorManager.getTotalFreeResourcesOf(instanceID);
    }

    /**
     * Always returns an empty collection; this implementation does not report allocated slots
     * per task executor.
     */
    @Override
    public Collection<SlotInfo> getAllocatedSlotsOf(InstanceID instanceID) {
        final Collection<SlotInfo> none = Collections.emptyList();
        return none;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public int getNumberPendingSlotRequests() {
        throw new UnsupportedOperationException();
    }

    /** Fails with an {@link IllegalStateException} unless the slot manager has been started. */
    private void checkInit() {
        Preconditions.checkState(started, "The slot manager has not been started.");
    }
}
