package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.DataBufferTest;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({ParameterizedTestExtension.class})
/* Decompiled from org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.class. */
public class SortMergeResultPartitionTest {
    private static final int bufferSize = 1024;
    private static final int totalBuffers = 1000;
    private static final int totalBytes = 33554432;
    private static final int numThreads = 4;

    @Parameter
    public boolean useHashDataBuffer;
    private final TestBufferAvailabilityListener listener = new TestBufferAvailabilityListener();
    private FileChannelManager fileChannelManager;
    private NetworkBufferPool globalPool;
    private BatchShuffleReadBufferPool readBufferPool;
    private ExecutorService readIOExecutor;

    @TempDir
    private Path tmpFolder;

    /* JADX note: this class was declared private originally; the decompiler changed its access modifier. */
    /* Decompiled from org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest$TestBufferAvailabilityListener.class. */
    /**
     * Availability listener that lets a test thread block until data has been announced.
     *
     * <p>Both methods synchronize on the listener instance, which guards {@code numNotifications}.
     */
    public static final class TestBufferAvailabilityListener implements BufferAvailabilityListener {
        /** Number of notifications received since the last completed {@link #waitForData()}. */
        private int numNotifications;

        private TestBufferAvailabilityListener() {
        }

        /** Records one notification; wakes up waiting threads if none was pending before. */
        public synchronized void notifyDataAvailable() {
            if (this.numNotifications == 0) {
                notifyAll();
            }
            this.numNotifications++;
        }

        /**
         * Blocks until at least one notification is pending, then resets the pending count to zero.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public synchronized void waitForData() throws InterruptedException {
            // Object.wait() may return without a matching notify (spurious wakeup), so the
            // condition has to be re-checked in a loop instead of a single if.
            while (this.numNotifications == 0) {
                wait();
            }
            this.numNotifications = 0;
        }
    }

    /**
     * Builds the fixtures used by every test: a file channel manager rooted in a new folder under
     * the JUnit temp directory, the global network buffer pool, the shuffle read buffer pool and
     * the read IO executor.
     *
     * <p>The literals equal the class constants {@code totalBuffers} (1000), {@code bufferSize}
     * (1024), {@code totalBytes} (33554432) and {@code numThreads} (4).
     *
     * @throws IOException propagated from creating the folder or the fixtures
     */
    @BeforeEach
    void setUp() throws IOException {
        final String spillDirectory = TempDirUtils.newFolder(this.tmpFolder).toString();
        final String[] spillDirectories = {spillDirectory};

        fileChannelManager = new FileChannelManagerImpl(spillDirectories, "testing");
        globalPool = new NetworkBufferPool(1000, 1024);
        readBufferPool = new BatchShuffleReadBufferPool(33554432L, 1024);
        readIOExecutor = Executors.newFixedThreadPool(4);
    }

    /**
     * Releases the fixtures created in {@code setUp()}, in the same order as before: file channel
     * manager, global pool, read buffer pool, read IO executor.
     *
     * <p>The calls are chained with {@code try/finally} so that a failure in one cleanup step
     * does not skip the remaining ones (in particular the executor is always shut down).
     *
     * @throws Exception the exception raised by the failing cleanup step, if any
     */
    @AfterEach
    void shutdown() throws Exception {
        try {
            this.fileChannelManager.close();
        } finally {
            try {
                this.globalPool.destroy();
            } finally {
                try {
                    this.readBufferPool.destroy();
                } finally {
                    this.readIOExecutor.shutdown();
                }
            }
        }
    }

    /**
     * Supplies the values injected into {@code useHashDataBuffer}: first {@code false}, then
     * {@code true}.
     *
     * @return a fixed-size collection holding both boolean values
     */
    @Parameters(name = "useHashDataBuffer={0}")
    public static Collection<Boolean> parameters() {
        final Boolean[] dataBufferChoices = {Boolean.FALSE, Boolean.TRUE};
        return Arrays.asList(dataBufferChoices);
    }

    /**
     * Writes 1000 random records (each either broadcast to all 10 subpartitions or emitted to one
     * random subpartition), finishes the partition, reads every subpartition back and hands the
     * written/read bookkeeping to {@code DataBufferTest.checkWriteReadResult}.
     */
    @TestTemplate
    void testWriteAndRead() throws Exception {
        final int numSubpartitions = 10;
        final int numRecords = 1000;
        final int numBuffers = this.useHashDataBuffer ? 100 : 15;
        final Random random = new Random();
        SortMergeResultPartition partition =
                createSortMergedPartition(numSubpartitions, this.globalPool.createBufferPool(numBuffers, numBuffers));

        Queue<DataBufferTest.DataAndType>[] dataWritten = new Queue[numSubpartitions];
        Queue[] buffersRead = new Queue[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            dataWritten[subpartition] = new ArrayDeque<>();
            buffersRead[subpartition] = new ArrayDeque<>();
        }
        // Freshly allocated int arrays are already zero-filled.
        final int[] numBytesWritten = new int[numSubpartitions];
        final int[] numBytesRead = new int[numSubpartitions];

        for (int recordIndex = 0; recordIndex < numRecords; recordIndex++) {
            ByteBuffer payload = generateRandomData(random.nextInt(2048) + 1, random);
            boolean broadcast = random.nextBoolean();
            if (!broadcast) {
                int target = random.nextInt(numSubpartitions);
                partition.emitRecord(payload, target);
                recordDataWritten(payload, dataWritten, target, numBytesWritten, Buffer.DataType.DATA_BUFFER);
                continue;
            }
            partition.broadcastRecord(payload);
            for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
                recordDataWritten(payload, dataWritten, subpartition, numBytesWritten, Buffer.DataType.DATA_BUFFER);
            }
        }
        partition.finish();
        partition.close();

        // Every subpartition is expected to end with one serialized end-of-partition event.
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            recordDataWritten(
                    EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE),
                    dataWritten,
                    subpartition,
                    numBytesWritten,
                    Buffer.DataType.EVENT_BUFFER);
        }

        readData(createSubpartitionViews(partition, numSubpartitions), bufferWithChannel -> {
            CompositeBuffer composite = bufferWithChannel.getBuffer();
            int channel = bufferWithChannel.getChannelIndex();
            int numBytes = composite.readableBytes();
            numBytesRead[channel] += numBytes;

            // Copy the payload into an unpooled segment so it can be kept after recycling.
            MemorySegment copyTarget = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
            MemorySegment scratch = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
            Buffer fullData = composite.getFullBufferData(scratch);
            copyTarget.put(0, fullData.getNioBufferReadable(), fullData.readableBytes());
            buffersRead[channel].add(
                    new NetworkBuffer(
                            copyTarget,
                            segment -> {
                            },
                            fullData.getDataType(),
                            fullData.isCompressed(),
                            fullData.readableBytes()));
            fullData.recycleBuffer();
        });
        DataBufferTest.checkWriteReadResult(numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
    }

    /**
     * Rewinds {@code byteBuffer}, appends it (tagged with {@code dataType}) to the queue of
     * subpartition {@code i} and adds its remaining byte count to {@code iArr[i]}.
     *
     * @param byteBuffer the data that was written
     * @param queueArr per-subpartition queues of written data
     * @param i index of the subpartition the data was written to
     * @param iArr per-subpartition totals of written bytes
     * @param dataType type recorded alongside the data
     */
    private void recordDataWritten(ByteBuffer byteBuffer, Queue<DataBufferTest.DataAndType>[] queueArr, int i, int[] iArr, Buffer.DataType dataType) {
        byteBuffer.rewind();
        final DataBufferTest.DataAndType entry = new DataBufferTest.DataAndType(byteBuffer, dataType);
        queueArr[i].add(entry);
        iArr[i] += byteBuffer.remaining();
    }

    /**
     * Creates a heap buffer of {@code i} bytes filled from {@code random}.
     *
     * @param i number of bytes to generate
     * @param random source of the bytes
     * @return a buffer wrapping the generated bytes, positioned at 0 with limit {@code i}
     */
    private ByteBuffer generateRandomData(int i, Random random) {
        final byte[] payload = new byte[i];
        random.nextBytes(payload);
        return ByteBuffer.wrap(payload, 0, payload.length);
    }

    /**
     * Reads all given subpartition views to completion and returns the number of bytes read.
     *
     * <p>After each listener wake-up every view is drained until {@code getNextBuffer()} returns
     * {@code null}. Each buffer is passed to {@code consumer} together with its view index. A
     * buffer for which {@code isBuffer()} is {@code false} is treated as the end of its
     * subpartition: the view must then report itself unavailable and is released. The method
     * returns once one such buffer has been seen per view.
     *
     * @param resultSubpartitionViewArr the views to read, indexed by subpartition
     * @param consumer callback invoked for every buffer read
     * @return the sum of {@code readableBytes()} over all buffers read
     * @throws Exception if waiting for data or reading a view fails
     */
    private long readData(ResultSubpartitionView[] resultSubpartitionViewArr, Consumer<BufferWithChannel> consumer) throws Exception {
        long numBytesRead = 0L;
        int numFinishedViews = 0;
        while (numFinishedViews < resultSubpartitionViewArr.length) {
            this.listener.waitForData();
            for (int subpartition = 0; subpartition < resultSubpartitionViewArr.length; subpartition++) {
                ResultSubpartitionView view = resultSubpartitionViewArr[subpartition];
                ResultSubpartition.BufferAndBacklog bufferAndBacklog;
                // Drain the view; the loop must stop on null, otherwise it would spin forever
                // and never move on to the next view or wait for the next notification.
                while ((bufferAndBacklog = view.getNextBuffer()) != null) {
                    Buffer buffer = bufferAndBacklog.buffer();
                    consumer.accept(new BufferWithChannel(buffer, subpartition));
                    numBytesRead += buffer.readableBytes();
                    if (!buffer.isBuffer()) {
                        numFinishedViews++;
                        Assertions.assertThat(view.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable()).isFalse();
                        view.releaseAllResources();
                    }
                }
            }
        }
        return numBytesRead;
    }

    /**
     * Creates one view per subpartition index in {@code [0, i)}, all registered with the shared
     * test listener.
     *
     * @param sortMergeResultPartition the partition to create views on
     * @param i number of views to create
     * @return the views, indexed by subpartition
     * @throws Exception if creating a view fails
     */
    private ResultSubpartitionView[] createSubpartitionViews(SortMergeResultPartition sortMergeResultPartition, int i) throws Exception {
        final ResultSubpartitionView[] views = new ResultSubpartitionView[i];
        int subpartition = 0;
        while (subpartition < views.length) {
            views[subpartition] = sortMergeResultPartition.createSubpartitionView(subpartition, this.listener);
            subpartition++;
        }
        return views;
    }

    /**
     * Emits a single record of {@code 1024 * numBuffers} bytes to subpartition 0 and verifies that
     * reading the subpartition back yields exactly the same bytes.
     */
    @TestTemplate
    void testWriteLargeRecord() throws Exception {
        final int numBuffers = this.useHashDataBuffer ? 100 : 15;
        final int recordSize = 1024 * numBuffers;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);

        ByteBuffer recordWritten = generateRandomData(recordSize, new Random());
        partition.emitRecord(recordWritten, 0);
        int expectedUsedBuffers = this.useHashDataBuffer ? numBuffers : 0;
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(expectedUsedBuffers);
        partition.finish();
        partition.close();

        ResultSubpartitionView view = partition.createSubpartitionView(0, this.listener);
        ByteBuffer recordRead = ByteBuffer.allocate(recordSize);
        readData(new ResultSubpartitionView[]{view}, bufferWithChannel -> {
            CompositeBuffer composite = bufferWithChannel.getBuffer();
            Buffer fullData =
                    composite.getFullBufferData(MemorySegmentFactory.allocateUnpooledSegment(composite.readableBytes()));
            // Only data buffers contribute to the reassembled record; events are skipped.
            if (fullData.isBuffer()) {
                ByteBuffer copy = ByteBuffer.allocate(fullData.readableBytes());
                copy.put(fullData.getNioBufferReadable());
                copy.flip();
                recordRead.put(copy);
            }
            fullData.recycleBuffer();
        });

        recordWritten.rewind();
        recordRead.flip();
        Assertions.assertThat(recordRead).isEqualTo(recordWritten);
    }

    /**
     * Broadcasts 10000 records of 1024 bytes to 10 subpartitions and checks that the
     * {@code .shuffle.data} file is smaller than 10 full copies of the payload, while the bytes
     * read back across all subpartitions equal 10 copies plus 10 end-of-partition events.
     */
    @TestTemplate
    void testDataBroadcast() throws Exception {
        final int numSubpartitions = 10;
        final int numRecords = 10000;
        final int recordSize = 1024;
        final int numBuffers = this.useHashDataBuffer ? 100 : 15;
        SortMergeResultPartition partition =
                createSortMergedPartition(numSubpartitions, this.globalPool.createBufferPool(numBuffers, numBuffers));

        int recordsLeft = numRecords;
        while (recordsLeft-- > 0) {
            partition.broadcastRecord(generateRandomData(recordSize, new Random()));
        }
        partition.finish();
        partition.close();

        final int payloadBytes = numSubpartitions * numRecords * recordSize;
        final int eventBytes =
                numSubpartitions * EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE).remaining();
        final long expectedBytesRead = payloadBytes + eventBytes;

        Assertions.assertThat(partition.getResultFile()).isNotNull();
        File spillDirectory = this.fileChannelManager.getPaths()[0];
        String[] fileNames = (String[]) Preconditions.checkNotNull(spillDirectory.list());
        Assertions.assertThat(fileNames.length).isEqualTo(2);
        File[] files = (File[]) Preconditions.checkNotNull(spillDirectory.listFiles());
        for (File file : files) {
            boolean isDataFile = file.getName().endsWith(".shuffle.data");
            if (isDataFile) {
                Assertions.assertThat(file.length()).isLessThan(payloadBytes);
            }
        }

        long bytesRead =
                readData(
                        createSubpartitionViews(partition, numSubpartitions),
                        bufferWithChannel -> bufferWithChannel.getBuffer().recycleBuffer());
        Assertions.assertThat(bytesRead).isEqualTo(expectedBytesRead);
    }

    /**
     * Releases a partition that has written data but was never finished, then checks that a
     * further {@code emitRecord} fails with {@link IllegalStateException} and that the spill
     * directory is empty afterwards.
     */
    @TestTemplate
    void testReleaseWhileWriting() throws Exception {
        final int numBuffers = this.useHashDataBuffer ? 100 : 15;
        final int nearlyFullRecordSize = 1024 * (numBuffers - 1);
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffers);

        partition.emitRecord(ByteBuffer.allocate(nearlyFullRecordSize), 0);
        partition.emitRecord(ByteBuffer.allocate(nearlyFullRecordSize), 1);
        partition.emitRecord(ByteBuffer.allocate(1024), 2);

        // No result file is exposed yet, but two files already exist on disk.
        Assertions.assertThat(partition.getResultFile()).isNull();
        int filesBeforeRelease = this.fileChannelManager.getPaths()[0].list().length;
        Assertions.assertThat(filesBeforeRelease).isEqualTo(2);

        partition.release();
        Assertions.assertThatThrownBy(() -> partition.emitRecord(ByteBuffer.allocate(1024 * numBuffers), 2))
                .isInstanceOf(IllegalStateException.class);
        int filesAfterRelease = this.fileChannelManager.getPaths()[0].list().length;
        Assertions.assertThat(filesAfterRelease).isEqualTo(0);
    }

    /**
     * Releases a finished partition while a subpartition view is still open and verifies that the
     * result file disappears and the spill directory ends up empty.
     *
     * <p>Note: the final wait loop polls every 100 ms without a timeout.
     */
    @TestTemplate
    void testRelease() throws Exception {
        final int numBuffers = this.useHashDataBuffer ? 100 : 15;
        final int recordSize = 1024 * (numBuffers - 1);
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffers);

        partition.emitRecord(ByteBuffer.allocate(recordSize), 0);
        partition.emitRecord(ByteBuffer.allocate(recordSize), 1);
        partition.finish();
        partition.close();

        Assertions.assertThat(partition.getResultFile().getNumRegions()).isEqualTo(3);
        String[] filesBeforeRelease =
                (String[]) Preconditions.checkNotNull(this.fileChannelManager.getPaths()[0].list());
        Assertions.assertThat(filesBeforeRelease.length).isEqualTo(2);

        ResultSubpartitionView view = partition.createSubpartitionView(0, this.listener);
        partition.release();

        // Keep recycling buffers until the view is released or the result file is gone.
        while (!(view.isReleased() || partition.getResultFile() == null)) {
            ResultSubpartition.BufferAndBacklog next = view.getNextBuffer();
            if (next == null) {
                continue;
            }
            next.buffer().recycleBuffer();
        }
        while (partition.getResultFile() != null) {
            Thread.sleep(100L);
        }

        String[] filesAfterRelease =
                (String[]) Preconditions.checkNotNull(this.fileChannelManager.getPaths()[0].list());
        Assertions.assertThat(filesAfterRelease.length).isEqualTo(0);
    }

    /**
     * Verifies that closing the partition destroys its local buffer pool and that all 1000
     * segments of the global pool are available again afterwards.
     */
    @TestTemplate
    void testCloseReleasesAllBuffers() throws Exception {
        final int numBuffers = this.useHashDataBuffer ? 100 : 15;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffers);

        partition.emitRecord(ByteBuffer.allocate(1024 * (numBuffers - 1)), 5);
        int expectedUsedBuffers = this.useHashDataBuffer ? numBuffers : 0;
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(expectedUsedBuffers);

        partition.close();
        Assertions.assertThat(bufferPool.isDestroyed()).isTrue();
        Assertions.assertThat(this.globalPool.getNumberOfAvailableMemorySegments()).isEqualTo(1000);
    }

    /**
     * Verifies that creating a subpartition view on a partition that was never finished throws
     * {@link IllegalStateException}.
     */
    @TestTemplate
    void testReadUnfinishedPartition() throws Exception {
        BufferPool bufferPool = this.globalPool.createBufferPool(10, 10);
        SortMergeResultPartition unfinishedPartition = createSortMergedPartition(10, bufferPool);

        Assertions.assertThatThrownBy(() -> unfinishedPartition.createSubpartitionView(0, this.listener))
                .isInstanceOf(IllegalStateException.class);

        bufferPool.lazyDestroy();
    }

    /**
     * Verifies that creating a subpartition view on a partition that was finished and then
     * released throws {@link IllegalStateException}.
     */
    @TestTemplate
    void testReadReleasedPartition() throws Exception {
        BufferPool bufferPool = this.globalPool.createBufferPool(10, 10);
        SortMergeResultPartition releasedPartition = createSortMergedPartition(10, bufferPool);
        releasedPartition.finish();
        releasedPartition.release();

        Assertions.assertThatThrownBy(() -> releasedPartition.createSubpartitionView(0, this.listener))
                .isInstanceOf(IllegalStateException.class);

        bufferPool.lazyDestroy();
    }

    /** Runs the byte-counter check with records emitted to individual subpartitions. */
    @TestTemplate
    void testNumBytesProducedCounterForUnicast() throws IOException {
        final boolean broadcast = false;
        testResultPartitionBytesCounter(broadcast);
    }

    /** Runs the byte-counter check with a broadcast record. */
    @TestTemplate
    void testNumBytesProducedCounterForBroadcast() throws IOException {
        final boolean broadcast = true;
        testResultPartitionBytesCounter(broadcast);
    }

    /**
     * Creates a local pool with arguments (10, 20) and verifies that exactly 10 buffers are
     * reported as used once the partition has been set up.
     */
    @TestTemplate
    void testNetworkBufferReservation() throws IOException {
        final int numBuffers = 10;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, 2 * numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(1, bufferPool);

        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffers);

        partition.finish();
        partition.close();
    }

    /**
     * Runs two consumers of the same finished partition in a fixed interleaving and expects both
     * to complete.
     *
     * <p>Interleaving: the first consumer forwards one buffer into its own downstream partition,
     * then waits until the second consumer has created its buffer pool; after that both forward
     * all remaining data.
     *
     * <p>Failures inside the consumer threads are recorded and rethrown after both threads have
     * been joined, so a broken consumer fails the test instead of being silently ignored. Each
     * thread also releases "its" latch in a {@code finally} block so that a failing thread cannot
     * leave the other one (and the joins below) blocked forever.
     */
    @TestTemplate
    void testNoDeadlockOnSpecificConsumptionOrder() throws Exception {
        final int numNetworkBuffers = 8192;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(numNetworkBuffers, 1024);
        SortMergeResultPartition partition =
                createSortMergedPartition(
                        1,
                        networkBufferPool.createBufferPool(numNetworkBuffers, numNetworkBuffers),
                        new BatchShuffleReadBufferPool(4194304L, 1024));
        for (int recordIndex = 0; recordIndex < numNetworkBuffers; recordIndex++) {
            partition.emitRecord(ByteBuffer.allocate(1024), 0);
        }
        partition.finish();
        partition.close();

        CountDownLatch firstBufferForwarded = new CountDownLatch(1);
        CountDownLatch secondPoolCreated = new CountDownLatch(1);
        // One slot per consumer thread; Thread.join() makes the writes visible to this thread.
        final Throwable[] failures = new Throwable[2];

        Thread firstConsumer = new Thread(() -> {
            try {
                ResultSubpartitionView view = partition.createSubpartitionView(0, this.listener);
                SortMergeResultPartition downstream =
                        createSortMergedPartition(
                                1, networkBufferPool.createBufferPool(numNetworkBuffers / 2, numNetworkBuffers));
                readAndEmitData(view, downstream);
                firstBufferForwarded.countDown();
                secondPoolCreated.await();
                readAndEmitAllData(view, downstream);
            } catch (Throwable t) {
                failures[0] = t;
            } finally {
                // No-op on the success path; unblocks the second consumer on failure.
                firstBufferForwarded.countDown();
            }
        });
        firstConsumer.start();

        Thread secondConsumer = new Thread(() -> {
            try {
                firstBufferForwarded.await();
                BufferPool bufferPool = networkBufferPool.createBufferPool(numNetworkBuffers / 2, numNetworkBuffers);
                secondPoolCreated.countDown();
                readAndEmitAllData(
                        partition.createSubpartitionView(0, this.listener), createSortMergedPartition(1, bufferPool));
            } catch (Throwable t) {
                failures[1] = t;
            } finally {
                // No-op on the success path; unblocks the first consumer on failure.
                secondPoolCreated.countDown();
            }
        });
        secondConsumer.start();

        firstConsumer.join();
        secondConsumer.join();

        for (int consumer = 0; consumer < failures.length; consumer++) {
            if (failures[consumer] != null) {
                throw new AssertionError("Consumer thread " + (consumer + 1) + " failed", failures[consumer]);
            }
        }
    }

    /**
     * Polls the view until a buffer is available, emits that buffer's content as one record to
     * subpartition 0 of the given partition and recycles the read buffer if needed.
     *
     * <p>The polling loop spins without sleeping until {@code getNextBuffer()} returns non-null.
     *
     * @param resultSubpartitionView the view to read from
     * @param sortMergeResultPartition the partition to emit the data to
     * @return {@code isBuffer()} of the buffer that was read
     * @throws Exception if reading or emitting fails
     */
    private boolean readAndEmitData(ResultSubpartitionView resultSubpartitionView, SortMergeResultPartition sortMergeResultPartition) throws Exception {
        final MemorySegment scratch = MemorySegmentFactory.allocateUnpooledSegment(1024);

        ResultSubpartition.BufferAndBacklog next = resultSubpartitionView.getNextBuffer();
        while (next == null) {
            next = resultSubpartitionView.getNextBuffer();
        }

        // NOTE(review): getFullBufferData is called directly on the result of buffer(); if that
        // is declared as a plain Buffer an explicit downcast would be required here - confirm.
        final Buffer fullData = next.buffer().getFullBufferData(scratch);
        sortMergeResultPartition.emitRecord(fullData.getNioBufferReadable(), 0);
        if (!fullData.isRecycled()) {
            fullData.recycleBuffer();
        }
        return next.buffer().isBuffer();
    }

    /**
     * Forwards buffers from the view to the partition until {@code readAndEmitData} returns
     * {@code false}, then finishes and closes the partition.
     *
     * @param resultSubpartitionView the view to read from
     * @param sortMergeResultPartition the partition to emit to and finally close
     * @throws Exception if reading, emitting, finishing or closing fails
     */
    private void readAndEmitAllData(ResultSubpartitionView resultSubpartitionView, SortMergeResultPartition sortMergeResultPartition) throws Exception {
        boolean moreData = readAndEmitData(resultSubpartitionView, sortMergeResultPartition);
        while (moreData) {
            moreData = readAndEmitData(resultSubpartitionView, sortMergeResultPartition);
        }
        sortMergeResultPartition.finish();
        sortMergeResultPartition.close();
    }

    /**
     * Checks the per-subpartition byte snapshot and the {@code numBytesOut} counter of a
     * two-subpartition partition after {@code finish()}.
     *
     * <p>Expected counts are 4 bytes above each subpartition's record payload (presumably the
     * serialized end-of-partition event written on finish - confirm).
     *
     * @param z {@code true} to broadcast one 1024-byte record, {@code false} to emit a 1024-byte
     *     record to subpartition 0 and a 2048-byte record to subpartition 1
     * @throws IOException if writing to the partition fails
     */
    private void testResultPartitionBytesCounter(boolean z) throws IOException {
        final int numBuffers = this.useHashDataBuffer ? 100 : 15;
        SortMergeResultPartition partition =
                createSortMergedPartition(2, this.globalPool.createBufferPool(numBuffers, numBuffers));

        if (!z) {
            partition.emitRecord(ByteBuffer.allocate(1024), 0);
            partition.emitRecord(ByteBuffer.allocate(2048), 1);
            partition.finish();
            Assertions.assertThat(partition.resultPartitionBytes.createSnapshot().getSubpartitionBytes())
                    .containsExactly(new long[]{1028, 2052});
            Assertions.assertThat(partition.numBytesOut.getCount()).isEqualTo(3072 + (2 * 4));
        } else {
            partition.broadcastRecord(ByteBuffer.allocate(1024));
            partition.finish();
            Assertions.assertThat(partition.resultPartitionBytes.createSnapshot().getSubpartitionBytes())
                    .containsExactly(new long[]{1028, 1028});
            Assertions.assertThat(partition.numBytesOut.getCount()).isEqualTo(2 * 1028);
        }
    }

    /**
     * Creates and sets up a partition that uses the shared {@code readBufferPool} fixture.
     *
     * @param i value passed on as the subpartition-count argument of the three-argument overload
     * @param bufferPool the local buffer pool handed to the partition
     * @return the set-up partition
     * @throws IOException if creating or setting up the partition fails
     */
    private SortMergeResultPartition createSortMergedPartition(int i, BufferPool bufferPool) throws IOException {
        return this.createSortMergedPartition(i, bufferPool, readBufferPool);
    }

    /**
     * Creates a {@code BLOCKING} {@link SortMergeResultPartition} backed by a new channel of the
     * test's file channel manager and the shared read IO executor, then calls {@code setup()}.
     *
     * @param i passed twice to the partition constructor (fifth and sixth argument)
     * @param bufferPool the pool returned by the buffer-pool factory given to the partition
     * @param batchShuffleReadBufferPool the read buffer pool given to the partition
     * @return the set-up partition
     * @throws IOException if creating or setting up the partition fails
     */
    private SortMergeResultPartition createSortMergedPartition(int i, BufferPool bufferPool, BatchShuffleReadBufferPool batchShuffleReadBufferPool) throws IOException {
        final SortMergeResultPartition partition =
                new SortMergeResultPartition(
                        "SortMergedResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.BLOCKING,
                        i,
                        i,
                        batchShuffleReadBufferPool,
                        this.readIOExecutor,
                        new ResultPartitionManager(),
                        this.fileChannelManager.createChannel().getPath(),
                        (BufferCompressor) null,
                        () -> bufferPool);
        partition.setup();
        return partition;
    }
}
