package org.apache.flink.runtime.scheduler.adapter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.EdgeManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.class */
public class DefaultExecutionTopology implements SchedulingTopology {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DefaultExecutionTopology.class);
    private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById;
    private final List<DefaultExecutionVertex> executionVerticesList;
    private final Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById;
    private final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> pipelinedRegionsByVertex;
    private final List<DefaultSchedulingPipelinedRegion> pipelinedRegions;
    private final EdgeManager edgeManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology$ExecutionGraphIndex.class */
    /**
     * Holder for the lookup structures built while indexing an execution graph: the scheduling
     * vertices keyed by ID, the same vertices as a list, and the result partitions keyed by ID.
     */
    public static class ExecutionGraphIndex {
        private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById;
        private final List<DefaultExecutionVertex> executionVerticesList;
        private final Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById;

        /**
         * Stores the three index structures; each argument is passed through
         * {@code Preconditions.checkNotNull} before being assigned.
         *
         * @param verticesById scheduling vertices keyed by their ID
         * @param verticesList scheduling vertices as a list
         * @param partitionsById result partitions keyed by their ID
         */
        private ExecutionGraphIndex(
                Map<ExecutionVertexID, DefaultExecutionVertex> verticesById,
                List<DefaultExecutionVertex> verticesList,
                Map<IntermediateResultPartitionID, DefaultResultPartition> partitionsById) {
            this.executionVerticesById = Preconditions.checkNotNull(verticesById);
            this.executionVerticesList = Preconditions.checkNotNull(verticesList);
            this.resultPartitionsById = Preconditions.checkNotNull(partitionsById);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology$IndexedPipelinedRegions.class */
    /**
     * Holder for the computed pipelined regions: the regions as a list, plus a map from each
     * execution vertex ID to the region stored for it.
     */
    public static class IndexedPipelinedRegions {
        private final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> pipelinedRegionsByVertex;
        private final List<DefaultSchedulingPipelinedRegion> pipelinedRegions;

        /**
         * Stores both structures; each argument is passed through
         * {@code Preconditions.checkNotNull} before being assigned.
         *
         * @param regionsByVertex pipelined regions keyed by execution vertex ID
         * @param regions all pipelined regions as a list
         */
        private IndexedPipelinedRegions(
                Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> regionsByVertex,
                List<DefaultSchedulingPipelinedRegion> regions) {
            this.pipelinedRegionsByVertex = Preconditions.checkNotNull(regionsByVertex);
            this.pipelinedRegions = Preconditions.checkNotNull(regions);
        }
    }

    /**
     * Creates a topology from pre-computed index structures.
     *
     * <p>All collection arguments are passed through {@code Preconditions.checkNotNull}; the
     * edge manager is stored as given, without a null check.
     *
     * @param verticesById scheduling vertices keyed by their ID
     * @param verticesList scheduling vertices as a list
     * @param partitionsById result partitions keyed by their ID
     * @param regionsByVertex pipelined regions keyed by execution vertex ID
     * @param regions all pipelined regions as a list
     * @param edgeManager edge manager later exposed through {@link #getEdgeManager()}
     */
    private DefaultExecutionTopology(
            Map<ExecutionVertexID, DefaultExecutionVertex> verticesById,
            List<DefaultExecutionVertex> verticesList,
            Map<IntermediateResultPartitionID, DefaultResultPartition> partitionsById,
            Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> regionsByVertex,
            List<DefaultSchedulingPipelinedRegion> regions,
            EdgeManager edgeManager) {
        this.executionVerticesById = Preconditions.checkNotNull(verticesById);
        this.executionVerticesList = Preconditions.checkNotNull(verticesList);
        this.resultPartitionsById = Preconditions.checkNotNull(partitionsById);
        this.pipelinedRegionsByVertex = Preconditions.checkNotNull(regionsByVertex);
        this.pipelinedRegions = Preconditions.checkNotNull(regions);
        this.edgeManager = edgeManager;
    }

    /**
     * Returns all scheduling vertices of this topology.
     *
     * @return an unmodifiable view wrapping the internal vertex list
     */
    @Override // org.apache.flink.runtime.topology.BaseTopology
    public Iterable<DefaultExecutionVertex> getVertices() {
        final List<DefaultExecutionVertex> readOnlyVertices =
                Collections.unmodifiableList(this.executionVerticesList);
        return readOnlyVertices;
    }

    /**
     * Looks up the scheduling vertex registered under the given ID.
     *
     * @param executionVertexID ID of the vertex to look up
     * @return the vertex stored for that ID
     * @throws IllegalArgumentException if no vertex is stored for the ID
     */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingTopology
    public DefaultExecutionVertex getVertex(ExecutionVertexID executionVertexID) {
        final DefaultExecutionVertex vertex = this.executionVerticesById.get(executionVertexID);
        if (vertex != null) {
            return vertex;
        }
        throw new IllegalArgumentException("can not find vertex: " + executionVertexID);
    }

    /**
     * Looks up the result partition registered under the given ID.
     *
     * @param intermediateResultPartitionID ID of the partition to look up
     * @return the partition stored for that ID
     * @throws IllegalArgumentException if no partition is stored for the ID
     */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingTopology
    public DefaultResultPartition getResultPartition(IntermediateResultPartitionID intermediateResultPartitionID) {
        final DefaultResultPartition partition =
                this.resultPartitionsById.get(intermediateResultPartitionID);
        if (partition != null) {
            return partition;
        }
        throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionID);
    }

    /**
     * Returns every pipelined region of this topology.
     *
     * @return an unmodifiable view wrapping the internal region list
     */
    @Override // org.apache.flink.runtime.topology.Topology
    public Iterable<? extends SchedulingPipelinedRegion> getAllPipelinedRegions() {
        final List<DefaultSchedulingPipelinedRegion> regions =
                Preconditions.checkNotNull(this.pipelinedRegions);
        return Collections.unmodifiableCollection(regions);
    }

    /**
     * Looks up the pipelined region stored for the given execution vertex.
     *
     * @param executionVertexID ID of the vertex whose region is requested
     * @return the region stored for that vertex ID
     * @throws IllegalArgumentException if no region is stored for the ID
     */
    @Override // org.apache.flink.runtime.topology.Topology
    public DefaultSchedulingPipelinedRegion getPipelinedRegionOfVertex(ExecutionVertexID executionVertexID) {
        final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> regionsByVertex =
                Preconditions.checkNotNull(this.pipelinedRegionsByVertex);
        final DefaultSchedulingPipelinedRegion region = regionsByVertex.get(executionVertexID);
        if (region != null) {
            return region;
        }
        throw new IllegalArgumentException("Unknown execution vertex " + executionVertexID);
    }

    /**
     * Returns the edge manager this topology was constructed with.
     *
     * @return the stored edge manager; the constructor assigns it without a null check
     */
    public EdgeManager getEdgeManager() {
        return this.edgeManager;
    }

    /**
     * Builds a scheduling topology from the given execution graph.
     *
     * <p>The graph's execution vertices and their produced partitions are indexed first,
     * pipelined regions are then computed over the indexed vertices, and finally it is checked
     * that vertices sharing a co-location constraint ended up in the same region.
     *
     * @param defaultExecutionGraph execution graph to adapt; rejected by
     *     {@code Preconditions.checkNotNull} when {@code null}
     * @return a topology backed by the computed indexes and the graph's edge manager
     */
    public static DefaultExecutionTopology fromExecutionGraph(DefaultExecutionGraph defaultExecutionGraph) {
        Preconditions.checkNotNull(defaultExecutionGraph, "execution graph can not be null");

        final EdgeManager edgeManager = defaultExecutionGraph.getEdgeManager();
        final ExecutionGraphIndex graphIndex =
                computeExecutionGraphIndex(
                        defaultExecutionGraph.getAllExecutionVertices(),
                        defaultExecutionGraph.getTotalNumberOfVertices(),
                        edgeManager);

        // Regions resolve partitions through a lookup bound to the partition index.
        final IndexedPipelinedRegions regionIndex =
                computePipelinedRegions(
                        graphIndex.executionVerticesList, graphIndex.resultPartitionsById::get);

        ensureCoLocatedVerticesInSameRegion(regionIndex.pipelinedRegions, defaultExecutionGraph);

        return new DefaultExecutionTopology(
                graphIndex.executionVerticesById,
                graphIndex.executionVerticesList,
                graphIndex.resultPartitionsById,
                regionIndex.pipelinedRegionsByVertex,
                regionIndex.pipelinedRegions,
                edgeManager);
    }

    /**
     * Creates the scheduling-level vertices and result partitions for every execution vertex
     * and collects them into lookup structures.
     *
     * <p>For each execution vertex the produced partitions are created and registered in the
     * partition index first; the scheduling vertex is created afterwards and registered in the
     * vertex index and list. The lookup functions handed to the created objects are bound to
     * the maps being filled here, so entries added in later iterations are visible through them.
     *
     * @param executionVertices execution vertices to index, processed in iteration order
     * @param vertexCount initial capacity used for the vertex list
     * @param edgeManager source of the consumer vertex groups per partition and of the consumed
     *     partition groups per vertex
     * @return the populated index
     */
    private static ExecutionGraphIndex computeExecutionGraphIndex(
            Iterable<ExecutionVertex> executionVertices, int vertexCount, EdgeManager edgeManager) {
        final Map<ExecutionVertexID, DefaultExecutionVertex> verticesById = new HashMap<>();
        final List<DefaultExecutionVertex> verticesList = new ArrayList<>(vertexCount);
        final Map<IntermediateResultPartitionID, DefaultResultPartition> partitionsById =
                new HashMap<>();

        for (ExecutionVertex executionVertex : executionVertices) {
            final List<DefaultResultPartition> producedPartitions =
                    generateProducedSchedulingResultPartition(
                            executionVertex.getProducedPartitions(),
                            edgeManager::getConsumerVertexGroupsForPartition,
                            verticesById::get);

            // Register the produced partitions; without this the partition index stays empty
            // and no partition can ever be resolved by ID.
            for (DefaultResultPartition partition : producedPartitions) {
                partitionsById.put(partition.getId(), partition);
            }

            final DefaultExecutionVertex schedulingVertex =
                    generateSchedulingExecutionVertex(
                            executionVertex,
                            producedPartitions,
                            edgeManager.getConsumedPartitionGroupsForVertex(executionVertex.getID()),
                            partitionsById::get);
            verticesById.put(schedulingVertex.getId(), schedulingVertex);
            verticesList.add(schedulingVertex);
        }

        return new ExecutionGraphIndex(verticesById, verticesList, partitionsById);
    }

    /**
     * Creates one scheduling result partition for every produced intermediate result partition.
     *
     * <p>Each created partition receives a state supplier that reports
     * {@link ResultPartitionState#CONSUMABLE} when the underlying partition is consumable at the
     * time of the query, and {@link ResultPartitionState#CREATED} otherwise.
     *
     * @param producedPartitions partitions produced by one execution vertex, keyed by ID; only
     *     the values are read
     * @param consumerVertexGroupsRetriever yields the consumer vertex groups for a partition ID
     * @param executionVertexRetriever vertex lookup passed on to every created partition
     * @return the created partitions, in the iteration order of the map's values
     */
    private static List<DefaultResultPartition> generateProducedSchedulingResultPartition(
            Map<IntermediateResultPartitionID, IntermediateResultPartition> producedPartitions,
            Function<IntermediateResultPartitionID, List<ConsumerVertexGroup>> consumerVertexGroupsRetriever,
            Function<ExecutionVertexID, DefaultExecutionVertex> executionVertexRetriever) {
        final List<DefaultResultPartition> schedulingPartitions =
                new ArrayList<>(producedPartitions.size());
        for (IntermediateResultPartition partition : producedPartitions.values()) {
            schedulingPartitions.add(
                    new DefaultResultPartition(
                            partition.getPartitionId(),
                            partition.getIntermediateResult().getId(),
                            partition.getResultType(),
                            () -> partition.isConsumable()
                                    ? ResultPartitionState.CONSUMABLE
                                    : ResultPartitionState.CREATED,
                            consumerVertexGroupsRetriever.apply(partition.getPartitionId()),
                            executionVertexRetriever));
        }
        return schedulingPartitions;
    }

    /**
     * Creates the scheduling vertex for an execution vertex and sets it as the producer of all
     * the given produced partitions.
     *
     * @param executionVertex execution vertex supplying the ID and the execution state
     * @param producedPartitions partitions produced by the vertex; each gets the new vertex set
     *     as its producer
     * @param consumedPartitionGroups partition groups passed on to the new vertex
     * @param resultPartitionRetriever partition lookup passed on to the new vertex
     * @return the created scheduling vertex
     */
    private static DefaultExecutionVertex generateSchedulingExecutionVertex(
            ExecutionVertex executionVertex,
            List<DefaultResultPartition> producedPartitions,
            List<ConsumedPartitionGroup> consumedPartitionGroups,
            Function<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionRetriever) {
        final DefaultExecutionVertex schedulingVertex =
                new DefaultExecutionVertex(
                        executionVertex.getID(),
                        producedPartitions,
                        executionVertex::getExecutionState,
                        consumedPartitionGroups,
                        resultPartitionRetriever);
        for (DefaultResultPartition producedPartition : producedPartitions) {
            producedPartition.setProducer(schedulingVertex);
        }
        return schedulingVertex;
    }

    /**
     * Computes the pipelined regions of the given vertices and indexes them by vertex ID.
     *
     * <p>The vertex sets returned by {@link PipelinedRegionComputeUtil#computePipelinedRegions}
     * are each wrapped in a {@link DefaultSchedulingPipelinedRegion}; every vertex ID in a set is
     * mapped to the region created for that set. The number of regions and the elapsed time are
     * logged at INFO level.
     *
     * @param vertices scheduling vertices to group into regions
     * @param resultPartitionRetriever partition lookup passed on to every created region
     * @return the regions as a list, together with the vertex-ID-to-region map
     */
    private static IndexedPipelinedRegions computePipelinedRegions(
            Iterable<DefaultExecutionVertex> vertices,
            Function<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionRetriever) {
        final long startNanos = System.nanoTime();

        final Set<? extends Set<? extends SchedulingExecutionVertex>> rawRegions =
                PipelinedRegionComputeUtil.computePipelinedRegions(vertices);

        final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> regionsByVertex =
                new HashMap<>();
        final List<DefaultSchedulingPipelinedRegion> regions = new ArrayList<>();

        for (Set<? extends SchedulingExecutionVertex> rawRegion : rawRegions) {
            // The sets are computed from DefaultExecutionVertex inputs, so narrowing the element
            // type is assumed safe; the warning is suppressed for this one declaration only.
            @SuppressWarnings("unchecked")
            final Set<DefaultExecutionVertex> regionVertices =
                    (Set<DefaultExecutionVertex>) rawRegion;

            final DefaultSchedulingPipelinedRegion region =
                    new DefaultSchedulingPipelinedRegion(regionVertices, resultPartitionRetriever);
            regions.add(region);

            for (SchedulingExecutionVertex vertex : rawRegion) {
                regionsByVertex.put(vertex.getId(), region);
            }
        }

        LOG.info(
                "Built {} pipelined regions in {} ms",
                regions.size(),
                (System.nanoTime() - startNanos) / 1000000);
        return new IndexedPipelinedRegions(regionsByVertex, regions);
    }

    /**
     * Verifies that all vertices sharing a co-location constraint belong to one pipelined region.
     *
     * <p>The first region seen for a constraint is remembered; encountering the same constraint
     * in a different region instance fails the {@code Preconditions.checkState} check. Vertices
     * without a co-location constraint are skipped.
     *
     * @param pipelinedRegions regions whose vertices are inspected
     * @param executionGraph graph used to resolve the co-location constraint of each vertex
     */
    private static void ensureCoLocatedVerticesInSameRegion(
            List<DefaultSchedulingPipelinedRegion> pipelinedRegions, ExecutionGraph executionGraph) {
        final Map<CoLocationConstraint, DefaultSchedulingPipelinedRegion> regionByConstraint =
                new HashMap<>();
        for (DefaultSchedulingPipelinedRegion region : pipelinedRegions) {
            for (SchedulingExecutionVertex vertex : region.getVertices()) {
                final CoLocationConstraint constraint =
                        getCoLocationConstraint(
                                ((DefaultExecutionVertex) vertex).getId(), executionGraph);
                if (constraint == null) {
                    continue;
                }
                // putIfAbsent yields the region already recorded for the constraint, or null
                // when this region has just been recorded as the first one.
                final DefaultSchedulingPipelinedRegion recorded =
                        regionByConstraint.putIfAbsent(constraint, region);
                Preconditions.checkState(
                        recorded == null || recorded == region,
                        "co-located tasks must be in the same pipelined region");
            }
        }
    }

    /**
     * Resolves the co-location constraint of an execution vertex.
     *
     * @param executionVertexID ID of the vertex; its job vertex ID selects the job vertex and
     *     its subtask index selects the constraint within the co-location group
     * @param executionGraph graph in which the job vertex is looked up
     * @return the constraint for the vertex's subtask index, or {@code null} when the job vertex
     *     has no co-location group
     * @throws NullPointerException if the graph has no job vertex for the ID
     */
    private static CoLocationConstraint getCoLocationConstraint(
            ExecutionVertexID executionVertexID, ExecutionGraph executionGraph) {
        final ExecutionJobVertex jobVertex =
                Objects.requireNonNull(
                        executionGraph.getJobVertex(executionVertexID.getJobVertexId()));
        final CoLocationGroup group = jobVertex.getCoLocationGroup();
        return group == null
                ? null
                : group.getLocationConstraint(executionVertexID.getSubtaskIndex());
    }
}
