package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferWithChannel;
import org.apache.flink.runtime.io.network.partition.SortBuffer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageSortBufferTest.class */
/**
 * Tests for {@link TieredStorageSortBuffer}.
 *
 * <p>The tests write randomly generated records into a sort buffer, read them back through
 * {@code getNextBuffer}, and verify both the round-tripped bytes and the number of buffers the
 * backing {@link BufferPool} reports as used.
 */
class TieredStorageSortBufferTest {
    /** Size in bytes of every memory segment allocated by these tests. */
    private static final int BUFFER_SIZE_BYTES = 1024;

    /** Number of subpartitions each sort buffer under test is created with. */
    private static final int NUM_SUBPARTITIONS = 10;

    /** Number of segments in the network buffer pool and in the local buffer pool. */
    private static final int BUFFER_POOL_SIZE = 512;

    /** Number of segments handed to the sort buffer in the recycling tests. */
    private static final int NUM_BUFFERS_FOR_SORT = 20;

    /** Number of sort buffers that are finished and drained inside the write loop. */
    private static final int NUM_DATA_BUFFERS = 5;

    /** Fixed seed so that the generated records are reproducible. */
    private static final long RANDOM_SEED = 1234L;

    /** A written record together with the data type it was appended with. */
    public static class DataAndType {
        private final ByteBuffer data;
        private final Buffer.DataType dataType;

        DataAndType(ByteBuffer byteBuffer, Buffer.DataType dataType) {
            this.data = byteBuffer;
            this.dataType = dataType;
        }
    }

    /**
     * Appends random records to random subpartitions, drains the sort buffer several times and
     * checks that, per subpartition, the bytes and events read equal the bytes and events written.
     */
    @Test
    void testWriteAndReadDataBuffer() throws Exception {
        Random random = new Random(RANDOM_SEED);

        // Generic array creation is not expressible in Java; every slot is filled with a
        // correctly typed queue right below, so the unchecked conversion is safe.
        @SuppressWarnings("unchecked")
        Queue<DataAndType>[] dataWritten = new Queue[NUM_SUBPARTITIONS];
        @SuppressWarnings("unchecked")
        Queue<Buffer>[] buffersRead = new Queue[NUM_SUBPARTITIONS];
        for (int subpartition = 0; subpartition < NUM_SUBPARTITIONS; subpartition++) {
            dataWritten[subpartition] = new ArrayDeque<>();
            buffersRead[subpartition] = new ArrayDeque<>();
        }

        // int arrays are zero-initialised, so no explicit fill is required.
        int[] numBytesWritten = new int[NUM_SUBPARTITIONS];
        int[] numBytesRead = new int[NUM_SUBPARTITIONS];

        SortBuffer sortBuffer = createDataBuffer(BUFFER_POOL_SIZE, NUM_SUBPARTITIONS);
        int remainingSortBuffers = NUM_DATA_BUFFERS;
        while (remainingSortBuffers > 0) {
            // A record may be up to (almost) four segments long, so it can span segments.
            byte[] bytes = new byte[random.nextInt(BUFFER_SIZE_BYTES * 4 - 1) + 1];
            random.nextBytes(bytes);
            ByteBuffer record = ByteBuffer.wrap(bytes);

            int subpartition = random.nextInt(NUM_SUBPARTITIONS);
            Buffer.DataType dataType =
                    random.nextBoolean()
                            ? Buffer.DataType.DATA_BUFFER
                            : Buffer.DataType.EVENT_BUFFER;

            boolean appendResult = sortBuffer.append(record, subpartition, dataType);

            // After flip(), remaining() equals the number of bytes append(...) consumed from the
            // record; only consumed bytes are tracked as written.
            record.flip();
            if (record.hasRemaining()) {
                dataWritten[subpartition].add(new DataAndType(record, dataType));
                numBytesWritten[subpartition] += record.remaining();
            }

            // NOTE(review): the drain path is taken when append(...) returns false. Confirm
            // against the SortBuffer#append contract that this is the intended polarity.
            if (!appendResult) {
                sortBuffer.finish();
                remainingSortBuffers--;
                readAllBuffers(sortBuffer, buffersRead, numBytesRead);
                sortBuffer = createDataBuffer(BUFFER_POOL_SIZE, NUM_SUBPARTITIONS);
            }
        }

        // Drain whatever is left in the last sort buffer.
        if (sortBuffer.hasRemaining()) {
            sortBuffer.finish();
            readAllBuffers(sortBuffer, buffersRead, numBytesRead);
        }

        checkWriteReadResult(
                NUM_SUBPARTITIONS, numBytesWritten, numBytesRead, dataWritten, buffersRead);
    }

    /**
     * Reading from a finished, empty sort buffer must yield {@code null}, and afterwards the pool
     * must still report exactly the segments that were requested for sorting as used.
     */
    @Test
    void testBufferIsRecycledWhenSortBufferIsEmpty() throws Exception {
        BufferPool bufferPool = createBufferPool(BUFFER_POOL_SIZE);
        LinkedList<MemorySegment> segments = requestSegments(bufferPool, NUM_BUFFERS_FOR_SORT);
        TieredStorageSortBuffer sortBuffer =
                new TieredStorageSortBuffer(
                        segments,
                        bufferPool,
                        NUM_SUBPARTITIONS,
                        BUFFER_SIZE_BYTES,
                        NUM_BUFFERS_FOR_SORT);

        // Take one segment out of the list that was handed to the sort buffer and use it as the
        // transit buffer for the read below.
        MemorySegment transitSegment = segments.poll();
        sortBuffer.finish();

        Assertions.assertThat(sortBuffer.getNextBuffer(transitSegment)).isNull();
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers())
                .isEqualTo(NUM_BUFFERS_FOR_SORT);
    }

    /**
     * Writes one data record and one event to channel 0, reads both back and checks that the
     * pool's used-buffer count equals the number of sort segments after each read.
     */
    @Test
    void testBufferIsRecycledWhenGetEvent() throws Exception {
        Random random = new Random(RANDOM_SEED);
        BufferPool bufferPool = createBufferPool(BUFFER_POOL_SIZE);
        TieredStorageSortBuffer sortBuffer =
                new TieredStorageSortBuffer(
                        requestSegments(bufferPool, NUM_BUFFERS_FOR_SORT),
                        bufferPool,
                        NUM_SUBPARTITIONS,
                        BUFFER_SIZE_BYTES,
                        NUM_BUFFERS_FOR_SORT);

        byte[] bytes = new byte[1];
        random.nextBytes(bytes);
        sortBuffer.append(ByteBuffer.wrap(bytes), 0, Buffer.DataType.DATA_BUFFER);
        sortBuffer.append(ByteBuffer.wrap(bytes), 0, Buffer.DataType.EVENT_BUFFER);
        sortBuffer.finish();

        // One extra segment, on top of the sort segments, serves as the transit buffer.
        MemorySegment transitSegment = bufferPool.requestMemorySegmentBlocking();

        BufferWithChannel dataBuffer = sortBuffer.getNextBuffer(transitSegment);
        Assertions.assertThat(dataBuffer.getBuffer().isBuffer()).isTrue();
        Assertions.assertThat(dataBuffer.getChannelIndex()).isEqualTo(0);
        dataBuffer.getBuffer().recycleBuffer();
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers())
                .isEqualTo(NUM_BUFFERS_FOR_SORT);

        // The same segment reference is deliberately passed again for the event read.
        BufferWithChannel eventBuffer = sortBuffer.getNextBuffer(transitSegment);
        Assertions.assertThat(eventBuffer.getBuffer().isBuffer()).isFalse();
        Assertions.assertThat(eventBuffer.getChannelIndex()).isEqualTo(0);
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers())
                .isEqualTo(NUM_BUFFERS_FOR_SORT);
    }

    /**
     * Reads buffers from {@code sortBuffer} into {@code buffersRead} until the sort buffer has
     * nothing remaining or {@code getNextBuffer} returns {@code null}.
     */
    private static void readAllBuffers(
            SortBuffer sortBuffer, Queue<Buffer>[] buffersRead, int[] numBytesRead) {
        while (sortBuffer.hasRemaining()) {
            BufferWithChannel next = copyIntoSegment(sortBuffer);
            if (next == null) {
                break;
            }
            addBufferRead(next, buffersRead, numBytesRead);
        }
    }

    /** Reads the next buffer from {@code sortBuffer} into a freshly allocated unpooled segment. */
    private static BufferWithChannel copyIntoSegment(SortBuffer sortBuffer) {
        return sortBuffer.getNextBuffer(
                MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE_BYTES));
    }

    /**
     * Records a buffer that was read: queues a {@link NetworkBuffer} view of it under its channel
     * and adds its size to that channel's byte counter.
     */
    private static void addBufferRead(
            BufferWithChannel bufferWithChannel, Queue<Buffer>[] buffersRead, int[] numBytesRead) {
        int channel = bufferWithChannel.getChannelIndex();
        Buffer buffer = bufferWithChannel.getBuffer();
        buffersRead[channel].add(
                new NetworkBuffer(
                        buffer.getMemorySegment(),
                        MemorySegment::free,
                        buffer.getDataType(),
                        buffer.getSize()));
        numBytesRead[channel] += buffer.getSize();
    }

    /**
     * Asserts, for each subpartition, that the byte counts match, that the concatenated bytes
     * written equal the concatenated bytes read, and that the events match pairwise in data type
     * and content.
     */
    private static void checkWriteReadResult(
            int numSubpartitions,
            int[] numBytesWritten,
            int[] numBytesRead,
            Queue<DataAndType>[] dataWritten,
            Queue<Buffer>[] buffersRead) {
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            Assertions.assertThat(numBytesWritten[subpartition])
                    .isEqualTo(numBytesRead[subpartition]);

            ArrayList<DataAndType> eventsWritten = new ArrayList<>();
            ArrayList<Buffer> eventsRead = new ArrayList<>();

            ByteBuffer allWritten = ByteBuffer.allocate(numBytesWritten[subpartition]);
            for (DataAndType written : dataWritten[subpartition]) {
                allWritten.put(written.data);
                // put(...) advanced the record's position; rewind so it can be compared below.
                written.data.rewind();
                if (written.dataType.isEvent()) {
                    eventsWritten.add(written);
                }
            }

            ByteBuffer allRead = ByteBuffer.allocate(numBytesRead[subpartition]);
            for (Buffer read : buffersRead[subpartition]) {
                allRead.put(read.getNioBufferReadable());
                if (!read.isBuffer()) {
                    eventsRead.add(read);
                }
            }

            allWritten.flip();
            allRead.flip();
            Assertions.assertThat(allWritten).isEqualTo(allRead);

            Assertions.assertThat(eventsWritten.size()).isEqualTo(eventsRead.size());
            for (int i = 0; i < eventsWritten.size(); i++) {
                DataAndType written = eventsWritten.get(i);
                Buffer read = eventsRead.get(i);
                Assertions.assertThat(written.dataType).isEqualTo(read.getDataType());
                Assertions.assertThat(written.data).isEqualTo(read.getNioBufferReadable());
            }
        }
    }

    /**
     * Creates a sort buffer backed by a new pool of {@code i} segments, all of which are requested
     * up front and handed to the sort buffer.
     *
     * @param i number of segments in the pool and given to the sort buffer
     * @param i2 number of subpartitions of the sort buffer
     */
    private static TieredStorageSortBuffer createDataBuffer(int i, int i2) throws Exception {
        BufferPool bufferPool = createBufferPool(i);
        return new TieredStorageSortBuffer(
                requestSegments(bufferPool, i), bufferPool, i2, BUFFER_SIZE_BYTES, i);
    }

    /** Creates a local buffer pool of {@code numSegments} segments on a new network buffer pool. */
    private static BufferPool createBufferPool(int numSegments) throws Exception {
        return new NetworkBufferPool(numSegments, BUFFER_SIZE_BYTES)
                .createBufferPool(numSegments, numSegments);
    }

    /** Requests {@code count} segments from {@code bufferPool}, in request order. */
    private static LinkedList<MemorySegment> requestSegments(BufferPool bufferPool, int count)
            throws Exception {
        LinkedList<MemorySegment> segments = new LinkedList<>();
        for (int i = 0; i < count; i++) {
            segments.add(bufferPool.requestMemorySegmentBlocking());
        }
        return segments;
    }
}
