package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DefaultInputConsumableDecider}.
 *
 * <p>Each test builds a small {@link TestingSchedulingTopology}, wires producer vertices to
 * consumer vertices with a given {@link ResultPartitionState} and {@link ResultPartitionType},
 * and then checks what the decider reports for the consumer vertices.
 *
 * <p>NOTE(review): the {@code m567getId()} calls below look like a decompiler-renamed accessor
 * (presumably {@code getId()} in the original sources). The declaring class is not visible from
 * this file, so the name is left untouched -- confirm against
 * {@code TestingSchedulingExecutionVertex} before renaming.
 */
class DefaultInputConsumableDeciderTest {

    /**
     * Two producers feed two consumers all-to-all through BLOCKING partitions that are still in
     * state CREATED; the decider is expected to report every consumed group, and both consumers,
     * as not consumable.
     */
    @Test
    void testNotFinishedBlockingInput() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumers)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();

        DefaultInputConsumableDecider decider =
                createDefaultInputConsumableDecider(Collections.emptySet(), topology);

        for (TestingSchedulingExecutionVertex consumer : consumers) {
            consumer.getConsumedPartitionGroups()
                    .forEach(
                            group ->
                                    Assertions.assertThat(
                                                    decider.isConsumableBasedOnFinishedProducers(
                                                            group))
                                            .isFalse());
        }
        Assertions.assertThat(
                        decider.isInputConsumable(
                                consumers.get(0), Collections.emptySet(), new HashMap<>()))
                .isFalse();
        Assertions.assertThat(
                        decider.isInputConsumable(
                                consumers.get(1), Collections.emptySet(), new HashMap<>()))
                .isFalse();
    }

    /**
     * Same topology as {@link #testNotFinishedBlockingInput()}, but the BLOCKING partitions are in
     * state ALL_DATA_PRODUCED; every consumed group and both consumers are expected to be
     * consumable.
     */
    @Test
    void testAllFinishedBlockingInput() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumers)
                .withResultPartitionState(ResultPartitionState.ALL_DATA_PRODUCED)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();

        DefaultInputConsumableDecider decider =
                createDefaultInputConsumableDecider(Collections.emptySet(), topology);

        for (TestingSchedulingExecutionVertex consumer : consumers) {
            consumer.getConsumedPartitionGroups()
                    .forEach(
                            group ->
                                    Assertions.assertThat(
                                                    decider.isConsumableBasedOnFinishedProducers(
                                                            group))
                                            .isTrue());
        }
        Assertions.assertThat(
                        decider.isInputConsumable(
                                consumers.get(0), Collections.emptySet(), new HashMap<>()))
                .isTrue();
        Assertions.assertThat(
                        decider.isInputConsumable(
                                consumers.get(1), Collections.emptySet(), new HashMap<>()))
                .isTrue();
    }

    /**
     * HYBRID_FULL partitions in state CREATED, with an empty set backing the decider's vertex
     * lookup and an empty set passed to {@code isInputConsumable}; both consumers are expected to
     * be not consumable.
     */
    @Test
    void testUpstreamNotScheduledHybridInput() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumers)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
                .finish();

        DefaultInputConsumableDecider decider =
                createDefaultInputConsumableDecider(Collections.emptySet(), topology);

        Assertions.assertThat(
                        decider.isInputConsumable(
                                consumers.get(0), Collections.emptySet(), new HashMap<>()))
                .isFalse();
        Assertions.assertThat(
                        decider.isInputConsumable(
                                consumers.get(1), Collections.emptySet(), new HashMap<>()))
                .isFalse();
    }

    /**
     * HYBRID_FULL partitions in state CREATED where the first producer's id is in the set backing
     * the decider's vertex lookup and the second producer's id is in the set passed to
     * {@code isInputConsumable}; both consumers are expected to be consumable.
     */
    @Test
    void testUpstreamAllScheduledHybridInput() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumers)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
                .finish();

        // The id is added only after the decider has been created: the helper hands the decider
        // a lookup bound to this live set, so later additions are visible to it.
        Set<ExecutionVertexID> lookupBackingSet = new HashSet<>();
        DefaultInputConsumableDecider decider =
                createDefaultInputConsumableDecider(lookupBackingSet, topology);
        lookupBackingSet.add(producers.get(0).m567getId());

        Set<ExecutionVertexID> passedToDecider = new HashSet<>();
        passedToDecider.add(producers.get(1).m567getId());

        Assertions.assertThat(
                        decider.isInputConsumable(
                                consumers.get(0), passedToDecider, new HashMap<>()))
                .isTrue();
        Assertions.assertThat(
                        decider.isInputConsumable(
                                consumers.get(1), passedToDecider, new HashMap<>()))
                .isTrue();
    }

    /**
     * One consumer reads from a BLOCKING producer and a HYBRID_FULL producer, both with
     * partitions in state CREATED. Only the hybrid producer's id is in the decider's lookup set;
     * the consumer is expected to be not consumable.
     */
    @Test
    void testHybridAndBlockingInputButBlockingInputNotFinished() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> blockingProducers =
                topology.addExecutionVertices().withParallelism(1).finish();
        List<TestingSchedulingExecutionVertex> hybridProducers =
                topology.addExecutionVertices().withParallelism(1).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(1).finish();
        topology.connectAllToAll(blockingProducers, consumers)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();
        topology.connectAllToAll(hybridProducers, consumers)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
                .finish();

        DefaultInputConsumableDecider decider =
                createDefaultInputConsumableDecider(
                        Collections.singleton(hybridProducers.get(0).m567getId()), topology);

        Assertions.assertThat(
                        decider.isInputConsumable(
                                consumers.get(0), Collections.emptySet(), new HashMap<>()))
                .isFalse();
    }

    /**
     * Builds a decider whose first constructor argument is a membership test against {@code set}
     * and whose second is the topology's {@code getResultPartition} lookup.
     *
     * <p>Both method references are bound to their receivers when this method runs, so a
     * {@code null} argument fails here with a {@link NullPointerException}. The set is captured
     * by reference, not copied, so elements added afterwards are seen by the decider.
     *
     * @param set ids for which the decider's first function answers {@code true}
     * @param schedulingTopology topology used to resolve result partitions
     * @return the decider under test
     */
    private DefaultInputConsumableDecider createDefaultInputConsumableDecider(
            Set<ExecutionVertexID> set, SchedulingTopology schedulingTopology) {
        return new DefaultInputConsumableDecider(
                set::contains, schedulingTopology::getResultPartition);
    }
}
