package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.CompressedSerializedValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.class */
class CachedShuffleDescriptorsTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /* loaded from: input_file:org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest$TestingShuffleDescriptorSerializer.class */
    private static class TestingShuffleDescriptorSerializer implements TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer {
        private TestingShuffleDescriptorSerializer() {
        }

        public TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup shuffleDescriptorGroup, int i) throws IOException {
            return new TaskDeploymentDescriptor.NonOffloaded(CompressedSerializedValue.fromObject(shuffleDescriptorGroup));
        }
    }

    CachedShuffleDescriptorsTest() {
    }

    @Test
    void testCreateAndGet() throws Exception {
        JobVertex jobVertex = new JobVertex("source");
        JobVertex jobVertex2 = new JobVertex("sink");
        ExecutionGraph buildExecutionGraph = buildExecutionGraph(jobVertex, jobVertex2, 1, 1, DistributionPattern.ALL_TO_ALL);
        ExecutionJobVertex jobVertex3 = buildExecutionGraph.getJobVertex(jobVertex.getID());
        Assertions.assertThat(jobVertex3).isNotNull();
        ExecutionVertex executionVertex = jobVertex3.getTaskVertices()[0];
        ExecutionJobVertex jobVertex4 = buildExecutionGraph.getJobVertex(jobVertex2.getID());
        Assertions.assertThat(jobVertex4).isNotNull();
        ExecutionVertex executionVertex2 = jobVertex4.getTaskVertices()[0];
        IntermediateResultPartition intermediateResultPartition = (IntermediateResultPartition) executionVertex.getProducedPartitions().values().stream().findAny().get();
        ConsumedPartitionGroup consumedPartitionGroup = executionVertex2.getConsumedPartitionGroup(0);
        ShuffleDescriptor shuffleDescriptor = getShuffleDescriptor(intermediateResultPartition);
        CachedShuffleDescriptors cachedShuffleDescriptors = new CachedShuffleDescriptors(consumedPartitionGroup, createSingleShuffleDescriptorAndIndex(shuffleDescriptor, 0));
        Assertions.assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups()).isEmpty();
        cachedShuffleDescriptors.serializeShuffleDescriptors(new TestingShuffleDescriptorSerializer());
        Assertions.assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups()).hasSize(1);
        assertNonOffloadedShuffleDescriptorAndIndexEquals((TaskDeploymentDescriptor.MaybeOffloaded) cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups().get(0), Collections.singletonList(shuffleDescriptor), Collections.singletonList(0));
    }

    @Test
    void testMarkPartitionFinishAndSerialize() throws Exception {
        JobVertex jobVertex = new JobVertex("source");
        JobVertex jobVertex2 = new JobVertex("sink");
        ExecutionGraph buildExecutionGraph = buildExecutionGraph(jobVertex, jobVertex2, 2, 1, DistributionPattern.ALL_TO_ALL);
        ExecutionJobVertex jobVertex3 = buildExecutionGraph.getJobVertex(jobVertex.getID());
        Assertions.assertThat(jobVertex3).isNotNull();
        ExecutionVertex executionVertex = jobVertex3.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = jobVertex3.getTaskVertices()[1];
        ExecutionJobVertex jobVertex4 = buildExecutionGraph.getJobVertex(jobVertex2.getID());
        Assertions.assertThat(jobVertex4).isNotNull();
        ExecutionVertex executionVertex3 = jobVertex4.getTaskVertices()[0];
        IntermediateResultPartition intermediateResultPartition = (IntermediateResultPartition) executionVertex.getProducedPartitions().values().stream().findAny().get();
        IntermediateResultPartition intermediateResultPartition2 = (IntermediateResultPartition) executionVertex2.getProducedPartitions().values().stream().findAny().get();
        CachedShuffleDescriptors cachedShuffleDescriptors = new CachedShuffleDescriptors(executionVertex3.getConsumedPartitionGroup(0), createSingleShuffleDescriptorAndIndex(getShuffleDescriptor(intermediateResultPartition), 0));
        TestingShuffleDescriptorSerializer testingShuffleDescriptorSerializer = new TestingShuffleDescriptorSerializer();
        cachedShuffleDescriptors.serializeShuffleDescriptors(testingShuffleDescriptorSerializer);
        cachedShuffleDescriptors.markPartitionFinished(intermediateResultPartition);
        cachedShuffleDescriptors.markPartitionFinished(intermediateResultPartition2);
        cachedShuffleDescriptors.serializeShuffleDescriptors(testingShuffleDescriptorSerializer);
        Assertions.assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups()).hasSize(2);
        assertNonOffloadedShuffleDescriptorAndIndexEquals((TaskDeploymentDescriptor.MaybeOffloaded) cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups().get(1), Arrays.asList(TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor(intermediateResultPartition, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN, false), TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor(intermediateResultPartition2, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN, false)), Arrays.asList(0, 1));
    }

    private void assertNonOffloadedShuffleDescriptorAndIndexEquals(TaskDeploymentDescriptor.MaybeOffloaded<TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup> maybeOffloaded, List<ShuffleDescriptor> list, List<Integer> list2) throws Exception {
        Assertions.assertThat(list).hasSameSizeAs(list2);
        Assertions.assertThat(maybeOffloaded).isInstanceOf(TaskDeploymentDescriptor.NonOffloaded.class);
        TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] shuffleDescriptors = ((TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup) ((TaskDeploymentDescriptor.NonOffloaded) maybeOffloaded).serializedValue.deserializeValue(getClass().getClassLoader())).getShuffleDescriptors();
        Assertions.assertThat(shuffleDescriptors).hasSameSizeAs(list);
        for (int i = 0; i < shuffleDescriptors.length; i++) {
            Assertions.assertThat(shuffleDescriptors[i].getIndex()).isEqualTo(list2.get(i));
            Assertions.assertThat(shuffleDescriptors[i].getShuffleDescriptor().getResultPartitionID()).isEqualTo(list.get(i).getResultPartitionID());
        }
    }

    private ShuffleDescriptor getShuffleDescriptor(IntermediateResultPartition intermediateResultPartition) {
        return TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor(intermediateResultPartition, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, true);
    }

    private static TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[] createSingleShuffleDescriptorAndIndex(ShuffleDescriptor shuffleDescriptor, int i) {
        return new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]{new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex(shuffleDescriptor, i)};
    }

    private ExecutionGraph buildExecutionGraph(JobVertex jobVertex, JobVertex jobVertex2, int i, int i2, DistributionPattern distributionPattern) throws Exception {
        jobVertex.setParallelism(i);
        jobVertex2.setParallelism(i2);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, distributionPattern, ResultPartitionType.HYBRID_FULL);
        DefaultScheduler build = new DefaultSchedulerBuilder(JobGraphTestUtils.batchJobGraph(jobVertex, jobVertex2), ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).build();
        build.startScheduling();
        return build.getExecutionGraph();
    }
}
