package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.DeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.class */
/**
 * {@link SchedulingStrategy} that schedules tasks one pipelined region at a time.
 *
 * <p>A region is handed to {@link SchedulerOperations#allocateSlotsAndDeploy} as a whole once every
 * blocking input it consumes from outside the region is in state
 * {@link ResultPartitionState#CONSUMABLE}. All bookkeeping maps are built once in the constructor
 * from the given {@link SchedulingTopology}.
 */
public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
    private final SchedulerOperations schedulerOperations;
    private final SchedulingTopology schedulingTopology;

    /** Deployment option shared by every vertex deployed through this strategy. */
    private final DeploymentOption deploymentOption = new DeploymentOption(false);

    /** Tracked consumed partition groups, keyed by the data set id of the partitions they contain. */
    private final Map<IntermediateDataSetID, Set<ConsumedPartitionGroup>> correlatedResultPartitionGroups = new HashMap<>();

    /** Regions consuming each tracked (cross-region or external) partition group; keyed by identity. */
    private final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>> partitionGroupConsumerRegions = new IdentityHashMap<>();

    /** Vertex ids of each region, in the iteration order of {@code schedulingTopology.getVertices()}. */
    private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> regionVerticesSorted = new IdentityHashMap<>();

    /** Partition groups produced by more than one region, one of which also consumes the group. */
    private final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups = Collections.newSetFromMap(new IdentityHashMap<>());

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy$Factory.class */
    /** Factory creating {@link PipelinedRegionSchedulingStrategy} instances. */
    public static class Factory implements SchedulingStrategyFactory {
        @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
            return new PipelinedRegionSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }

    /**
     * Creates the strategy and eagerly builds its lookup structures from the topology.
     *
     * @param schedulerOperations used to allocate slots and deploy vertices; must not be null
     * @param schedulingTopology topology to schedule; must not be null
     * @throws NullPointerException if either argument is null
     */
    public PipelinedRegionSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
        this.schedulerOperations = Preconditions.checkNotNull(schedulerOperations);
        this.schedulingTopology = Preconditions.checkNotNull(schedulingTopology);
        init();
    }

    /** Builds all lookup structures. The order matters: each step reads the result of the previous one. */
    private void init() {
        initCrossRegionConsumedPartitionGroups();
        initPartitionGroupConsumerRegions();
        initCorrelatedResultPartitionGroups();
        for (SchedulingExecutionVertex vertex : this.schedulingTopology.getVertices()) {
            final SchedulingPipelinedRegion region = this.schedulingTopology.getPipelinedRegionOfVertex(vertex.getId());
            this.regionVerticesSorted.computeIfAbsent(region, ignored -> new ArrayList<>()).add(vertex.getId());
        }
    }

    /**
     * Collects every blocking consumed partition group whose partitions are produced by more than
     * one region while the consuming region is itself one of the producers.
     */
    private void initCrossRegionConsumedPartitionGroups() {
        // Cache producer regions per group (by identity) so that a group shared between several
        // consumer regions is only resolved once.
        final Map<ConsumedPartitionGroup, Set<SchedulingPipelinedRegion>> producerRegionsByGroup = new IdentityHashMap<>();
        for (SchedulingPipelinedRegion region : this.schedulingTopology.getAllPipelinedRegions()) {
            for (ConsumedPartitionGroup group : region.getAllBlockingConsumedPartitionGroups()) {
                final Set<SchedulingPipelinedRegion> producerRegions =
                        producerRegionsByGroup.computeIfAbsent(group, this::getProducerRegionsForConsumedPartitionGroup);
                if (producerRegions.size() > 1 && producerRegions.contains(region)) {
                    this.crossRegionConsumedPartitionGroups.add(group);
                }
            }
        }
    }

    /** Returns the identity-based set of regions that produce the partitions of the given group. */
    private Set<SchedulingPipelinedRegion> getProducerRegionsForConsumedPartitionGroup(ConsumedPartitionGroup consumedPartitionGroup) {
        final Set<SchedulingPipelinedRegion> producerRegions = Collections.newSetFromMap(new IdentityHashMap<>());
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            producerRegions.add(getProducerRegion(partitionId));
        }
        return producerRegions;
    }

    /** Returns the region containing the vertex that produces the given partition. */
    private SchedulingPipelinedRegion getProducerRegion(IntermediateResultPartitionID intermediateResultPartitionID) {
        return this.schedulingTopology.getPipelinedRegionOfVertex(this.schedulingTopology.getResultPartition(intermediateResultPartitionID).getProducer2().getId());
    }

    /** Registers each region as a consumer of every cross-region or external group it consumes. */
    private void initPartitionGroupConsumerRegions() {
        for (SchedulingPipelinedRegion region : this.schedulingTopology.getAllPipelinedRegions()) {
            for (ConsumedPartitionGroup group : region.getAllBlockingConsumedPartitionGroups()) {
                if (isCrossRegionOrExternalGroup(group, region)) {
                    this.partitionGroupConsumerRegions.computeIfAbsent(group, ignored -> new HashSet<>()).add(region);
                }
            }
        }
    }

    /** Indexes every tracked partition group by the data set ids of the partitions it contains. */
    private void initCorrelatedResultPartitionGroups() {
        for (ConsumedPartitionGroup group : this.partitionGroupConsumerRegions.keySet()) {
            for (IntermediateResultPartitionID partitionId : group) {
                this.correlatedResultPartitionGroups
                        .computeIfAbsent(partitionId.getIntermediateDataSetID(), ignored -> new HashSet<>())
                        .add(group);
            }
        }
    }

    /** Starts scheduling by trying to schedule every source region of the topology. */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void startScheduling() {
        final Set<SchedulingPipelinedRegion> sourceRegions =
                IterableUtils.toStream(this.schedulingTopology.getAllPipelinedRegions())
                        .filter(this::isSourceRegion)
                        .collect(Collectors.toSet());
        maybeScheduleRegions(sourceRegions);
    }

    /** A source region consumes no cross-region and no external blocking partition group. */
    private boolean isSourceRegion(SchedulingPipelinedRegion schedulingPipelinedRegion) {
        for (ConsumedPartitionGroup group : schedulingPipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
            if (isCrossRegionOrExternalGroup(group, schedulingPipelinedRegion)) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether the group is a known cross-region group or is produced outside the region. */
    private boolean isCrossRegionOrExternalGroup(ConsumedPartitionGroup group, SchedulingPipelinedRegion region) {
        return this.crossRegionConsumedPartitionGroups.contains(group) || isExternalConsumedPartitionGroup(group, region);
    }

    /**
     * Tries to reschedule the regions that contain the given vertices.
     *
     * @param set ids of the vertices to restart
     */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void restartTasks(Set<ExecutionVertexID> set) {
        final Set<SchedulingPipelinedRegion> regionsToRestart =
                set.stream()
                        .map(this.schedulingTopology::getPipelinedRegionOfVertex)
                        .collect(Collectors.toSet());
        maybeScheduleRegions(regionsToRestart);
    }

    /**
     * Reacts to a vertex reaching {@link ExecutionState#FINISHED}: looks up the regions consuming
     * the partition groups correlated with the vertex' consumable results and tries to schedule
     * those whose vertices are all still in {@link ExecutionState#CREATED}. Other states are ignored.
     */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onExecutionStateChange(ExecutionVertexID executionVertexID, ExecutionState executionState) {
        if (executionState != ExecutionState.FINISHED) {
            return;
        }

        final Set<ConsumedPartitionGroup> affectedGroups =
                IterableUtils.toStream(this.schedulingTopology.getVertex(executionVertexID).getProducedResults())
                        .filter(partition -> partition.getState() == ResultPartitionState.CONSUMABLE)
                        .flatMap(partition -> this.correlatedResultPartitionGroups
                                .getOrDefault(partition.getResultId(), Collections.emptySet())
                                .stream())
                        .collect(Collectors.toSet());

        // Every group in correlatedResultPartitionGroups was taken from the key set of
        // partitionGroupConsumerRegions, so the lookup below always finds an entry.
        final Set<SchedulingPipelinedRegion> candidateRegions =
                affectedGroups.stream()
                        .flatMap(group -> this.partitionGroupConsumerRegions.get(group).stream())
                        .distinct()
                        .filter(this::areRegionVerticesAllInCreatedState)
                        .collect(Collectors.toSet());

        maybeScheduleRegions(candidateRegions);
    }

    /** Intentionally a no-op for this strategy. */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onPartitionConsumable(IntermediateResultPartitionID intermediateResultPartitionID) {
    }

    /** Tries to schedule the given regions in topological order. */
    private void maybeScheduleRegions(Set<SchedulingPipelinedRegion> set) {
        final List<SchedulingPipelinedRegion> sortedRegions =
                SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(this.schedulingTopology, set);
        // Consumability of a group is cached for the duration of this call only.
        final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
        for (SchedulingPipelinedRegion region : sortedRegions) {
            maybeScheduleRegion(region, consumableStatusCache);
        }
    }

    /**
     * Deploys all vertices of the region if all of its inputs are consumable.
     *
     * @throws IllegalStateException if the inputs are consumable but a vertex is not in CREATED state
     */
    private void maybeScheduleRegion(SchedulingPipelinedRegion schedulingPipelinedRegion, Map<ConsumedPartitionGroup, Boolean> map) {
        if (!areRegionInputsAllConsumable(schedulingPipelinedRegion, map)) {
            return;
        }
        Preconditions.checkState(areRegionVerticesAllInCreatedState(schedulingPipelinedRegion), "BUG: trying to schedule a region which is not in CREATED state");
        this.schedulerOperations.allocateSlotsAndDeploy(
                SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                        this.regionVerticesSorted.get(schedulingPipelinedRegion),
                        vertexId -> this.deploymentOption));
    }

    /**
     * Checks the blocking inputs of a region. Cross-region groups are checked partition by
     * partition (external partitions only); other external groups are checked as a whole, with the
     * result cached in {@code map}. Groups produced inside the region are not checked.
     */
    private boolean areRegionInputsAllConsumable(SchedulingPipelinedRegion schedulingPipelinedRegion, Map<ConsumedPartitionGroup, Boolean> map) {
        for (ConsumedPartitionGroup group : schedulingPipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
            if (this.crossRegionConsumedPartitionGroups.contains(group)) {
                if (!isCrossRegionConsumedPartitionConsumable(group, schedulingPipelinedRegion)) {
                    return false;
                }
            } else if (isExternalConsumedPartitionGroup(group, schedulingPipelinedRegion)
                    && !map.computeIfAbsent(group, this::isConsumedPartitionGroupConsumable)) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether every partition of the group is in state CONSUMABLE. */
    private boolean isConsumedPartitionGroupConsumable(ConsumedPartitionGroup consumedPartitionGroup) {
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            if (this.schedulingTopology.getResultPartition(partitionId).getState() != ResultPartitionState.CONSUMABLE) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether every partition of the group produced outside the region is CONSUMABLE. */
    private boolean isCrossRegionConsumedPartitionConsumable(ConsumedPartitionGroup consumedPartitionGroup, SchedulingPipelinedRegion schedulingPipelinedRegion) {
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            if (isExternalConsumedPartition(partitionId, schedulingPipelinedRegion)
                    && this.schedulingTopology.getResultPartition(partitionId).getState() != ResultPartitionState.CONSUMABLE) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether every vertex of the region is in state CREATED. */
    private boolean areRegionVerticesAllInCreatedState(SchedulingPipelinedRegion schedulingPipelinedRegion) {
        for (SchedulingExecutionVertex vertex : schedulingPipelinedRegion.getVertices()) {
            if (vertex.getState() != ExecutionState.CREATED) {
                return false;
            }
        }
        return true;
    }

    /** Classifies a group as external by looking only at its first partition. */
    private boolean isExternalConsumedPartitionGroup(ConsumedPartitionGroup consumedPartitionGroup, SchedulingPipelinedRegion schedulingPipelinedRegion) {
        return isExternalConsumedPartition(consumedPartitionGroup.getFirst(), schedulingPipelinedRegion);
    }

    /** A partition is external to a region if its producer vertex is not contained in that region. */
    private boolean isExternalConsumedPartition(IntermediateResultPartitionID intermediateResultPartitionID, SchedulingPipelinedRegion schedulingPipelinedRegion) {
        return !schedulingPipelinedRegion.contains(this.schedulingTopology.getResultPartition(intermediateResultPartitionID).getProducer2().getId());
    }

    /** Returns a read-only view of the cross-region consumed partition groups. */
    @VisibleForTesting
    Set<ConsumedPartitionGroup> getCrossRegionConsumedPartitionGroups() {
        return Collections.unmodifiableSet(this.crossRegionConsumedPartitionGroups);
    }
}
