package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.class */
public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy {
    private final SchedulingTopology schedulingTopology;
    private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex = new HashMap();
    private final Map<ConsumedPartitionGroup, ConsumerRegionGroupExecutionView> partitionGroupConsumerRegions = new HashMap();
    private final ConsumerRegionGroupExecutionViewMaintainer consumerRegionGroupExecutionViewMaintainer;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy$Factory.class */
    public static class Factory implements PartitionReleaseStrategy.Factory {
        @Override // org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy.Factory
        public PartitionReleaseStrategy createInstance(SchedulingTopology schedulingTopology) {
            return new RegionPartitionReleaseStrategy(schedulingTopology);
        }
    }

    public RegionPartitionReleaseStrategy(SchedulingTopology schedulingTopology) {
        this.schedulingTopology = (SchedulingTopology) Preconditions.checkNotNull(schedulingTopology);
        initRegionExecutionViewByVertex();
        initPartitionGroupConsumerRegions();
        this.consumerRegionGroupExecutionViewMaintainer = new ConsumerRegionGroupExecutionViewMaintainer(this.partitionGroupConsumerRegions.values());
    }

    private void initRegionExecutionViewByVertex() {
        for (SchedulingPipelinedRegion schedulingPipelinedRegion : this.schedulingTopology.getAllPipelinedRegions()) {
            PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(schedulingPipelinedRegion);
            Iterator<? extends SchedulingExecutionVertex> it2 = schedulingPipelinedRegion.getVertices().iterator();
            while (it2.hasNext()) {
                this.regionExecutionViewByVertex.put(it2.next().getId(), pipelinedRegionExecutionView);
            }
        }
    }

    private void initPartitionGroupConsumerRegions() {
        for (SchedulingPipelinedRegion schedulingPipelinedRegion : this.schedulingTopology.getAllPipelinedRegions()) {
            Iterator<ConsumedPartitionGroup> it2 = schedulingPipelinedRegion.getAllBlockingConsumedPartitionGroups().iterator();
            while (it2.hasNext()) {
                this.partitionGroupConsumerRegions.computeIfAbsent(it2.next(), consumedPartitionGroup -> {
                    return new ConsumerRegionGroupExecutionView();
                }).add(schedulingPipelinedRegion);
            }
        }
    }

    @Override // org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy
    public List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID executionVertexID) {
        PipelinedRegionExecutionView pipelinedRegionExecutionViewForVertex = getPipelinedRegionExecutionViewForVertex(executionVertexID);
        pipelinedRegionExecutionViewForVertex.vertexFinished(executionVertexID);
        if (!pipelinedRegionExecutionViewForVertex.isFinished()) {
            return Collections.emptyList();
        }
        SchedulingPipelinedRegion pipelinedRegionOfVertex = this.schedulingTopology.getPipelinedRegionOfVertex(executionVertexID);
        this.consumerRegionGroupExecutionViewMaintainer.regionFinished(pipelinedRegionOfVertex);
        return filterReleasablePartitions(pipelinedRegionOfVertex.getAllBlockingConsumedPartitionGroups());
    }

    @Override // org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy
    public void vertexUnfinished(ExecutionVertexID executionVertexID) {
        getPipelinedRegionExecutionViewForVertex(executionVertexID).vertexUnfinished(executionVertexID);
        this.consumerRegionGroupExecutionViewMaintainer.regionUnfinished(this.schedulingTopology.getPipelinedRegionOfVertex(executionVertexID));
    }

    private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(ExecutionVertexID executionVertexID) {
        PipelinedRegionExecutionView pipelinedRegionExecutionView = this.regionExecutionViewByVertex.get(executionVertexID);
        Preconditions.checkState(pipelinedRegionExecutionView != null, "PipelinedRegionExecutionView not found for execution vertex %s", executionVertexID);
        return pipelinedRegionExecutionView;
    }

    private List<IntermediateResultPartitionID> filterReleasablePartitions(Iterable<ConsumedPartitionGroup> iterable) {
        ArrayList arrayList = new ArrayList();
        for (ConsumedPartitionGroup consumedPartitionGroup : iterable) {
            if (this.partitionGroupConsumerRegions.get(consumedPartitionGroup).isFinished()) {
                Iterator<IntermediateResultPartitionID> it2 = consumedPartitionGroup.iterator();
                while (it2.hasNext()) {
                    arrayList.add(it2.next());
                }
            }
        }
        return arrayList;
    }
}
