package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.class */
public class PhysicalSlotRequestBulkCheckerImpl implements PhysicalSlotRequestBulkChecker {
    private ComponentMainThreadExecutor componentMainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor("PhysicalSlotRequestBulkCheckerImpl is not initialized with proper main thread executor, call to PhysicalSlotRequestBulkCheckerImpl#start is required");
    private final Supplier<Set<SlotInfo>> slotsRetriever;
    private final Clock clock;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl$TimeoutCheckResult.class */
    public enum TimeoutCheckResult {
        PENDING,
        FULFILLED,
        TIMEOUT
    }

    PhysicalSlotRequestBulkCheckerImpl(Supplier<Set<SlotInfo>> supplier, Clock clock) {
        this.slotsRetriever = (Supplier) Preconditions.checkNotNull(supplier);
        this.clock = (Clock) Preconditions.checkNotNull(clock);
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker
    public void start(ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.componentMainThreadExecutor = componentMainThreadExecutor;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker
    public void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk physicalSlotRequestBulk, Time time) {
        PhysicalSlotRequestBulkWithTimestamp physicalSlotRequestBulkWithTimestamp = new PhysicalSlotRequestBulkWithTimestamp(physicalSlotRequestBulk);
        physicalSlotRequestBulkWithTimestamp.markUnfulfillable(this.clock.relativeTimeMillis());
        schedulePendingRequestBulkWithTimestampCheck(physicalSlotRequestBulkWithTimestamp, time);
    }

    private void schedulePendingRequestBulkWithTimestampCheck(PhysicalSlotRequestBulkWithTimestamp physicalSlotRequestBulkWithTimestamp, Time time) {
        this.componentMainThreadExecutor.schedule(() -> {
            switch (checkPhysicalSlotRequestBulkTimeout(physicalSlotRequestBulkWithTimestamp, time)) {
                case PENDING:
                    schedulePendingRequestBulkWithTimestampCheck(physicalSlotRequestBulkWithTimestamp, time);
                    return;
                case TIMEOUT:
                    physicalSlotRequestBulkWithTimestamp.cancel(new NoResourceAvailableException("Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout", new TimeoutException("Timeout has occurred: " + time)));
                    return;
                case FULFILLED:
                default:
                    return;
            }
        }, time.getSize(), time.getUnit());
    }

    @VisibleForTesting
    TimeoutCheckResult checkPhysicalSlotRequestBulkTimeout(PhysicalSlotRequestBulkWithTimestamp physicalSlotRequestBulkWithTimestamp, Time time) {
        if (physicalSlotRequestBulkWithTimestamp.getPendingRequests().isEmpty()) {
            return TimeoutCheckResult.FULFILLED;
        }
        if (isSlotRequestBulkFulfillable(physicalSlotRequestBulkWithTimestamp, this.slotsRetriever)) {
            physicalSlotRequestBulkWithTimestamp.markFulfillable();
        } else {
            long relativeTimeMillis = this.clock.relativeTimeMillis();
            physicalSlotRequestBulkWithTimestamp.markUnfulfillable(relativeTimeMillis);
            if (physicalSlotRequestBulkWithTimestamp.getUnfulfillableSince() + time.toMilliseconds() <= relativeTimeMillis) {
                return TimeoutCheckResult.TIMEOUT;
            }
        }
        return TimeoutCheckResult.PENDING;
    }

    @VisibleForTesting
    static boolean isSlotRequestBulkFulfillable(PhysicalSlotRequestBulk physicalSlotRequestBulk, Supplier<Set<SlotInfo>> supplier) {
        return areRequestsFulfillableWithSlots(physicalSlotRequestBulk.getPendingRequests(), getReusableSlots(supplier, physicalSlotRequestBulk.getAllocationIdsOfFulfilledRequests()));
    }

    private static Set<SlotInfo> getReusableSlots(Supplier<Set<SlotInfo>> supplier, Set<AllocationID> set) {
        return (Set) supplier.get().stream().filter(slotInfo -> {
            return !slotInfo.willBeOccupiedIndefinitely();
        }).filter(slotInfo2 -> {
            return !set.contains(slotInfo2.getAllocationId());
        }).collect(Collectors.toSet());
    }

    private static boolean areRequestsFulfillableWithSlots(Collection<ResourceProfile> collection, Set<SlotInfo> set) {
        HashSet hashSet = new HashSet(set);
        Iterator<ResourceProfile> it2 = collection.iterator();
        while (it2.hasNext()) {
            Optional<SlotInfo> findMatchingSlotForRequest = findMatchingSlotForRequest(it2.next(), hashSet);
            if (!findMatchingSlotForRequest.isPresent()) {
                return false;
            }
            hashSet.remove(findMatchingSlotForRequest.get());
        }
        return true;
    }

    private static Optional<SlotInfo> findMatchingSlotForRequest(ResourceProfile resourceProfile, Collection<SlotInfo> collection) {
        return collection.stream().filter(slotInfo -> {
            return slotInfo.getResourceProfile().isMatching(resourceProfile);
        }).findFirst();
    }

    public static PhysicalSlotRequestBulkCheckerImpl createFromSlotPool(SlotPool slotPool, Clock clock) {
        return new PhysicalSlotRequestBulkCheckerImpl(() -> {
            return getAllSlotInfos(slotPool);
        }, clock);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) {
        return (Set) Stream.concat(slotPool.getAvailableSlotsInformation().stream(), slotPool.getAllocatedSlotsInformation().stream()).collect(Collectors.toSet());
    }
}
