package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.Preconditions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.class */
public class LocalInputPreferredSlotSharingStrategy implements SlotSharingStrategy {
    private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy$ExecutionSlotSharingGroupBuilder.class */
    private static class ExecutionSlotSharingGroupBuilder {
        private final SchedulingTopology topology;
        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap;
        final Map<CoLocationConstraint, ExecutionSlotSharingGroup> constraintToExecutionSlotSharingGroupMap;
        final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>> executionSlotSharingGroups;
        private final Map<ExecutionSlotSharingGroup, Set<JobVertexID>> assignedJobVerticesForGroups;

        private ExecutionSlotSharingGroupBuilder(SchedulingTopology schedulingTopology, Set<SlotSharingGroup> set, Set<CoLocationGroup> set2) {
            this.topology = (SchedulingTopology) Preconditions.checkNotNull(schedulingTopology);
            this.slotSharingGroupMap = new HashMap();
            for (SlotSharingGroup slotSharingGroup : set) {
                Iterator<JobVertexID> it2 = slotSharingGroup.getJobVertexIds().iterator();
                while (it2.hasNext()) {
                    this.slotSharingGroupMap.put(it2.next(), slotSharingGroup);
                }
            }
            this.coLocationGroupMap = new HashMap();
            for (CoLocationGroup coLocationGroup : set2) {
                Iterator<JobVertexID> it3 = coLocationGroup.getVertexIds().iterator();
                while (it3.hasNext()) {
                    this.coLocationGroupMap.put(it3.next(), coLocationGroup);
                }
            }
            this.executionSlotSharingGroupMap = new HashMap();
            this.constraintToExecutionSlotSharingGroupMap = new HashMap();
            this.executionSlotSharingGroups = new HashMap();
            this.assignedJobVerticesForGroups = new IdentityHashMap();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
            for (List<SchedulingExecutionVertex> list : getExecutionVertices().values()) {
                findAvailableOrCreateNewExecutionSlotSharingGroupFor(tryFindOptimalAvailableExecutionSlotSharingGroupFor(list));
                updateConstraintToExecutionSlotSharingGroupMap(list);
            }
            return this.executionSlotSharingGroupMap;
        }

        private LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> getExecutionVertices() {
            LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> linkedHashMap = new LinkedHashMap<>();
            Iterator it2 = this.topology.getVertices().iterator();
            while (it2.hasNext()) {
                SchedulingExecutionVertex schedulingExecutionVertex = (SchedulingExecutionVertex) it2.next();
                linkedHashMap.computeIfAbsent(schedulingExecutionVertex.getId().getJobVertexId(), jobVertexID -> {
                    return new ArrayList();
                }).add(schedulingExecutionVertex);
            }
            return linkedHashMap;
        }

        private List<SchedulingExecutionVertex> tryFindOptimalAvailableExecutionSlotSharingGroupFor(List<SchedulingExecutionVertex> list) {
            ArrayList arrayList = new ArrayList();
            for (SchedulingExecutionVertex schedulingExecutionVertex : list) {
                ExecutionSlotSharingGroup tryFindAvailableCoLocatedExecutionSlotSharingGroupFor = tryFindAvailableCoLocatedExecutionSlotSharingGroupFor(schedulingExecutionVertex);
                if (tryFindAvailableCoLocatedExecutionSlotSharingGroupFor == null) {
                    tryFindAvailableCoLocatedExecutionSlotSharingGroupFor = tryFindAvailableProducerExecutionSlotSharingGroupFor(schedulingExecutionVertex);
                }
                if (tryFindAvailableCoLocatedExecutionSlotSharingGroupFor == null) {
                    arrayList.add(schedulingExecutionVertex);
                } else {
                    addVertexToExecutionSlotSharingGroup(schedulingExecutionVertex, tryFindAvailableCoLocatedExecutionSlotSharingGroupFor);
                }
            }
            return arrayList;
        }

        private ExecutionSlotSharingGroup tryFindAvailableCoLocatedExecutionSlotSharingGroupFor(SchedulingExecutionVertex schedulingExecutionVertex) {
            ExecutionVertexID id = schedulingExecutionVertex.getId();
            CoLocationGroup coLocationGroup = this.coLocationGroupMap.get(id.getJobVertexId());
            if (coLocationGroup == null) {
                return null;
            }
            return this.constraintToExecutionSlotSharingGroupMap.get(coLocationGroup.getLocationConstraint(id.getSubtaskIndex()));
        }

        private ExecutionSlotSharingGroup tryFindAvailableProducerExecutionSlotSharingGroupFor(SchedulingExecutionVertex schedulingExecutionVertex) {
            ExecutionVertexID id = schedulingExecutionVertex.getId();
            Iterator<? extends SchedulingResultPartition> it2 = schedulingExecutionVertex.getConsumedResults().iterator();
            while (it2.hasNext()) {
                ExecutionVertexID id2 = it2.next().getProducer2().getId();
                if (inSameLogicalSlotSharingGroup(id2, id)) {
                    ExecutionSlotSharingGroup executionSlotSharingGroup = this.executionSlotSharingGroupMap.get(id2);
                    Preconditions.checkState(executionSlotSharingGroup != null);
                    if (isGroupAvailableForVertex(executionSlotSharingGroup, id)) {
                        return executionSlotSharingGroup;
                    }
                }
            }
            return null;
        }

        private boolean inSameLogicalSlotSharingGroup(ExecutionVertexID executionVertexID, ExecutionVertexID executionVertexID2) {
            return Objects.equals(getSlotSharingGroup(executionVertexID).getSlotSharingGroupId(), getSlotSharingGroup(executionVertexID2).getSlotSharingGroupId());
        }

        private SlotSharingGroup getSlotSharingGroup(ExecutionVertexID executionVertexID) {
            return (SlotSharingGroup) Preconditions.checkNotNull(this.slotSharingGroupMap.get(executionVertexID.getJobVertexId()));
        }

        private boolean isGroupAvailableForVertex(ExecutionSlotSharingGroup executionSlotSharingGroup, ExecutionVertexID executionVertexID) {
            Set<JobVertexID> set = this.assignedJobVerticesForGroups.get(executionSlotSharingGroup);
            return set == null || !set.contains(executionVertexID.getJobVertexId());
        }

        private void addVertexToExecutionSlotSharingGroup(SchedulingExecutionVertex schedulingExecutionVertex, ExecutionSlotSharingGroup executionSlotSharingGroup) {
            executionSlotSharingGroup.addVertex(schedulingExecutionVertex.getId());
            this.executionSlotSharingGroupMap.put(schedulingExecutionVertex.getId(), executionSlotSharingGroup);
            this.assignedJobVerticesForGroups.computeIfAbsent(executionSlotSharingGroup, executionSlotSharingGroup2 -> {
                return new HashSet();
            }).add(schedulingExecutionVertex.getId().getJobVertexId());
        }

        private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(List<SchedulingExecutionVertex> list) {
            for (SchedulingExecutionVertex schedulingExecutionVertex : list) {
                SlotSharingGroup slotSharingGroup = getSlotSharingGroup(schedulingExecutionVertex.getId());
                List<ExecutionSlotSharingGroup> computeIfAbsent = this.executionSlotSharingGroups.computeIfAbsent(slotSharingGroup.getSlotSharingGroupId(), slotSharingGroupId -> {
                    return new ArrayList();
                });
                ExecutionSlotSharingGroup executionSlotSharingGroup = null;
                Iterator<ExecutionSlotSharingGroup> it2 = computeIfAbsent.iterator();
                while (true) {
                    if (!it2.hasNext()) {
                        break;
                    }
                    ExecutionSlotSharingGroup next = it2.next();
                    if (isGroupAvailableForVertex(next, schedulingExecutionVertex.getId())) {
                        executionSlotSharingGroup = next;
                        break;
                    }
                }
                if (executionSlotSharingGroup == null) {
                    executionSlotSharingGroup = new ExecutionSlotSharingGroup();
                    executionSlotSharingGroup.setResourceProfile(slotSharingGroup.getResourceProfile());
                    computeIfAbsent.add(executionSlotSharingGroup);
                }
                addVertexToExecutionSlotSharingGroup(schedulingExecutionVertex, executionSlotSharingGroup);
            }
        }

        private void updateConstraintToExecutionSlotSharingGroupMap(List<SchedulingExecutionVertex> list) {
            Iterator<SchedulingExecutionVertex> it2 = list.iterator();
            while (it2.hasNext()) {
                ExecutionVertexID id = it2.next().getId();
                CoLocationGroup coLocationGroup = this.coLocationGroupMap.get(id.getJobVertexId());
                if (coLocationGroup != null) {
                    this.constraintToExecutionSlotSharingGroupMap.put(coLocationGroup.getLocationConstraint(id.getSubtaskIndex()), this.executionSlotSharingGroupMap.get(id));
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy$Factory.class */
    static class Factory implements SlotSharingStrategy.Factory {
        @Override // org.apache.flink.runtime.scheduler.SlotSharingStrategy.Factory
        public LocalInputPreferredSlotSharingStrategy create(SchedulingTopology schedulingTopology, Set<SlotSharingGroup> set, Set<CoLocationGroup> set2) {
            return new LocalInputPreferredSlotSharingStrategy(schedulingTopology, set, set2);
        }

        @Override // org.apache.flink.runtime.scheduler.SlotSharingStrategy.Factory
        public /* bridge */ /* synthetic */ SlotSharingStrategy create(SchedulingTopology schedulingTopology, Set set, Set set2) {
            return create(schedulingTopology, (Set<SlotSharingGroup>) set, (Set<CoLocationGroup>) set2);
        }
    }

    LocalInputPreferredSlotSharingStrategy(SchedulingTopology schedulingTopology, Set<SlotSharingGroup> set, Set<CoLocationGroup> set2) {
        this.executionSlotSharingGroupMap = new ExecutionSlotSharingGroupBuilder(schedulingTopology, set, set2).build();
    }

    @Override // org.apache.flink.runtime.scheduler.SlotSharingStrategy
    public ExecutionSlotSharingGroup getExecutionSlotSharingGroup(ExecutionVertexID executionVertexID) {
        return this.executionSlotSharingGroupMap.get(executionVertexID);
    }

    @Override // org.apache.flink.runtime.scheduler.SlotSharingStrategy
    public Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups() {
        return new HashSet(this.executionSlotSharingGroupMap.values());
    }
}
