package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.class */
/**
 * Default {@link SlotStatusSyncer} implementation.
 *
 * <p>It keeps a {@link TaskManagerTracker} and a {@link ResourceTracker} consistent with each
 * other while slots are requested from task executors, freed, or reported back through a
 * {@link SlotReport}.
 *
 * <p>The instance must be {@link #initialize initialized} before use; every public operation
 * fails with an {@link IllegalStateException} otherwise. The set of in-flight allocations is a
 * plain {@link HashSet}, and the allocation callback is dispatched on the executor passed to
 * {@code initialize}. NOTE(review): this presumably means all methods are expected to be called
 * from that same main thread - confirm against the callers.
 */
public class DefaultSlotStatusSyncer implements SlotStatusSyncer {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSlotStatusSyncer.class);

    /** Timeout handed to the task executor gateway on every slot request. */
    private final Time taskManagerRequestTimeout;

    @Nullable
    private TaskManagerTracker taskManagerTracker;

    @Nullable
    private ResourceTracker resourceTracker;

    @Nullable
    private Executor mainThreadExecutor;

    @Nullable
    private ResourceManagerId resourceManagerId;

    /** Allocations that were requested from a task executor and have not been resolved yet. */
    private final Set<AllocationID> pendingSlotAllocations = new HashSet<>();

    private boolean started = false;

    /**
     * Creates a syncer that is not yet started.
     *
     * @param time timeout used for slot requests sent to task executors; must not be null
     */
    public DefaultSlotStatusSyncer(Time time) {
        this.taskManagerRequestTimeout = Preconditions.checkNotNull(time);
    }

    /**
     * Starts the syncer with the given collaborators and drops any pending allocations left over
     * from a previous run.
     *
     * @param taskManagerTracker tracker that is notified about slot state changes
     * @param resourceTracker tracker that is notified about acquired and lost resources
     * @param resourceManagerId id forwarded with every slot request
     * @param executor executor on which slot request responses are handled
     */
    @Override // org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusSyncer
    public void initialize(TaskManagerTracker taskManagerTracker, ResourceTracker resourceTracker, ResourceManagerId resourceManagerId, Executor executor) {
        this.taskManagerTracker = Preconditions.checkNotNull(taskManagerTracker);
        this.resourceTracker = Preconditions.checkNotNull(resourceTracker);
        this.mainThreadExecutor = Preconditions.checkNotNull(executor);
        this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
        this.pendingSlotAllocations.clear();
        this.started = true;
    }

    /** Releases all collaborators, forgets pending allocations and marks the syncer as stopped. */
    @Override // org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusSyncer
    public void close() {
        this.taskManagerTracker = null;
        this.resourceTracker = null;
        this.mainThreadExecutor = null;
        this.resourceManagerId = null;
        this.pendingSlotAllocations.clear();
        this.started = false;
    }

    /**
     * Requests a new dynamic slot from the given task manager.
     *
     * <p>The slot is recorded as {@link SlotState#PENDING} and its resources as acquired before
     * the request is sent. When the response arrives the slot is either promoted to
     * {@link SlotState#ALLOCATED} or, on failure, released again.
     *
     * @param instanceID instance id of a registered task manager
     * @param jobID job the slot is allocated for
     * @param str forwarded unchanged to {@code TaskExecutorGateway#requestSlot}
     *     (NOTE(review): presumably the address of the job master - confirm)
     * @param resourceProfile resources requested for the slot
     * @return a future that completes normally when the allocation succeeded or the response
     *     became irrelevant (allocation already resolved, or slot removed in the meantime), and
     *     exceptionally when the task executor rejected or failed the request
     * @throws IllegalStateException if the syncer is not started or the task manager is unknown
     */
    @Override // org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusSyncer
    public CompletableFuture<Void> allocateSlot(InstanceID instanceID, JobID jobID, String str, ResourceProfile resourceProfile) {
        Preconditions.checkNotNull(instanceID);
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(resourceProfile);
        checkStarted();

        final AllocationID allocationId = new AllocationID();
        final Optional<TaskManagerInfo> registeredTaskManager = this.taskManagerTracker.getRegisteredTaskManager(instanceID);
        Preconditions.checkState(registeredTaskManager.isPresent(), "Could not find a registered task manager for instance id " + instanceID + '.');
        final TaskManagerInfo taskManager = registeredTaskManager.get();
        final TaskExecutorGateway gateway = taskManager.getTaskExecutorConnection().getTaskExecutorGateway();

        // Book the slot and its resources up front, before the task executor has answered.
        this.taskManagerTracker.notifySlotStatus(allocationId, jobID, instanceID, resourceProfile, SlotState.PENDING);
        this.resourceTracker.notifyAcquiredResource(jobID, resourceProfile);
        this.pendingSlotAllocations.add(allocationId);

        final CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
                SlotID.getDynamicSlotID(taskManager.getTaskExecutorConnection().getResourceID()),
                jobID,
                allocationId,
                resourceProfile,
                str,
                this.resourceManagerId,
                this.taskManagerRequestTimeout);

        final CompletableFuture<Void> returnedFuture = new CompletableFuture<>();
        FutureUtils.assertNoException(requestFuture.handleAsync((acknowledge, failure) -> {
            if (!this.pendingSlotAllocations.remove(allocationId)) {
                // Someone else (freeSlot / a slot report) already resolved this allocation.
                LOG.debug("Ignoring slot allocation update from task manager {} for allocation {} and job {}, because the allocation was already completed or cancelled.", instanceID, allocationId, jobID);
                returnedFuture.complete(null);
                return null;
            }
            if (!this.taskManagerTracker.getAllocatedOrPendingSlot(allocationId).isPresent()) {
                LOG.debug("The slot {} has been removed before. Ignore the future.", allocationId);
                // Complete the future handed to the caller. The request future is already done
                // at this point, so completing it instead would leave the caller waiting forever.
                returnedFuture.complete(null);
                return null;
            }
            if (acknowledge != null) {
                LOG.trace("Completed allocation of allocation {} for job {}.", allocationId, jobID);
                this.taskManagerTracker.notifySlotStatus(allocationId, jobID, instanceID, resourceProfile, SlotState.ALLOCATED);
                returnedFuture.complete(null);
                return null;
            }
            if (failure instanceof SlotOccupiedException) {
                LOG.error("Should not get this exception.", failure);
            } else {
                LOG.warn("Slot allocation for allocation {} for job {} failed.", allocationId, jobID, failure);
                // Roll back the bookkeeping done before the request was sent.
                this.resourceTracker.notifyLostResource(jobID, resourceProfile);
                this.taskManagerTracker.notifySlotStatus(allocationId, jobID, instanceID, resourceProfile, SlotState.FREE);
            }
            returnedFuture.completeExceptionally(failure);
            return null;
        }, this.mainThreadExecutor));
        return returnedFuture;
    }

    /**
     * Frees the slot with the given allocation id.
     *
     * <p>Unknown allocation ids are logged and ignored. For a known slot the resources are
     * reported as lost and the slot is marked {@link SlotState#FREE}; if the slot was still
     * pending, the in-flight allocation is forgotten so that a late response is ignored.
     *
     * @param allocationID allocation id of the slot to free
     * @throws IllegalStateException if the syncer is not started
     */
    @Override // org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusSyncer
    public void freeSlot(AllocationID allocationID) {
        Preconditions.checkNotNull(allocationID);
        checkStarted();
        LOG.debug("Freeing slot {}.", allocationID);

        final Optional<TaskManagerSlotInformation> knownSlot = this.taskManagerTracker.getAllocatedOrPendingSlot(allocationID);
        if (!knownSlot.isPresent()) {
            LOG.warn("Try to free unknown slot {}.", allocationID);
            return;
        }

        final TaskManagerSlotInformation slot = knownSlot.get();
        if (slot.getState() == SlotState.PENDING) {
            this.pendingSlotAllocations.remove(allocationID);
        }
        this.resourceTracker.notifyLostResource(slot.getJobId(), slot.getResourceProfile());
        this.taskManagerTracker.notifySlotStatus(allocationID, slot.getJobId(), slot.getInstanceId(), slot.getResourceProfile(), SlotState.FREE);
    }

    /**
     * Reconciles the tracked slots of a task manager with a slot report.
     *
     * <p>Slots tracked as {@link SlotState#ALLOCATED} that are missing from the report are freed.
     * Reported allocations are then synced one by one via {@code syncAllocatedSlotStatus}.
     *
     * @param instanceID instance id of the reporting task manager
     * @param slotReport report to apply
     * @return {@code true} if the task manager is registered and the report neither freed a
     *     tracked allocated slot nor introduced an allocation that was not tracked before;
     *     {@code false} otherwise
     * @throws IllegalStateException if the syncer is not started
     */
    @Override // org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusSyncer
    public boolean reportSlotStatus(InstanceID instanceID, SlotReport slotReport) {
        Preconditions.checkNotNull(slotReport);
        Preconditions.checkNotNull(instanceID);
        checkStarted();

        final Optional<TaskManagerInfo> registeredTaskManager = this.taskManagerTracker.getRegisteredTaskManager(instanceID);
        if (!registeredTaskManager.isPresent()) {
            LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", instanceID);
            return false;
        }
        final TaskManagerInfo taskManager = registeredTaskManager.get();
        LOG.debug("Received slot report from instance {}: {}.", instanceID, slotReport);

        final Set<AllocationID> reportedAllocationIds = new HashSet<>();
        slotReport.iterator().forEachRemaining(slotStatus -> reportedAllocationIds.add(slotStatus.getAllocationID()));

        boolean consistent = true;

        // Iterate over a copy of the tracked slots rather than the live collection.
        // NOTE(review): presumably because notifySlotStatus may modify the underlying map - confirm.
        final Set<TaskManagerSlotInformation> trackedSlots = new HashSet<>(taskManager.getAllocatedSlots().values());
        for (TaskManagerSlotInformation slot : trackedSlots) {
            final boolean missingFromReport = !reportedAllocationIds.contains(slot.getAllocationId());
            if (missingFromReport && slot.getState() == SlotState.ALLOCATED) {
                this.taskManagerTracker.notifySlotStatus(slot.getAllocationId(), slot.getJobId(), slot.getInstanceId(), slot.getResourceProfile(), SlotState.FREE);
                this.resourceTracker.notifyLostResource(slot.getJobId(), slot.getResourceProfile());
                consistent = false;
            }
        }

        for (Iterator<SlotStatus> statuses = slotReport.iterator(); statuses.hasNext(); ) {
            final SlotStatus status = statuses.next();
            // Entries without an allocation id carry nothing to sync.
            if (status.getAllocationID() != null && !syncAllocatedSlotStatus(status, taskManager)) {
                consistent = false;
            }
        }
        return consistent;
    }

    /**
     * Applies a single reported allocation to the trackers.
     *
     * @param slotStatus reported status; its allocation id, job id and resource profile must not
     *     be null
     * @param taskManagerInfo task manager the status was reported by
     * @return {@code false} if the allocation was not tracked for this task manager and has just
     *     been recorded as {@link SlotState#ALLOCATED}; {@code true} if it was already tracked
     * @throws IllegalStateException if the allocation is not tracked for this task manager but
     *     the task manager tracker already knows it as allocated or pending
     */
    private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, TaskManagerInfo taskManagerInfo) {
        final AllocationID allocationId = Preconditions.checkNotNull(slotStatus.getAllocationID());
        final JobID jobId = Preconditions.checkNotNull(slotStatus.getJobID());
        final ResourceProfile resourceProfile = Preconditions.checkNotNull(slotStatus.getResourceProfile());

        if (!taskManagerInfo.getAllocatedSlots().containsKey(allocationId)) {
            Preconditions.checkState(!this.taskManagerTracker.getAllocatedOrPendingSlot(allocationId).isPresent(), "Duplicated allocation for " + allocationId);
            this.taskManagerTracker.notifySlotStatus(allocationId, jobId, taskManagerInfo.getInstanceId(), resourceProfile, SlotState.ALLOCATED);
            this.resourceTracker.notifyAcquiredResource(jobId, resourceProfile);
            return false;
        }

        final TaskManagerSlotInformation trackedSlot = taskManagerInfo.getAllocatedSlots().get(allocationId);
        if (trackedSlot.getState() != SlotState.PENDING) {
            return true;
        }

        // The report confirms a pending slot before the request response was handled: promote it
        // now and drop the pending entry so the later response is ignored.
        this.pendingSlotAllocations.remove(trackedSlot.getAllocationId());
        this.taskManagerTracker.notifySlotStatus(trackedSlot.getAllocationId(), trackedSlot.getJobId(), trackedSlot.getInstanceId(), trackedSlot.getResourceProfile(), SlotState.ALLOCATED);
        return true;
    }

    /**
     * Verifies that {@link #initialize} has been called and {@link #close} has not.
     *
     * @throws IllegalStateException if the syncer is not started
     * @throws NullPointerException if a collaborator is unexpectedly missing
     */
    private void checkStarted() {
        Preconditions.checkState(this.started);
        Preconditions.checkNotNull(this.taskManagerTracker);
        Preconditions.checkNotNull(this.resourceTracker);
        Preconditions.checkNotNull(this.mainThreadExecutor);
        Preconditions.checkNotNull(this.resourceManagerId);
    }
}
