package org.apache.flink.runtime.io.network.partition;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.class */
/**
 * {@link ResourceManagerPartitionTracker} implementation that keeps, in memory, which task
 * executors host which cluster partitions of which data sets.
 *
 * <p>The tracker maintains four maps that are kept consistent with each other:
 *
 * <ul>
 *   <li>task executor -> data sets it reported in its latest report,
 *   <li>data set -> (task executor -> partitions hosted by that executor),
 *   <li>data set -> meta information (total number of partitions),
 *   <li>data set -> future completed once no partition of the data set is tracked anymore.
 * </ul>
 *
 * <p>The maps are plain {@link HashMap}s and no synchronization is performed in this class.
 * NOTE(review): callers presumably confine all calls to a single thread; confirm against the
 * owning component.
 */
public class ResourceManagerPartitionTrackerImpl implements ResourceManagerPartitionTracker {
    private static final Logger LOG =
            LoggerFactory.getLogger(ResourceManagerPartitionTrackerImpl.class);

    /** Data sets contained in the most recent report of each task executor. */
    private final Map<ResourceID, Set<IntermediateDataSetID>> taskExecutorToDataSets =
            new HashMap<>();

    /** For each data set, the partitions hosted per task executor. */
    private final Map<IntermediateDataSetID, Map<ResourceID, Set<ResultPartitionID>>>
            dataSetToTaskExecutors = new HashMap<>();

    /** Meta information of every data set for which at least one partition is tracked. */
    private final Map<IntermediateDataSetID, DataSetMetaInfo> dataSetMetaInfo = new HashMap<>();

    /** Futures handed out by {@link #releaseClusterPartitions(IntermediateDataSetID)}. */
    private final Map<IntermediateDataSetID, CompletableFuture<Void>>
            partitionReleaseCompletionFutures = new HashMap<>();

    private final TaskExecutorClusterPartitionReleaser taskExecutorClusterPartitionReleaser;

    /**
     * Creates a tracker that issues release calls through the given releaser.
     *
     * @param taskExecutorClusterPartitionReleaser callback used to ask task executors to release
     *     the partitions of data sets
     */
    public ResourceManagerPartitionTrackerImpl(
            TaskExecutorClusterPartitionReleaser taskExecutorClusterPartitionReleaser) {
        this.taskExecutorClusterPartitionReleaser = taskExecutorClusterPartitionReleaser;
    }

    /**
     * Updates the tracked state with the given report of a task executor.
     *
     * @param resourceID task executor that sent the report; must not be null
     * @param clusterPartitionReport report to process; must not be null
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTracker
    public void processTaskExecutorClusterPartitionReport(
            ResourceID resourceID, ClusterPartitionReport clusterPartitionReport) {
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(clusterPartitionReport);
        LOG.debug(
                "Processing cluster partition report from task executor {}: {}.",
                resourceID,
                clusterPartitionReport);
        internalProcessClusterPartitionReport(resourceID, clusterPartitionReport);
    }

    /**
     * Handles the shutdown of a task executor by processing an empty report on its behalf, which
     * removes everything tracked for it.
     *
     * @param resourceID task executor that shut down; must not be null
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTracker
    public void processTaskExecutorShutdown(ResourceID resourceID) {
        Preconditions.checkNotNull(resourceID);
        LOG.debug("Processing shutdown of task executor {}.", resourceID);
        internalProcessClusterPartitionReport(
                resourceID, new ClusterPartitionReport(Collections.emptyList()));
    }

    /**
     * Issues release calls to all task executors hosting partitions of the given data set.
     *
     * @param intermediateDataSetID data set to release; must not be null
     * @return an already completed future if the data set is unknown; otherwise a future (shared
     *     between repeated calls for the same data set) that is completed once no task executor
     *     hosts a partition of the data set anymore
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTracker
    public CompletableFuture<Void> releaseClusterPartitions(
            IntermediateDataSetID intermediateDataSetID) {
        Preconditions.checkNotNull(intermediateDataSetID);
        if (!this.dataSetMetaInfo.containsKey(intermediateDataSetID)) {
            LOG.debug("Attempted released of unknown data set {}.", intermediateDataSetID);
            return CompletableFuture.completedFuture(null);
        }
        LOG.debug("Releasing cluster partitions for data set {}.", intermediateDataSetID);
        final CompletableFuture<Void> releaseCompletionFuture =
                this.partitionReleaseCompletionFutures.computeIfAbsent(
                        intermediateDataSetID, ignored -> new CompletableFuture<>());
        internalReleasePartitions(Collections.singleton(intermediateDataSetID));
        return releaseCompletionFuture;
    }

    /**
     * Applies a report to the tracked state, then cleans up data sets that are no longer hosted
     * anywhere and issues release calls for data sets that lost partitions.
     */
    private void internalProcessClusterPartitionReport(
            ResourceID resourceID, ClusterPartitionReport clusterPartitionReport) {
        final Set<IntermediateDataSetID> dataSetsWithLostPartitions =
                clusterPartitionReport.getEntries().isEmpty()
                        ? processEmptyReport(resourceID)
                        : setHostedDataSetsAndCheckCorruption(
                                resourceID, clusterPartitionReport.getEntries());

        updateDataSetMetaData(clusterPartitionReport);
        checkForFullyLostDatasets(dataSetsWithLostPartitions);
        internalReleasePartitions(dataSetsWithLostPartitions);
    }

    /** Sends one release call per task executor that still hosts any of the given data sets. */
    private void internalReleasePartitions(Set<IntermediateDataSetID> set) {
        prepareReleaseCalls(set)
                .forEach(this.taskExecutorClusterPartitionReleaser::releaseClusterPartitions);
    }

    /**
     * Removes everything tracked for the given task executor.
     *
     * @return the data sets the task executor hosted before (empty if it was unknown)
     */
    private Set<IntermediateDataSetID> processEmptyReport(ResourceID resourceID) {
        final Set<IntermediateDataSetID> previouslyHostedDataSets =
                this.taskExecutorToDataSets.remove(resourceID);
        if (previouslyHostedDataSets == null) {
            return Collections.emptySet();
        }
        for (IntermediateDataSetID dataSetId : previouslyHostedDataSets) {
            removeInnerKey(dataSetId, resourceID, this.dataSetToTaskExecutors);
        }
        return previouslyHostedDataSets;
    }

    /**
     * Replaces the state tracked for the given task executor with the content of a non-empty
     * report.
     *
     * @return the data sets for which the task executor now hosts fewer partitions than before;
     *     this includes data sets that it hosted before but that are absent from the report
     */
    private Set<IntermediateDataSetID> setHostedDataSetsAndCheckCorruption(
            ResourceID resourceID,
            Collection<ClusterPartitionReport.ClusterPartitionReportEntry> collection) {
        final Set<IntermediateDataSetID> currentlyHostedDataSets =
                collection.stream()
                        .map(ClusterPartitionReport.ClusterPartitionReportEntry::getDataSetId)
                        .collect(Collectors.toSet());
        final Set<IntermediateDataSetID> previouslyHostedDataSets =
                this.taskExecutorToDataSets.put(resourceID, currentlyHostedDataSets);

        // Every previously hosted data set is a candidate until the report proves that no
        // partition of it was lost. The previous set is no longer referenced by any map, so it
        // can be mutated freely.
        final Set<IntermediateDataSetID> potentiallyCorruptedDataSets =
                previouslyHostedDataSets != null ? previouslyHostedDataSets : new HashSet<>(0);

        // A data set missing from the report is no longer hosted by this task executor at all.
        // Drop the stale hosting entry; otherwise the data set would never be recognized as
        // fully lost (its meta data and release future would leak) and could not even be cleaned
        // up on shutdown, since taskExecutorToDataSets no longer refers to it.
        for (IntermediateDataSetID dataSetId : potentiallyCorruptedDataSets) {
            if (!currentlyHostedDataSets.contains(dataSetId)) {
                removeInnerKey(dataSetId, resourceID, this.dataSetToTaskExecutors);
            }
        }

        for (ClusterPartitionReport.ClusterPartitionReportEntry reportEntry : collection) {
            final Set<ResultPartitionID> previouslyHostedPartitions =
                    this.dataSetToTaskExecutors
                            .computeIfAbsent(reportEntry.getDataSetId(), ignored -> new HashMap<>())
                            .put(resourceID, reportEntry.getHostedPartitions());

            final boolean noPartitionLost =
                    previouslyHostedPartitions == null
                            || reportEntry
                                    .getHostedPartitions()
                                    .containsAll(previouslyHostedPartitions);
            if (noPartitionLost) {
                potentiallyCorruptedDataSets.remove(reportEntry.getDataSetId());
            }
        }

        // now only contains data sets for which a partition is no longer tracked
        return potentiallyCorruptedDataSets;
    }

    /**
     * Registers meta information for newly reported data sets and verifies that the reported
     * total number of partitions matches what is already tracked.
     *
     * @throws IllegalStateException if a report entry disagrees with the tracked total number of
     *     partitions (raised by {@code Preconditions.checkState})
     */
    private void updateDataSetMetaData(ClusterPartitionReport clusterPartitionReport) {
        clusterPartitionReport
                .getEntries()
                .forEach(
                        reportEntry ->
                                this.dataSetMetaInfo.compute(
                                        reportEntry.getDataSetId(),
                                        (dataSetId, trackedMetaInfo) -> {
                                            if (trackedMetaInfo == null) {
                                                return DataSetMetaInfo
                                                        .withoutNumRegisteredPartitions(
                                                                reportEntry
                                                                        .getNumTotalPartitions());
                                            }
                                            Preconditions.checkState(
                                                    trackedMetaInfo.getNumTotalPartitions()
                                                            == reportEntry.getNumTotalPartitions(),
                                                    "Data set %s was reported with %s total partitions, but %s are tracked.",
                                                    dataSetId,
                                                    reportEntry.getNumTotalPartitions(),
                                                    trackedMetaInfo.getNumTotalPartitions());
                                            return trackedMetaInfo;
                                        }));
    }

    /**
     * For each given data set that is no longer hosted by any task executor, removes its meta
     * information and completes a pending release future, if any.
     */
    private void checkForFullyLostDatasets(Set<IntermediateDataSetID> set) {
        for (IntermediateDataSetID dataSetId : set) {
            if (!getHostingTaskExecutors(dataSetId).isEmpty()) {
                continue;
            }
            LOG.debug("There are no longer partitions being tracked for dataset {}.", dataSetId);
            this.dataSetMetaInfo.remove(dataSetId);
            final CompletableFuture<Void> releaseCompletionFuture =
                    this.partitionReleaseCompletionFutures.remove(dataSetId);
            if (releaseCompletionFuture != null) {
                releaseCompletionFuture.complete(null);
            }
        }
    }

    /** Groups the given data sets by the task executors that currently host them. */
    private Map<ResourceID, Set<IntermediateDataSetID>> prepareReleaseCalls(
            Set<IntermediateDataSetID> set) {
        final Map<ResourceID, Set<IntermediateDataSetID>> releaseCalls = new HashMap<>();
        for (IntermediateDataSetID dataSetId : set) {
            for (ResourceID hostingTaskExecutor : getHostingTaskExecutors(dataSetId)) {
                insert(hostingTaskExecutor, dataSetId, releaseCalls);
            }
        }
        return releaseCalls;
    }

    /**
     * Returns the task executors tracked as hosting the given data set. The result is a live view
     * of the internal map (or an empty set) and must not be modified by callers.
     */
    private Set<ResourceID> getHostingTaskExecutors(IntermediateDataSetID intermediateDataSetID) {
        Preconditions.checkNotNull(intermediateDataSetID);
        final Map<ResourceID, Set<ResultPartitionID>> hosts =
                this.dataSetToTaskExecutors.get(intermediateDataSetID);
        return hosts == null ? Collections.emptySet() : hosts.keySet();
    }

    /**
     * Returns a snapshot of all tracked data sets; the number of registered partitions of each
     * data set is the sum of the partition counts over all hosting task executors.
     *
     * @throws IllegalStateException if meta information exists for a data set without any tracked
     *     partition (raised by {@code Preconditions.checkState})
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTracker
    public Map<IntermediateDataSetID, DataSetMetaInfo> listDataSets() {
        return this.dataSetMetaInfo.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> {
                                    final Map<ResourceID, Set<ResultPartitionID>> hosts =
                                            this.dataSetToTaskExecutors.get(entry.getKey());
                                    Preconditions.checkState(
                                            hosts != null,
                                            "Have metadata entry for dataset %s, but no partition is tracked.",
                                            entry.getKey());
                                    final int numRegisteredPartitions =
                                            hosts.values().stream().mapToInt(Set::size).sum();
                                    return DataSetMetaInfo.withNumRegisteredPartitions(
                                            numRegisteredPartitions,
                                            entry.getValue().getNumTotalPartitions());
                                }));
    }

    /** Returns whether nothing at all is tracked anymore. */
    @VisibleForTesting
    boolean areAllMapsEmpty() {
        return this.taskExecutorToDataSets.isEmpty()
                && this.dataSetToTaskExecutors.isEmpty()
                && this.dataSetMetaInfo.isEmpty()
                && this.partitionReleaseCompletionFutures.isEmpty();
    }

    /**
     * Adds {@code v} to the set stored under {@code k}, creating the set if necessary.
     *
     * <p>Visibility is kept as found in this file so that existing references keep compiling.
     */
    public static <K, V> void insert(K k, V v, Map<K, Set<V>> map) {
        map.computeIfAbsent(k, ignored -> new HashSet<>()).add(v);
    }

    /**
     * Removes {@code k2} from the inner map stored under {@code k1}; if the inner map becomes
     * empty, the outer mapping for {@code k1} is removed as well. Does nothing if {@code k1} is
     * not present.
     */
    private static <K1, K2, V> void removeInnerKey(K1 k1, K2 k2, Map<K1, Map<K2, V>> map) {
        map.computeIfPresent(
                k1,
                (ignored, innerMap) -> {
                    innerMap.remove(k2);
                    // returning null makes computeIfPresent drop the outer mapping
                    return innerMap.isEmpty() ? null : innerMap;
                });
    }
}
