package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.TestingPartitionFileWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.class */
/**
 * Tests for {@link DiskCacheManager}.
 *
 * <p>Every test wires the manager to a {@link TestingPartitionFileWriter} whose callbacks record
 * what the manager hands over, and then asserts on the recorded values.
 */
class DiskCacheManagerTest {

    /** All tests use a single subpartition, addressed by this id. */
    private static final int SUBPARTITION_ID = 0;

    /** Number of subpartitions passed to the {@link DiskCacheManager} constructor. */
    private static final int NUM_SUBPARTITIONS = 1;

    /**
     * Appends buffers of random sizes and verifies that, once the manager is closed, the file
     * writer has received every buffer and every byte that was appended.
     */
    @Test
    void testAppend() {
        final int numAddBuffers = 100;
        final int maxBufferSizeBytes = 100;
        Random random = new Random();

        AtomicInteger numReceivedBuffers = new AtomicInteger(0);
        AtomicInteger numReceivedBytes = new AtomicInteger(0);
        DiskCacheManager diskCacheManager =
                createDiskCacheManager(
                        new TestingPartitionFileWriter.Builder()
                                .setWriteFunction(
                                        (partitionId, subpartitionBufferContexts) -> {
                                            Tuple2<Integer, Integer> buffersAndBytes =
                                                    getNumReceivedBuffersAndBytes(
                                                            subpartitionBufferContexts);
                                            numReceivedBuffers.getAndAdd(buffersAndBytes.f0);
                                            numReceivedBytes.getAndAdd(buffersAndBytes.f1);
                                            return FutureUtils.completedVoidFuture();
                                        })
                                .build());

        int numAddedBytes = 0;
        for (int added = 0; added < numAddBuffers; added++) {
            // Sizes are in [1, maxBufferSizeBytes]; the expected total is tracked alongside, so
            // the unseeded Random does not make the assertions flaky.
            int bufferSize = random.nextInt(maxBufferSizeBytes) + 1;
            numAddedBytes += bufferSize;
            diskCacheManager.append(
                    BufferBuilderTestUtils.buildSomeBuffer(bufferSize), SUBPARTITION_ID);
        }

        Assertions.assertThat(diskCacheManager.getBufferIndex(SUBPARTITION_ID))
                .isEqualTo(numAddBuffers);
        diskCacheManager.close();
        Assertions.assertThat(numReceivedBuffers).hasValue(numAddBuffers);
        Assertions.assertThat(numReceivedBytes).hasValue(numAddedBytes);
    }

    /**
     * Appends a single serialized {@link EndOfSegmentEvent} and verifies that exactly one event
     * buffer reaches the file writer and deserializes back to an {@link EndOfSegmentEvent}.
     */
    @Test
    void testAppendEndOfSegmentEvent() throws IOException {
        List<PartitionFileWriter.SubpartitionBufferContext> receivedContexts = new ArrayList<>();
        DiskCacheManager diskCacheManager =
                createDiskCacheManager(
                        new TestingPartitionFileWriter.Builder()
                                .setWriteFunction(
                                        (partitionId, subpartitionBufferContexts) -> {
                                            receivedContexts.addAll(subpartitionBufferContexts);
                                            return FutureUtils.completedVoidFuture();
                                        })
                                .build());

        diskCacheManager.appendEndOfSegmentEvent(
                EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), SUBPARTITION_ID);
        diskCacheManager.close();

        Assertions.assertThat(receivedContexts).hasSize(1);
        List<PartitionFileWriter.SegmentBufferContext> segmentBufferContexts =
                receivedContexts.get(0).getSegmentBufferContexts();
        Assertions.assertThat(segmentBufferContexts).hasSize(1);
        List<Tuple2<Buffer, Integer>> bufferAndIndexes =
                segmentBufferContexts.get(0).getBufferAndIndexes();
        Assertions.assertThat(bufferAndIndexes).hasSize(1);

        Buffer buffer = bufferAndIndexes.get(0).f0;
        // The event must be stored as an event buffer, not as a data buffer.
        Assertions.assertThat(buffer.isBuffer()).isFalse();
        Assertions.assertThat(
                        EventSerializer.fromSerializedEvent(
                                buffer.readOnlySlice().getNioBufferReadable(),
                                getClass().getClassLoader()))
                .isInstanceOf(EndOfSegmentEvent.class);
    }

    /**
     * Counts file-writer invocations while appending data and verifies when writes are triggered:
     * none after the first {@code NETWORK_BUFFER_SIZE}-byte buffer, one after a further 1-byte
     * buffer, still one after another {@code NETWORK_BUFFER_SIZE}-byte buffer, and two after an
     * end-of-segment event.
     */
    @Test
    void testFlushWhenCachedBytesReachLimit() throws IOException {
        AtomicInteger numWriteCalls = new AtomicInteger(0);
        DiskCacheManager diskCacheManager =
                createDiskCacheManager(
                        new TestingPartitionFileWriter.Builder()
                                .setWriteFunction(
                                        (partitionId, subpartitionBufferContexts) -> {
                                            numWriteCalls.incrementAndGet();
                                            return FutureUtils.completedVoidFuture();
                                        })
                                .build());

        diskCacheManager.append(
                BufferBuilderTestUtils.buildSomeBuffer(
                        HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE),
                SUBPARTITION_ID);
        Assertions.assertThat(numWriteCalls).hasValue(0);

        diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1), SUBPARTITION_ID);
        Assertions.assertThat(numWriteCalls).hasValue(1);

        diskCacheManager.append(
                BufferBuilderTestUtils.buildSomeBuffer(
                        HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE),
                SUBPARTITION_ID);
        Assertions.assertThat(numWriteCalls).hasValue(1);

        diskCacheManager.appendEndOfSegmentEvent(
                EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), SUBPARTITION_ID);
        Assertions.assertThat(numWriteCalls).hasValue(2);
    }

    /** Verifies that releasing the manager runs the file writer's release callback. */
    @Test
    void testRelease() {
        AtomicBoolean isReleased = new AtomicBoolean(false);
        DiskCacheManager diskCacheManager =
                createDiskCacheManager(
                        new TestingPartitionFileWriter.Builder()
                                .setReleaseRunnable(() -> isReleased.set(true))
                                .build());

        diskCacheManager.release();

        Assertions.assertThat(isReleased).isTrue();
    }

    /**
     * Builds a {@link DiskCacheManager} for a fresh partition id with one subpartition, a default
     * {@link TestingTieredStorageMemoryManager} and the given file writer.
     *
     * @param partitionFileWriter the writer that receives whatever the manager flushes or releases
     * @return a new manager configured identically for every test in this class
     */
    private static DiskCacheManager createDiskCacheManager(
            PartitionFileWriter partitionFileWriter) {
        // NOTE(review): the third argument is presumably the cached-bytes threshold that triggers
        // a flush (see testFlushWhenCachedBytesReachLimit) - confirm against DiskCacheManager.
        return new DiskCacheManager(
                TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
                NUM_SUBPARTITIONS,
                HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE,
                new TestingTieredStorageMemoryManager.Builder().build(),
                partitionFileWriter);
    }

    /**
     * Sums up the buffers contained in the given contexts.
     *
     * @param subpartitionBufferContexts the contexts passed to the file writer's write function
     * @return a tuple of (total number of buffers, total readable bytes of those buffers)
     */
    private static Tuple2<Integer, Integer> getNumReceivedBuffersAndBytes(
            List<PartitionFileWriter.SubpartitionBufferContext> subpartitionBufferContexts) {
        int numBuffers = 0;
        int numBytes = 0;
        for (PartitionFileWriter.SubpartitionBufferContext subpartitionContext :
                subpartitionBufferContexts) {
            for (PartitionFileWriter.SegmentBufferContext segmentContext :
                    subpartitionContext.getSegmentBufferContexts()) {
                List<Tuple2<Buffer, Integer>> bufferAndIndexes =
                        segmentContext.getBufferAndIndexes();
                numBuffers += bufferAndIndexes.size();
                for (Tuple2<Buffer, Integer> bufferAndIndex : bufferAndIndexes) {
                    numBytes += bufferAndIndex.f0.readableBytes();
                }
            }
        }
        return new Tuple2<>(numBuffers, numBytes);
    }
}
