package org.apache.hudi.common.functional;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFileReader;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/hudi/common/functional/TestHoodieLogFormat.class */
public class TestHoodieLogFormat extends HoodieCommonTestHarness {
    // Base path handed to HoodieMergedLogRecordScanner.withSpillableMapBasePath by the scanner tests.
    private static String BASE_OUTPUT_PATH = "/tmp/";
    // File system the tests write to; assigned from MiniClusterUtil.fileSystem in setUp
    // (testAppendAndReadOnCorruptedLog replaces it with a handle obtained via FSUtils.getFs).
    private FileSystem fs;
    // Directory the log files are written under; created in setUp and recursively deleted in tearDown.
    private Path partitionPath;
    // Buffer size passed to the log record scanners via withBufferSize.
    private int bufferSize = 4096;
    // Block type read-back tests compare against (see testBasicWriteAndScan).
    // NOTE(review): presumably also the type used by the two-argument getDataBlock helper (not visible here) — confirm.
    private HoodieLogBlock.HoodieLogBlockType dataBlockType = HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.common.functional.TestHoodieLogFormat$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/common/functional/TestHoodieLogFormat$1.class */
    /*
     * Decompiler-recovered enum switch map: it assigns case label 1 to AVRO_DATA_BLOCK and 2 to HFILE_DATA_BLOCK,
     * indexed by HoodieLogBlockType ordinal.
     * NOTE(review): this appears to be the synthetic helper backing a switch over HoodieLogBlockType elsewhere in
     * this class (e.g. getDataBlock, not visible here). Nothing in the visible code references it; confirm before
     * removing or renaming.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per HoodieLogBlockType constant; types not assigned below keep the int default 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$table$log$block$HoodieLogBlock$HoodieLogBlockType = new int[HoodieLogBlock.HoodieLogBlockType.values().length];

        static {
            // Each assignment is guarded so that a missing enum constant (NoSuchFieldError) is skipped instead of
            // failing class initialization.
            try {
                $SwitchMap$org$apache$hudi$common$table$log$block$HoodieLogBlock$HoodieLogBlockType[HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$table$log$block$HoodieLogBlock$HoodieLogBlockType[HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /**
     * Runs {@code MiniClusterUtil.setUp()} once, before any test in this class.
     *
     * @throws IOException          if the cluster utilities fail to initialize
     * @throws InterruptedException if initialization is interrupted
     */
    @BeforeAll
    public static void setUpClass() throws IOException, InterruptedException {
        // Shared, class-wide setup; every test reads MiniClusterUtil.fileSystem / configuration afterwards.
        MiniClusterUtil.setUp();
    }

    /** Runs {@code MiniClusterUtil.shutdown()} once, after every test in this class has finished. */
    @AfterAll
    public static void tearDownClass() {
        // Counterpart of setUpClass.
        MiniClusterUtil.shutdown();
    }

    /**
     * Per-test setup: binds {@link #fs} to the mini-cluster file system, creates the partition directory from the
     * harness temp dir, uses its parent as the table base path, and initializes a MERGE_ON_READ table there.
     */
    @BeforeEach
    public void setUp() throws IOException, InterruptedException {
        fs = MiniClusterUtil.fileSystem;
        final String partitionDir = tempDir.toAbsolutePath().toString();
        Assertions.assertTrue(fs.mkdirs(new Path(partitionDir)));
        partitionPath = new Path(partitionDir);
        basePath = tempDir.getParent().toString();
        HoodieTestUtils.init(MiniClusterUtil.configuration, basePath, HoodieTableType.MERGE_ON_READ);
    }

    /** Per-test cleanup: recursively deletes the partition directory created in {@link #setUp()}. */
    @AfterEach
    public void tearDown() throws IOException {
        final boolean recursive = true;
        fs.delete(partitionPath, recursive);
    }

    /**
     * Verifies the state of a freshly built writer: size 0, a file name starting with ".", and log version 1.
     *
     * <p>The writer is closed in a {@code finally} block. The original never closed it, so its resources leaked
     * for the rest of the test run.
     */
    @Test
    public void testEmptyLog() throws IOException {
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        try {
            Assertions.assertEquals(0L, build.getCurrentSize(), "Just created this log, size should be 0");
            Assertions.assertTrue(build.getLogFile().getFileName().startsWith("."), "Check all log files should start with a .");
            Assertions.assertEquals(1, build.getLogFile().getLogVersion(), "Version should be 1 for new log created");
        } finally {
            build.close();
        }
    }

    /**
     * Appends one 100-record data block of the given type to a new log file. It then checks the writer's reported
     * size against the file status and against the returned {@link AppendResult}.
     *
     * <p>The writer is closed in a {@code finally} block so a failed assertion does not leak it.
     *
     * @param hoodieLogBlockType data block type to write (AVRO_DATA_BLOCK or HFILE_DATA_BLOCK)
     */
    @EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK"})
    @ParameterizedTest
    public void testBasicAppend(HoodieLogBlock.HoodieLogBlockType hoodieLogBlockType) throws IOException, InterruptedException, URISyntaxException {
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        try {
            List<IndexedRecord> generateTestRecords = SchemaTestUtil.generateTestRecords(0, 100);
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
            // The original called getCurrentSize() here and discarded the result; pin the expected value instead.
            long sizeBeforeAppend = build.getCurrentSize();
            Assertions.assertEquals(0L, sizeBeforeAppend, "Nothing has been written yet, size should be 0");
            AppendResult appendBlock = build.appendBlock(getDataBlock(hoodieLogBlockType, generateTestRecords, header));
            long currentSize = build.getCurrentSize();
            Assertions.assertTrue(currentSize > 0, "We just wrote a block - size should be > 0");
            Assertions.assertEquals(currentSize, this.fs.getFileStatus(build.getLogFile().getPath()).getLen(), "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
            Assertions.assertEquals(currentSize, appendBlock.size());
            Assertions.assertEquals(build.getLogFile(), appendBlock.logFile());
            Assertions.assertEquals(0L, appendBlock.offset());
        } finally {
            build.close();
        }
    }

    /**
     * Checks writer rollover driven by the size threshold.
     *
     * <p>A first writer appends one block. A second writer is then opened with a threshold one byte below that
     * file's size. Per the assertions, its first append still lands in the original file at a non-zero offset, and
     * the writer then moves to log version 2, which does not exist on disk yet. Its next append goes to that new
     * file at offset 0.
     *
     * <p>Writers are closed in {@code finally} blocks so failures do not leak them.
     */
    @Test
    public void testRollover() throws IOException, InterruptedException, URISyntaxException {
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        AppendResult appendBlock;
        long currentSize;
        try {
            List<IndexedRecord> generateTestRecords = SchemaTestUtil.generateTestRecords(0, 100);
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
            appendBlock = build.appendBlock(getDataBlock(generateTestRecords, header));
            currentSize = build.getCurrentSize();
        } finally {
            build.close();
        }
        Assertions.assertEquals(0L, appendBlock.offset());
        Assertions.assertEquals(currentSize, appendBlock.size());

        HoodieLogFormat.Writer build2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).withSizeThreshold(currentSize - 1).build();
        try {
            AppendResult appendBlock2 = build2.appendBlock(getDataBlock(SchemaTestUtil.generateTestRecords(0, 100), header));
            Assertions.assertEquals(appendBlock.logFile(), appendBlock2.logFile());
            Assertions.assertNotEquals(0L, appendBlock2.offset());
            Assertions.assertEquals(0L, build2.getCurrentSize(), "This should be a new log file and hence size should be 0");
            Assertions.assertEquals(2, build2.getLogFile().getLogVersion(), "Version should be rolled to 2");
            Path path = build2.getLogFile().getPath();
            Assertions.assertFalse(this.fs.exists(path), "Path (" + path + ") must not exist");
            AppendResult appendBlock3 = build2.appendBlock(getDataBlock(SchemaTestUtil.generateTestRecords(0, 100), header));
            Assertions.assertNotEquals(appendBlock2.logFile(), appendBlock3.logFile());
            Assertions.assertEquals(0L, appendBlock3.offset());
        } finally {
            build2.close();
        }
    }

    /** Concurrent append with an existing log file (version 1) where only the first writer's builder is configured. */
    @Test
    public void testConcurrentAppendOnExistingLogFileWithoutWriteToken() throws Exception {
        final boolean logFileExists = true;
        final boolean configureBothWriters = false;
        testConcurrentAppend(logFileExists, configureBothWriters);
    }

    /** Concurrent append with an existing log file (version 1) where both builders get a rollover write token. */
    @Test
    public void testConcurrentAppendOnExistingLogFileWithWriteToken() throws Exception {
        final boolean logFileExists = true;
        final boolean configureBothWriters = true;
        testConcurrentAppend(logFileExists, configureBothWriters);
    }

    /** Concurrent append starting from the base log version, with both builders given a rollover write token. */
    @Test
    public void testConcurrentAppendOnFirstLogFileVersion() throws Exception {
        final boolean logFileExists = false;
        final boolean configureBothWriters = true;
        testConcurrentAppend(logFileExists, configureBothWriters);
    }

    /**
     * Appends the same data block through two writers built for the same file id ("test-fileid1"). It then checks
     * that the second writer's log file is exactly one version ahead of the first writer's.
     *
     * <p>Builder configuration per flag combination:
     * <ul>
     *   <li>{@code z2 && z}: both builders use log version 1 and the default rollover write token;</li>
     *   <li>{@code z2 && !z}: both builders use {@link HoodieLogFile#LOGFILE_BASE_VERSION} and the default rollover
     *       write token;</li>
     *   <li>{@code !z2}: only the first builder is configured (version 1 plus the rollover write token); the second
     *       keeps builder defaults.</li>
     * </ul>
     *
     * <p>Both writers are closed in a {@code finally} block. Previously a failing second build or append leaked
     * the first writer.
     *
     * @param z  whether a log file is treated as already existing; only consulted when {@code z2} is true
     * @param z2 whether both builders receive an explicit log version and rollover write token
     */
    private void testConcurrentAppend(boolean z, boolean z2) throws Exception {
        final boolean logFileExists = z;
        final boolean configureBothWriters = z2;
        HoodieLogFormat.WriterBuilder builder1 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs);
        HoodieLogFormat.WriterBuilder builder2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs);
        if (configureBothWriters) {
            int logVersion = logFileExists ? 1 : HoodieLogFile.LOGFILE_BASE_VERSION.intValue();
            builder1 = builder1.withLogVersion(logVersion).withRolloverLogWriteToken(HoodieTestUtils.DEFAULT_WRITE_TOKEN);
            builder2 = builder2.withLogVersion(logVersion).withRolloverLogWriteToken(HoodieTestUtils.DEFAULT_WRITE_TOKEN);
        } else {
            builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieTestUtils.DEFAULT_WRITE_TOKEN);
        }

        HoodieLogFormat.Writer writer1 = builder1.build();
        HoodieLogFormat.Writer writer2 = null;
        HoodieLogFile logFile1;
        HoodieLogFile logFile2;
        try {
            List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
            HoodieDataBlock dataBlock = getDataBlock(records, header);
            writer1.appendBlock(dataBlock);
            // The second writer is built only after the first one has appended.
            writer2 = builder2.build();
            writer2.appendBlock(dataBlock);
            logFile1 = writer1.getLogFile();
            logFile2 = writer2.getLogFile();
        } finally {
            // Same close order as before: first writer, then second.
            writer1.close();
            if (writer2 != null) {
                writer2.close();
            }
        }
        Assertions.assertNotNull(logFile1.getLogWriteToken());
        Assertions.assertEquals(logFile1.getLogVersion(), logFile2.getLogVersion() - 1, "Log Files must have different versions");
    }

    /**
     * Appends one 100-record block through each of three successive writers on the same file id. After each append
     * it checks that the file grew and that the writer's size matches the file status. Finally it checks that
     * {@code getCurrentSize} throws {@link IllegalStateException} once the last writer is closed.
     *
     * <p>Each writer is closed in a {@code finally} block so a failed assertion does not leak it.
     */
    @Test
    public void testMultipleAppend() throws IOException, URISyntaxException, InterruptedException {
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();

        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        long currentSize;
        try {
            List<IndexedRecord> generateTestRecords = SchemaTestUtil.generateTestRecords(0, 100);
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
            build.appendBlock(getDataBlock(generateTestRecords, header));
            currentSize = build.getCurrentSize();
        } finally {
            build.close();
        }

        HoodieLogFormat.Writer build2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        long currentSize2;
        try {
            List<IndexedRecord> generateTestRecords2 = SchemaTestUtil.generateTestRecords(0, 100);
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
            build2.appendBlock(getDataBlock(generateTestRecords2, header));
            currentSize2 = build2.getCurrentSize();
            Assertions.assertTrue(currentSize2 > currentSize, "We just wrote a new block - size2 should be > size1");
            Assertions.assertEquals(currentSize2, this.fs.getFileStatus(build2.getLogFile().getPath()).getLen(), "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
        } finally {
            build2.close();
        }

        HoodieLogFormat.Writer build3 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        try {
            List<IndexedRecord> generateTestRecords3 = SchemaTestUtil.generateTestRecords(0, 100);
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
            build3.appendBlock(getDataBlock(generateTestRecords3, header));
            long currentSize3 = build3.getCurrentSize();
            Assertions.assertTrue(currentSize3 > currentSize2, "We just wrote a new block - size3 should be > size2");
            Assertions.assertEquals(currentSize3, this.fs.getFileStatus(build3.getLogFile().getPath()).getLen(), "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
        } finally {
            build3.close();
        }
        Assertions.assertThrows(IllegalStateException.class, () -> build3.getCurrentSize(), "getCurrentSize should fail after the logAppender is closed");
    }

    /**
     * Writes the same data block through two successive ".archive" writers on a {@code file://} view of the
     * partition directory. It expects two files in the target directory afterwards, which indicates that the second
     * writer did not append to the first writer's file.
     */
    @Test
    public void testAppendNotSupported() throws IOException, URISyntaxException, InterruptedException {
        final Path localPartitionPath = new Path("file://" + this.partitionPath);
        final FileSystem localFs = FSUtils.getFs(localPartitionPath.toString(), HoodieTestUtils.getDefaultHadoopConf());
        final Path archiveDir = new Path(localPartitionPath, "append_test");
        localFs.mkdirs(archiveDir);

        final List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        final Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        final HoodieDataBlock dataBlock = getDataBlock(records, header);

        int writersUsed = 0;
        while (writersUsed < 2) {
            HoodieLogFormat.Writer archiveWriter = HoodieLogFormat.newWriterBuilder()
                    .onParentPath(archiveDir)
                    .withFileExtension(".archive")
                    .withFileId("commits.archive")
                    .overBaseCommit("")
                    .withFs(localFs)
                    .build();
            archiveWriter.appendBlock(dataBlock);
            archiveWriter.close();
            writersUsed++;
        }
        Assertions.assertEquals(2, localFs.listStatus(archiveDir).length);
    }

    /**
     * Writes one block of 100 records and reads it back with a {@link HoodieLogFormat.Reader}. It checks the block
     * type, and checks that the records equal the written ones (rewritten with the simple schema) in order.
     *
     * <p>Fixes the decompiled {@code HoodieDataBlock x = (HoodieLogBlock) reader.next()}, which does not compile.
     * The block is now read as a {@link HoodieLogBlock}, its type is asserted, and only then is it cast. The reader
     * and writer are closed in {@code finally} blocks.
     */
    @Test
    public void testBasicWriteAndScan() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> generateTestRecords = SchemaTestUtil.generateTestRecords(0, 100);
        List<GenericRecord> expectedRecords = generateTestRecords.stream()
                .map(indexedRecord -> HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, simpleSchema))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        try {
            build.appendBlock(getDataBlock(generateTestRecords, header));
        } finally {
            build.close();
        }

        HoodieLogFormat.Reader newReader = HoodieLogFormat.newReader(this.fs, build.getLogFile(), SchemaTestUtil.getSimpleSchema());
        try {
            Assertions.assertTrue(newReader.hasNext(), "We wrote a block, we should be able to read it");
            HoodieLogBlock nextBlock = (HoodieLogBlock) newReader.next();
            Assertions.assertEquals(this.dataBlockType, nextBlock.getBlockType(), "The next block should be a data block");
            // Cast only after the type check, so an unexpected block fails the assertion instead of the cast.
            HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
            Assertions.assertEquals(expectedRecords.size(), dataBlockRead.getRecords().size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedRecords, dataBlockRead.getRecords(), "Both records lists should be the same. (ordering guaranteed)");
        } finally {
            newReader.close();
        }
    }

    /**
     * Appends one 100-record block through each of three successive writers. It then reads the file back and checks
     * that the three blocks come out in write order with the expected records, the first one carrying the simple
     * schema.
     *
     * <p>Fixes the decompiled {@code HoodieDataBlock x = (HoodieLogBlock) reader.next()} assignments, which do not
     * compile. It also asserts every {@code hasNext()} that was previously ignored, and closes the writers and reader
     * in {@code finally} blocks.
     */
    @Test
    public void testBasicAppendAndRead() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateTestRecords = SchemaTestUtil.generateTestRecords(0, 100);
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        List<GenericRecord> list = generateTestRecords.stream()
                .map(indexedRecord -> HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, simpleSchema))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        try {
            build.appendBlock(getDataBlock(generateTestRecords, header));
        } finally {
            build.close();
        }

        HoodieLogFormat.Writer build2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateTestRecords2 = SchemaTestUtil.generateTestRecords(0, 100);
        List<GenericRecord> list2 = generateTestRecords2.stream()
                .map(indexedRecord -> HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, simpleSchema))
                .collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        try {
            build2.appendBlock(getDataBlock(generateTestRecords2, header));
        } finally {
            build2.close();
        }

        HoodieLogFormat.Writer build3 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateTestRecords3 = SchemaTestUtil.generateTestRecords(0, 100);
        List<GenericRecord> list3 = generateTestRecords3.stream()
                .map(indexedRecord -> HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, simpleSchema))
                .collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        try {
            build3.appendBlock(getDataBlock(generateTestRecords3, header));
        } finally {
            build3.close();
        }

        HoodieLogFormat.Reader newReader = HoodieLogFormat.newReader(this.fs, build3.getLogFile(), SchemaTestUtil.getSimpleSchema());
        try {
            Assertions.assertTrue(newReader.hasNext(), "First block should be available");
            HoodieDataBlock hoodieDataBlock = (HoodieDataBlock) newReader.next();
            Assertions.assertEquals(list.size(), hoodieDataBlock.getRecords().size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(list, hoodieDataBlock.getRecords(), "Both records lists should be the same. (ordering guaranteed)");
            Assertions.assertEquals(hoodieDataBlock.getSchema(), SchemaTestUtil.getSimpleSchema());

            Assertions.assertTrue(newReader.hasNext(), "Second block should be available");
            HoodieDataBlock hoodieDataBlock2 = (HoodieDataBlock) newReader.next();
            Assertions.assertEquals(list2.size(), hoodieDataBlock2.getRecords().size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(list2, hoodieDataBlock2.getRecords(), "Both records lists should be the same. (ordering guaranteed)");

            Assertions.assertTrue(newReader.hasNext(), "Third block should be available");
            HoodieDataBlock hoodieDataBlock3 = (HoodieDataBlock) newReader.next();
            Assertions.assertEquals(list3.size(), hoodieDataBlock3.getRecords().size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(list3, hoodieDataBlock3.getRecords(), "Both records lists should be the same. (ordering guaranteed)");
        } finally {
            newReader.close();
        }
    }

    /**
     * Appends 100-record blocks until the writer (1024-byte size threshold) reaches log version 4. It then scans
     * every log file the writer used with {@link HoodieMergedLogRecordScanner} and checks that the scanner yields as
     * many records as were appended.
     *
     * <p>Fixes a compile error: the raw {@code ArrayList.stream().mapToLong(v0 -> v0.size())} gave the lambda an
     * {@code Object} parameter. Collections are now typed. The assertEquals arguments are put in expected/actual
     * order, and the writer is closed in a {@code finally} block.
     *
     * @param diskMapType spillable map implementation passed to the scanner
     * @param z           passed to {@code withBitCaskDiskMapCompressionEnabled}
     * @param z2          passed to {@code withReadBlocksLazily}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withSizeThreshold(1024L).withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Schema addMetadataFields = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        Set<HoodieLogFile> logFiles = new HashSet<>();
        List<List<GenericRecord>> appendedBatches = new ArrayList<>();
        try {
            // Relies on the 1024-byte threshold to advance the log version; each distinct file is recorded.
            while (build.getLogFile().getLogVersion() != 4) {
                logFiles.add(build.getLogFile());
                List<IndexedRecord> records = SchemaTestUtil.generateHoodieTestRecords(0, 100);
                appendedBatches.add(records.stream()
                        .map(indexedRecord -> HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, addMetadataFields))
                        .collect(Collectors.toList()));
                // Blocks are written with the metadata-augmented schema, matching the records they contain.
                header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
                build.appendBlock(getDataBlock(records, header));
            }
        } finally {
            build.close();
        }

        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);
        List<String> logFilePaths = logFiles.stream()
                .map(hoodieLogFile -> hoodieLogFile.getPath().toString())
                .collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder().withFileSystem(this.fs).withBasePath(this.basePath).withLogFilePaths(logFilePaths).withReaderSchema(addMetadataFields).withLatestInstantTime("100").withMaxMemorySizeInBytes(10240L).withReadBlocksLazily(z2).withReverseReader(false).withBufferSize(this.bufferSize).withSpillableMapBasePath(BASE_OUTPUT_PATH).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(z).build();
        List<IndexedRecord> scannedRecords = new ArrayList<>();
        for (HoodieRecord<?> hoodieRecord : scanner) {
            // Materialize each payload with the reader schema, as the original test did.
            scannedRecords.add((IndexedRecord) hoodieRecord.getData().getInsertValue(addMetadataFields).get());
        }
        long appendedCount = appendedBatches.stream().mapToLong(List::size).sum();
        Assertions.assertEquals(appendedCount, (long) scannedRecords.size(), "Scanner records count should be the same as appended records");
    }

    /**
     * Checks that the reader reports hand-written garbage as a CORRUPT_BLOCK and keeps returning valid blocks
     * after it.
     *
     * <p>First pass: a valid 100-record block, a hand-built bogus block, then a valid 10-record block. The reader is
     * expected to return three blocks, the middle one of type CORRUPT_BLOCK. Second pass: another bogus block and
     * another valid 100-record block are appended. The reader is expected to return five blocks, the fourth one of
     * type CORRUPT_BLOCK.
     */
    @Test
    public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateTestRecords = SchemaTestUtil.generateTestRecords(0, 100);
        HashMap hashMap = new HashMap();
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        build.appendBlock(getDataBlock(generateTestRecords, hashMap));
        build.close();
        // Re-obtain a FileSystem for the same URI and configuration. This replaces this.fs for the rest of the
        // test, including tearDown. NOTE(review): presumably done so that append() below works on this handle;
        // confirm intent.
        this.fs = FSUtils.getFs(this.fs.getUri().toString(), this.fs.getConf());
        // Hand-craft a bogus block right after the valid one: MAGIC, then 474L, the AVRO_DATA_BLOCK ordinal, 1 and
        // 400L, then only the 16 bytes of "something-random".
        // NOTE(review): these fields presumably mirror the block header layout (declared length, block type, format
        // version, content length); confirm against HoodieLogFormat. The declared sizes far exceed the payload
        // actually written.
        FSDataOutputStream append = this.fs.append(build.getLogFile().getPath());
        append.write(HoodieLogFormat.MAGIC);
        append.writeLong(474L);
        append.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        append.writeInt(1);
        append.writeLong(400L);
        append.write("something-random".getBytes());
        append.flush();
        append.close();
        // A valid block written after the garbage; the reader must still be able to reach it.
        HoodieLogFormat.Writer build2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateTestRecords2 = SchemaTestUtil.generateTestRecords(0, 10);
        hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        build2.appendBlock(getDataBlock(generateTestRecords2, hashMap));
        build2.close();
        // Expected sequence: valid, CORRUPT_BLOCK, valid.
        HoodieLogFormat.Reader newReader = HoodieLogFormat.newReader(this.fs, build2.getLogFile(), SchemaTestUtil.getSimpleSchema());
        Assertions.assertTrue(newReader.hasNext(), "First block should be available");
        newReader.next();
        Assertions.assertTrue(newReader.hasNext(), "We should have corrupted block next");
        Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, ((HoodieLogBlock) newReader.next()).getBlockType(), "The read block should be a corrupt block");
        Assertions.assertTrue(newReader.hasNext(), "Third block should be available");
        newReader.next();
        Assertions.assertFalse(newReader.hasNext(), "There should be no more block left");
        newReader.close();
        // Second bogus block (declared sizes 1000L / 500L, payload "something-else-random").
        FSDataOutputStream append2 = this.fs.append(build2.getLogFile().getPath());
        append2.write(HoodieLogFormat.MAGIC);
        append2.writeLong(1000L);
        append2.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        append2.writeInt(1);
        append2.writeLong(500L);
        append2.write("something-else-random".getBytes());
        append2.flush();
        append2.close();
        HoodieLogFormat.Writer build3 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateTestRecords3 = SchemaTestUtil.generateTestRecords(0, 100);
        hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        build3.appendBlock(getDataBlock(generateTestRecords3, hashMap));
        build3.close();
        // Expected sequence: valid, corrupt, valid, CORRUPT_BLOCK, valid.
        HoodieLogFormat.Reader newReader2 = HoodieLogFormat.newReader(this.fs, build3.getLogFile(), SchemaTestUtil.getSimpleSchema());
        Assertions.assertTrue(newReader2.hasNext(), "First block should be available");
        newReader2.next();
        Assertions.assertTrue(newReader2.hasNext(), "We should get the 1st corrupted block next");
        newReader2.next();
        Assertions.assertTrue(newReader2.hasNext(), "Third block should be available");
        newReader2.next();
        Assertions.assertTrue(newReader2.hasNext(), "We should get the 2nd corrupted block next");
        Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, ((HoodieLogBlock) newReader2.next()).getBlockType(), "The read block should be a corrupt block");
        Assertions.assertTrue(newReader2.hasNext(), "We should get the last block next");
        newReader2.next();
        Assertions.assertFalse(newReader2.hasNext(), "We should have no more blocks left");
        newReader2.close();
    }

    /**
     * Appends two 100-record blocks through a writer with a 500-byte size threshold. It then scans every
     * "test-fileid1" log file with {@link HoodieMergedLogRecordScanner}. It checks that the scanner reports 200
     * records and that their keys equal the keys of the records written.
     *
     * @param diskMapType spillable map implementation passed to the scanner
     * @param z           passed to {@code withBitCaskDiskMapCompressionEnabled}
     * @param z2          passed to {@code withReadBlocksLazily}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        final Schema schemaWithMetadata = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        final HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withFs(this.fs)
                .withSizeThreshold(500L)
                .build();

        // First block.
        final List<IndexedRecord> firstBatch = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        final List<GenericRecord> firstRewritten = new ArrayList<>();
        for (IndexedRecord written : firstBatch) {
            firstRewritten.add(HoodieAvroUtils.rewriteRecord((GenericRecord) written, schemaWithMetadata));
        }
        final Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaWithMetadata.toString());
        writer.appendBlock(getDataBlock(firstBatch, header));

        // Second block, same header.
        final List<IndexedRecord> secondBatch = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        final List<GenericRecord> secondRewritten = new ArrayList<>();
        for (IndexedRecord written : secondBatch) {
            secondRewritten.add(HoodieAvroUtils.rewriteRecord((GenericRecord) written, schemaWithMetadata));
        }
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaWithMetadata.toString());
        writer.appendBlock(getDataBlock(secondBatch, header));
        writer.close();

        final List<String> logFilePaths = FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100")
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);
        final HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                .withFileSystem(this.fs)
                .withBasePath(this.basePath)
                .withLogFilePaths(logFilePaths)
                .withReaderSchema(schemaWithMetadata)
                .withLatestInstantTime("100")
                .withMaxMemorySizeInBytes(10240L)
                .withReadBlocksLazily(z2)
                .withReverseReader(false)
                .withBufferSize(this.bufferSize)
                .withSpillableMapBasePath(BASE_OUTPUT_PATH)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(z)
                .build();
        Assertions.assertEquals(200L, scanner.getTotalLogRecords());

        final Set<String> scannedKeys = new HashSet<>(200);
        for (HoodieRecord<?> scanned : scanner) {
            scannedKeys.add(scanned.getKey().getRecordKey());
        }
        Assertions.assertEquals(200, scannedKeys.size(), "Stream collect should return all 200 records");

        final Set<String> expectedKeys = Stream.concat(firstRewritten.stream(), secondRewritten.stream())
                .map(rewritten -> rewritten.get("_hoodie_record_key").toString())
                .collect(Collectors.toSet());
        Assertions.assertEquals(expectedKeys, scannedKeys, "CompositeAvroLogReader should return 200 records from 2 versions");
    }

    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        // Log layout written below: data@100, data@101, ROLLBACK_PREVIOUS_BLOCK targeting 101, data@102.
        // The merged scan is expected to expose exactly the keys of the 100 and 102 batches.
        Schema schemaWithMeta = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withFs(this.fs)
                .build();

        List<IndexedRecord> batch100 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List<GenericRecord> survivors = batch100.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schemaWithMeta))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaWithMeta.toString());
        writer.appendBlock(getDataBlock(batch100, header));

        // This batch is written under instant 101 and then rolled back by the command block that follows it.
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        List<IndexedRecord> batch101 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaWithMeta.toString());
        writer.appendBlock(getDataBlock(batch101, header));
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
                String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        writer.appendBlock(new HoodieCommandBlock(header));

        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        List<IndexedRecord> batch102 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        batch102.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schemaWithMeta))
                .forEach(survivors::add);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaWithMeta.toString());
        writer.appendBlock(getDataBlock(batch102, header));
        writer.close();

        List<String> logFilePaths = FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100")
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);
        FileCreateUtils.createDeltaCommit(this.basePath, "102", this.fs);
        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                .withFileSystem(this.fs)
                .withBasePath(this.basePath)
                .withLogFilePaths(logFilePaths)
                .withReaderSchema(schemaWithMeta)
                .withLatestInstantTime("102")
                .withMaxMemorySizeInBytes(10240L)
                .withReadBlocksLazily(z2)
                .withReverseReader(false)
                .withBufferSize(this.bufferSize)
                .withSpillableMapBasePath(BASE_OUTPUT_PATH)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(z)
                .build();
        Assertions.assertEquals(200L, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches");

        Set<String> scannedKeys = new HashSet<>(200);
        scanner.forEach(scanned -> scannedKeys.add(scanned.getKey().getRecordKey()));
        Assertions.assertEquals(200, scannedKeys.size(), "Stream collect should return all 200 records");
        Set<String> expectedKeys = survivors.stream()
                .map(rec -> rec.get("_hoodie_record_key").toString())
                .collect(Collectors.toSet());
        Assertions.assertEquals(expectedKeys, scannedKeys, "CompositeAvroLogReader should return 200 records from 2 versions");
    }

    /**
     * Writes a valid data block at instant 100, appends a hand-crafted incomplete AVRO data block, then writes a
     * second valid data block at instant 103 through a fresh writer. The scan is expected to return exactly the
     * 200 keys of the two valid blocks.
     *
     * <p>NOTE(review): unlike the sibling tests, this method takes two parameters and pins
     * {@code withReadBlocksLazily(true)} even though it shares the "testArguments" source. Confirm this is intended.
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.DiskMapType diskMapType, boolean z) throws IOException, URISyntaxException, InterruptedException {
        Schema addMetadataFields = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateHoodieTestRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List list = (List) generateHoodieTestRecords.stream().map(indexedRecord -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, addMetadataFields);
        }).collect(Collectors.toList());
        HashMap hashMap = new HashMap();
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build.appendBlock(getDataBlock(generateHoodieTestRecords, hashMap));
        build.close();
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        this.fs = FSUtils.getFs(this.fs.getUri().toString(), this.fs.getConf());
        // Hand-write a partial block: magic marker, a 1000 value (presumably the declared block size), 1,
        // the AVRO_DATA_BLOCK ordinal and the header bytes, followed by only 16 bytes of content.
        // try-with-resources closes the stream even if one of the writes fails.
        try (FSDataOutputStream append = this.fs.append(build.getLogFile().getPath())) {
            append.write(HoodieLogFormat.MAGIC);
            append.writeLong(1000L);
            append.writeInt(1);
            append.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            append.write(HoodieLogBlock.getLogMetadataBytes(hashMap));
            append.writeLong("something-random".getBytes().length);
            append.write("something-random".getBytes());
            append.flush();
        }
        HoodieLogFormat.Writer build2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
        List<IndexedRecord> generateHoodieTestRecords2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List list2 = (List) generateHoodieTestRecords2.stream().map(indexedRecord2 -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord2, addMetadataFields);
        }).collect(Collectors.toList());
        hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build2.appendBlock(getDataBlock(generateHoodieTestRecords2, hashMap));
        build2.close();
        List list3 = (List) FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100").map(hoodieLogFile -> {
            return hoodieLogFile.getPath().toString();
        }).collect(Collectors.toList());
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);
        FileCreateUtils.createDeltaCommit(this.basePath, "103", this.fs);
        HoodieMergedLogRecordScanner build3 = HoodieMergedLogRecordScanner.newBuilder().withFileSystem(this.fs).withBasePath(this.basePath).withLogFilePaths(list3).withReaderSchema(addMetadataFields).withLatestInstantTime("103").withMaxMemorySizeInBytes(10240L).withReadBlocksLazily(true).withReverseReader(false).withBufferSize(this.bufferSize).withSpillableMapBasePath(BASE_OUTPUT_PATH).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(z).build();
        Assertions.assertEquals(200L, build3.getTotalLogRecords(), "We would read 200 records");
        HashSet hashSet = new HashSet(200);
        build3.forEach(hoodieRecord -> {
            hashSet.add(hoodieRecord.getKey().getRecordKey());
        });
        Assertions.assertEquals(200, hashSet.size(), "Stream collect should return all 200 records");
        list.addAll(list2);
        Assertions.assertEquals((Set) list.stream().map(indexedRecord3 -> {
            return ((GenericRecord) indexedRecord3).get("_hoodie_record_key").toString();
        }).collect(Collectors.toSet()), hashSet, "CompositeAvroLogReader should return 200 records from 2 versions");
    }

    /**
     * Writes two 100-record data blocks (instants 100 and 101) and a delete block (instant 102) for the first 50
     * keys. Checks that the scan still returns all 200 keys and that exactly the 50 deleted keys carry empty
     * payloads. Then rolls back instant 102 and checks that all 200 records are returned again.
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema addMetadataFields = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateHoodieTestRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List list = (List) generateHoodieTestRecords.stream().map(indexedRecord -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, addMetadataFields);
        }).collect(Collectors.toList());
        HashMap hashMap = new HashMap();
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build.appendBlock(getDataBlock(generateHoodieTestRecords, hashMap));
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        List<IndexedRecord> generateHoodieTestRecords2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List list2 = (List) generateHoodieTestRecords2.stream().map(indexedRecord2 -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord2, addMetadataFields);
        }).collect(Collectors.toList());
        build.appendBlock(getDataBlock(generateHoodieTestRecords2, hashMap));
        list.addAll(list2);
        // Record-key strings of every record written in the two data blocks.
        List list3 = (List) list.stream().map(indexedRecord3 -> {
            return ((GenericRecord) indexedRecord3).get("_hoodie_record_key").toString();
        }).collect(Collectors.toList());
        // HoodieKeys of the first 50 written records; these are deleted at instant 102.
        List subList = ((List) list.stream().map(indexedRecord4 -> {
            return new HoodieKey(((GenericRecord) indexedRecord4).get("_hoodie_record_key").toString(), ((GenericRecord) indexedRecord4).get("_hoodie_partition_path").toString());
        }).collect(Collectors.toList())).subList(0, 50);
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        build.appendBlock(new HoodieDeleteBlock((HoodieKey[]) subList.toArray(new HoodieKey[50]), hashMap));
        List list4 = (List) FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100").map(hoodieLogFile -> {
            return hoodieLogFile.getPath().toString();
        }).collect(Collectors.toList());
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);
        FileCreateUtils.createDeltaCommit(this.basePath, "101", this.fs);
        FileCreateUtils.createDeltaCommit(this.basePath, "102", this.fs);
        HoodieMergedLogRecordScanner build2 = HoodieMergedLogRecordScanner.newBuilder().withFileSystem(this.fs).withBasePath(this.basePath).withLogFilePaths(list4).withReaderSchema(addMetadataFields).withLatestInstantTime("102").withMaxMemorySizeInBytes(10240L).withReadBlocksLazily(z2).withReverseReader(false).withBufferSize(this.bufferSize).withSpillableMapBasePath(BASE_OUTPUT_PATH).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(z).build();
        Assertions.assertEquals(200L, build2.getTotalLogRecords(), "We still would read 200 records");
        ArrayList arrayList = new ArrayList(200);
        // Record keys whose payload yields no insert value, i.e. the delete tombstones.
        List<String> emptyPayloadKeys = new ArrayList<>();
        build2.forEach(hoodieRecord -> {
            arrayList.add(hoodieRecord.getKey().getRecordKey());
        });
        build2.forEach(hoodieRecord2 -> {
            try {
                if (!hoodieRecord2.getData().getInsertValue(addMetadataFields).isPresent()) {
                    emptyPayloadKeys.add(hoodieRecord2.getKey().getRecordKey());
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        Assertions.assertEquals(200, arrayList.size(), "Stream collect should return all 200 records");
        Assertions.assertEquals(50, emptyPayloadKeys.size(), "Stream collect should return all 50 records with empty payloads");
        // The scan returns deleted keys as empty-payload entries, so every written key is still present.
        Collections.sort(list3);
        Collections.sort(arrayList);
        Assertions.assertEquals(list3, arrayList, "CompositeAvroLogReader should return all 200 keys from 2 versions");
        // list3 holds record-key Strings, so compare against the deleted record keys, not the HoodieKey objects.
        List<String> deletedRecordKeys = new ArrayList<>(subList.size());
        for (Object deletedKey : subList) {
            deletedRecordKeys.add(((HoodieKey) deletedKey).getRecordKey());
        }
        Collections.sort(deletedRecordKeys);
        Collections.sort(emptyPayloadKeys);
        Assertions.assertEquals(deletedRecordKeys, emptyPayloadKeys, "Exactly the 50 deleted keys should carry empty payloads");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "102");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        build.appendBlock(new HoodieCommandBlock(hashMap));
        FileCreateUtils.deleteDeltaCommit(this.basePath, "102", this.fs);
        arrayList.clear();
        HoodieMergedLogRecordScanner.newBuilder().withFileSystem(this.fs).withBasePath(this.basePath).withLogFilePaths(list4).withReaderSchema(addMetadataFields).withLatestInstantTime("101").withMaxMemorySizeInBytes(10240L).withReadBlocksLazily(z2).withReverseReader(false).withBufferSize(this.bufferSize).withSpillableMapBasePath(BASE_OUTPUT_PATH).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(z).build().forEach(hoodieRecord3 -> {
            arrayList.add(hoodieRecord3.getKey().getRecordKey());
        });
        Assertions.assertEquals(200, arrayList.size(), "Stream collect should return all 200 records after rollback of delete");
        // The writer was never closed before; release its output stream once both scans are done.
        build.close();
    }

    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        // Two data blocks and a 50-key delete block are written under instant 100. The same
        // ROLLBACK_PREVIOUS_BLOCK command is then appended twice: the first append is followed by a simulated
        // failure, and the second append plays the retry. The scan is expected to return no records.
        Schema schemaWithMeta = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withFs(this.fs)
                .build();

        List<IndexedRecord> firstBatch = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List<GenericRecord> rewrittenFirstBatch = firstBatch.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schemaWithMeta))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaWithMeta.toString());
        writer.appendBlock(getDataBlock(firstBatch, header));

        List<IndexedRecord> secondBatch = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaWithMeta.toString());
        writer.appendBlock(getDataBlock(secondBatch, header));

        List<HoodieKey> firstBatchKeys = rewrittenFirstBatch.stream()
                .map(rec -> new HoodieKey(rec.get("_hoodie_record_key").toString(), rec.get("_hoodie_partition_path").toString()))
                .collect(Collectors.toList());
        HoodieKey[] keysToDelete = firstBatchKeys.subList(0, 50).toArray(new HoodieKey[50]);
        writer.appendBlock(new HoodieDeleteBlock(keysToDelete, header));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);

        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
                String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header);
        try {
            writer.appendBlock(rollbackBlock);
            throw new Exception("simulating failure");
        } catch (Exception ignored) {
            // Intentional: this is the simulated failure. The append below is the retry of the same rollback.
        }
        writer.appendBlock(rollbackBlock);

        List<String> logFilePaths = FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100")
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                .withFileSystem(this.fs)
                .withBasePath(this.basePath)
                .withLogFilePaths(logFilePaths)
                .withReaderSchema(schemaWithMeta)
                .withLatestInstantTime("100")
                .withMaxMemorySizeInBytes(10240L)
                .withReadBlocksLazily(z2)
                .withReverseReader(false)
                .withBufferSize(this.bufferSize)
                .withSpillableMapBasePath(BASE_OUTPUT_PATH)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(z)
                .build();
        Assertions.assertEquals(0L, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback");
        List<String> scannedKeys = new ArrayList<>();
        scanner.forEach(scanned -> scannedKeys.add(scanned.getKey().getRecordKey()));
        Assertions.assertEquals(0, scannedKeys.size(), "Stream collect should return all 0 records");
        FileCreateUtils.deleteDeltaCommit(this.basePath, "100", this.fs);
    }

    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        // One data block and a delete block for its first 50 keys are written under instant 100. The same
        // ROLLBACK_PREVIOUS_BLOCK command is then appended twice, and the scan is expected to see 0 records.
        Schema schemaWithMeta = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withFs(this.fs)
                .build();

        List<IndexedRecord> inserted = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List<GenericRecord> rewritten = inserted.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schemaWithMeta))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaWithMeta.toString());
        writer.appendBlock(getDataBlock(inserted, header));

        List<HoodieKey> insertedKeys = rewritten.stream()
                .map(rec -> new HoodieKey(rec.get("_hoodie_record_key").toString(), rec.get("_hoodie_partition_path").toString()))
                .collect(Collectors.toList());
        writer.appendBlock(new HoodieDeleteBlock(insertedKeys.subList(0, 50).toArray(new HoodieKey[50]), header));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);

        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
                String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header);
        for (int attempt = 0; attempt < 2; attempt++) {
            writer.appendBlock(rollbackBlock);
        }

        List<String> logFilePaths = FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100")
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                .withFileSystem(this.fs)
                .withBasePath(this.basePath)
                .withLogFilePaths(logFilePaths)
                .withReaderSchema(schemaWithMeta)
                .withLatestInstantTime("100")
                .withMaxMemorySizeInBytes(10240L)
                .withReadBlocksLazily(z2)
                .withReverseReader(false)
                .withBufferSize(this.bufferSize)
                .withSpillableMapBasePath(BASE_OUTPUT_PATH)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(z)
                .build();
        Assertions.assertEquals(0L, scanner.getTotalLogRecords(), "We would read 0 records");
        FileCreateUtils.deleteDeltaCommit(this.basePath, "100", this.fs);
    }

    /**
     * Writes a 100-record data block at instant 100, followed by a ROLLBACK_PREVIOUS_BLOCK command whose target
     * instant (101) was never written. The invalid rollback must not discard anything, so all 100 records are
     * still expected.
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema addMetadataFields = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateHoodieTestRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        HashMap hashMap = new HashMap();
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build.appendBlock(getDataBlock(generateHoodieTestRecords, hashMap));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);
        // Rollback targets instant 101, for which no data block was written.
        hashMap.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        build.appendBlock(new HoodieCommandBlock(hashMap));
        HoodieMergedLogRecordScanner build2 = HoodieMergedLogRecordScanner.newBuilder().withFileSystem(this.fs).withBasePath(this.basePath).withLogFilePaths((List) FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100").map(hoodieLogFile -> {
            return hoodieLogFile.getPath().toString();
        }).collect(Collectors.toList())).withReaderSchema(addMetadataFields).withLatestInstantTime("100").withMaxMemorySizeInBytes(10240L).withReadBlocksLazily(z2).withReverseReader(false).withBufferSize(this.bufferSize).withSpillableMapBasePath(BASE_OUTPUT_PATH).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(z).build();
        Assertions.assertEquals(100L, build2.getTotalLogRecords(), "We still would read 100 records");
        ArrayList arrayList = new ArrayList(100);
        build2.forEach(hoodieRecord -> {
            arrayList.add(hoodieRecord.getKey().getRecordKey());
        });
        // The message previously claimed 150 records while the assertion checks 100.
        Assertions.assertEquals(100, arrayList.size(), "Stream collect should return all 100 records");
        // Release the writer's output stream, which was never closed before.
        build.close();
    }

    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        // The same data block is appended three times, plus a delete block for 50 of its keys, all under
        // instant 100. A rollback issued at 101 targeting 100 should leave nothing for the scan.
        Schema schemaWithMeta = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withFs(this.fs)
                .build();

        List<IndexedRecord> inserted = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        List<GenericRecord> rewritten = inserted.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schemaWithMeta))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schemaWithMeta.toString());
        HoodieDataBlock repeatedBlock = getDataBlock(inserted, header);
        for (int copy = 0; copy < 3; copy++) {
            writer.appendBlock(repeatedBlock);
        }

        List<HoodieKey> insertedKeys = rewritten.stream()
                .map(rec -> new HoodieKey(rec.get("_hoodie_record_key").toString(), rec.get("_hoodie_partition_path").toString()))
                .collect(Collectors.toList());
        writer.appendBlock(new HoodieDeleteBlock(insertedKeys.subList(0, 50).toArray(new HoodieKey[50]), header));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);

        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
                String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        writer.appendBlock(new HoodieCommandBlock(header));

        List<String> logFilePaths = FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100")
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                .withFileSystem(this.fs)
                .withBasePath(this.basePath)
                .withLogFilePaths(logFilePaths)
                .withReaderSchema(schemaWithMeta)
                .withLatestInstantTime("101")
                .withMaxMemorySizeInBytes(10240L)
                .withReadBlocksLazily(z2)
                .withReverseReader(false)
                .withBufferSize(this.bufferSize)
                .withSpillableMapBasePath(BASE_OUTPUT_PATH)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(z)
                .build();
        Assertions.assertEquals(0L, scanner.getTotalLogRecords(), "We would read 0 records");
    }

    /**
     * Interleaves valid data blocks (instant 100) with hand-written incomplete block headers across two writer
     * sessions, then appends a rollback issued at 101 that targets 100. The scan is expected to return 0 records.
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema addMetadataFields = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        List<IndexedRecord> generateHoodieTestRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
        HashMap hashMap = new HashMap();
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        HoodieDataBlock dataBlock = getDataBlock(generateHoodieTestRecords, hashMap);
        build.appendBlock(dataBlock);
        build.appendBlock(dataBlock);
        build.appendBlock(dataBlock);
        build.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);
        // Two incomplete blocks after the first writer session.
        appendIncompleteAvroBlockHeader(build);
        appendIncompleteAvroBlockHeader(build);
        HoodieLogFormat.Writer build2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        build2.appendBlock(dataBlock);
        build2.close();
        // One more incomplete block after the second writer session.
        appendIncompleteAvroBlockHeader(build2);
        HoodieLogFormat.Writer build3 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
        build3.appendBlock(new HoodieCommandBlock(hashMap));
        build3.close();
        Assertions.assertEquals(0L, HoodieMergedLogRecordScanner.newBuilder().withFileSystem(this.fs).withBasePath(this.basePath).withLogFilePaths((List) FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100").map(hoodieLogFile -> {
            return hoodieLogFile.getPath().toString();
        }).collect(Collectors.toList())).withReaderSchema(addMetadataFields).withLatestInstantTime("101").withMaxMemorySizeInBytes(10240L).withReadBlocksLazily(z2).withReverseReader(false).withBufferSize(this.bufferSize).withSpillableMapBasePath(BASE_OUTPUT_PATH).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(z).build().getTotalLogRecords(), "We would read 0 records");
        FileCreateUtils.deleteDeltaCommit(this.basePath, "100", this.fs);
    }

    /**
     * Re-obtains {@code this.fs} and appends a hand-written, incomplete block header to the writer's current log
     * file. The header is the magic marker, a long 1000, the AVRO_DATA_BLOCK ordinal, an int 1 and a long 100,
     * with no block content after it.
     *
     * <p>NOTE(review): this writes the block type before the int 1, which is the reverse of the order used by the
     * partial-block test. It is preserved byte-for-byte because the block is meant to be corrupt anyway.
     *
     * @param writer writer whose {@code getLogFile()} path receives the bytes
     * @throws IOException if opening, writing or closing the append stream fails
     */
    private void appendIncompleteAvroBlockHeader(HoodieLogFormat.Writer writer) throws IOException {
        this.fs = FSUtils.getFs(this.fs.getUri().toString(), this.fs.getConf());
        // try-with-resources closes the stream even if a write fails part-way.
        try (FSDataOutputStream out = this.fs.append(writer.getLogFile().getPath())) {
            out.write(HoodieLogFormat.MAGIC);
            out.writeLong(1000L);
            out.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            out.writeInt(1);
            out.writeLong(100L);
            out.flush();
        }
    }

    /**
     * Writes the first {@code i} of 101 generated records through one writer. It then writes the first {@code i2}
     * of the same records through a second writer, opened with a size threshold just below the first writer's
     * size (presumably so that it rolls over to a new log file). Finally it checks that the merged scan sees
     * {@code max(i, i2)} distinct records.
     *
     * <p>Any exception raised while writing or scanning fails the test. Previously such exceptions were printed
     * and swallowed, which let callers pass without asserting anything.
     *
     * @param i          number of records written by the first writer
     * @param i2         number of records written by the second writer
     * @param diskMapType spillable map implementation used by the scanner
     * @param z          whether BitCask disk-map compression is enabled
     * @param z2         whether blocks are read lazily
     */
    private void testAvroLogRecordReaderMergingMultipleLogFiles(int i, int i2, ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) {
        try {
            Schema addMetadataFields = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
            List<IndexedRecord> generateHoodieTestRecords = SchemaTestUtil.generateHoodieTestRecords(0, 101);
            ArrayList arrayList = new ArrayList(generateHoodieTestRecords);
            HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
            HashMap hashMap = new HashMap();
            hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
            build.appendBlock(getDataBlock(generateHoodieTestRecords.subList(0, i), hashMap));
            long currentSize = build.getCurrentSize();
            build.close();
            HoodieLogFormat.Writer build2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).withSizeThreshold(currentSize - 1).build();
            HashMap hashMap2 = new HashMap();
            hashMap2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            hashMap2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
            build2.appendBlock(getDataBlock(arrayList.subList(0, i2), hashMap2));
            build2.close();
            FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);
            // Both writers wrote prefixes of the same record list, so the merge keeps the larger prefix.
            int expectedRecords = Math.max(i, i2);
            Assertions.assertEquals(expectedRecords, HoodieMergedLogRecordScanner.newBuilder().withFileSystem(this.fs).withBasePath(this.basePath).withLogFilePaths((List) FSUtils.getAllLogFiles(this.fs, this.partitionPath, "test-fileid1", ".log", "100").map(hoodieLogFile -> {
                return hoodieLogFile.getPath().toString();
            }).collect(Collectors.toList())).withReaderSchema(addMetadataFields).withLatestInstantTime("100").withMaxMemorySizeInBytes(10240L).withReadBlocksLazily(z2).withReverseReader(false).withBufferSize(this.bufferSize).withSpillableMapBasePath(BASE_OUTPUT_PATH).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(z).build().getNumMergedRecordsInLog(), "We would read " + expectedRecords + " records");
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for the test runner.
                Thread.currentThread().interrupt();
            }
            Assertions.fail("Writing or scanning log files for the multi-attempt merge scenario failed", e);
        }
    }

    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) {
        // First writer persists 77 records and the second persists 100; the merge is expected to yield 100.
        final int firstWriterRecords = 77;
        final int secondWriterRecords = 100;
        testAvroLogRecordReaderMergingMultipleLogFiles(firstWriterRecords, secondWriterRecords, diskMapType, z, z2);
    }

    /**
     * Scenario where the second write attempt persisted only part of its records.
     *
     * <p>Delegates to {@code testAvroLogRecordReaderMergingMultipleLogFiles} with 100 and 66 as its first
     * two arguments (presumably the record counts written by the first and second writer — confirm against
     * that helper's signature), plus the parameterized disk-map settings.
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) {
        final int firstAttemptRecordCount = 100;
        final int secondAttemptRecordCount = 66;
        testAvroLogRecordReaderMergingMultipleLogFiles(firstAttemptRecordCount, secondAttemptRecordCount, diskMapType, z, z2);
    }

    /**
     * Scenario where both write attempts persisted the full record set.
     *
     * <p>Delegates to {@code testAvroLogRecordReaderMergingMultipleLogFiles} with 100 for both of its first
     * two arguments (presumably the record counts written by the first and second writer — confirm against
     * that helper's signature), plus the parameterized disk-map settings.
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) {
        final int recordsPerAttempt = 100;
        testAvroLogRecordReaderMergingMultipleLogFiles(recordsPerAttempt, recordsPerAttempt, diskMapType, z, z2);
    }

    /**
     * Writes three data blocks of 100 generated records each, each through its own writer on the same
     * file id and base commit. Then walks the resulting log file backwards with {@code hasPrev}/{@code prev}.
     * It checks that the blocks come back newest first and that each block's records equal the written
     * records (rewritten to the simple schema), in order.
     *
     * @param z forwarded as the fifth argument of the {@link HoodieLogFileReader} constructor
     *          (presumably the "read blocks lazily" flag — confirm against that constructor)
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testBasicAppendAndReadInReverse(boolean z) throws IOException, URISyntaxException, InterruptedException {
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, simpleSchema.toString());

        // expectedBlocks.get(i) holds the records of the i-th written block, rewritten to the simple schema.
        List<List<IndexedRecord>> expectedBlocks = new ArrayList<>();
        HoodieLogFormat.Writer lastWriter = null;
        for (int blockIndex = 0; blockIndex < 3; blockIndex++) {
            HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
            List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
            // Build the expected copy before the records are handed to the writer.
            expectedBlocks.add(records.stream()
                .<IndexedRecord>map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, simpleSchema))
                .collect(Collectors.toList()));
            writer.appendBlock(getDataBlock(records, header));
            writer.close();
            lastWriter = writer;
        }
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);

        // Last constructor argument `true`: the reader is driven backwards via hasPrev()/prev() below.
        HoodieLogFileReader reader = new HoodieLogFileReader(this.fs, lastWriter.getLogFile(), simpleSchema, this.bufferSize, z, true);
        try {
            Assertions.assertTrue(reader.hasPrev(), "Last block should be available");
            HoodieDataBlock thirdBlock = (HoodieDataBlock) reader.prev();
            Assertions.assertEquals(expectedBlocks.get(2).size(), thirdBlock.getRecords().size(), "Third records size should be equal to the written records size");
            Assertions.assertEquals(expectedBlocks.get(2), thirdBlock.getRecords(), "Both records lists should be the same. (ordering guaranteed)");

            Assertions.assertTrue(reader.hasPrev(), "Second block should be available");
            HoodieDataBlock secondBlock = (HoodieDataBlock) reader.prev();
            Assertions.assertEquals(expectedBlocks.get(1).size(), secondBlock.getRecords().size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedBlocks.get(1), secondBlock.getRecords(), "Both records lists should be the same. (ordering guaranteed)");

            Assertions.assertTrue(reader.hasPrev(), "First block should be available");
            HoodieDataBlock firstBlock = (HoodieDataBlock) reader.prev();
            Assertions.assertEquals(expectedBlocks.get(0).size(), firstBlock.getRecords().size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedBlocks.get(0), firstBlock.getRecords(), "Both records lists should be the same. (ordering guaranteed)");

            Assertions.assertFalse(reader.hasPrev());
        } finally {
            // Release the reader even when an assertion above fails.
            reader.close();
        }
    }

    /**
     * Writes one valid data block, then appends a hand-crafted block whose header is followed by
     * arbitrary bytes, then writes another data block through a new writer. Walking the log file
     * backwards must first yield the last block as a {@link HoodieDataBlock}. The next {@code prev()}
     * must then fail with {@link CorruptedLogFileException}.
     *
     * @param z forwarded as the fifth argument of the {@link HoodieLogFileReader} constructor
     *          (presumably the "read blocks lazily" flag — confirm against that constructor)
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testAppendAndReadOnCorruptedLogInReverse(boolean z) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, simpleSchema.toString());
        writer.appendBlock(getDataBlock(records, header));
        writer.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);

        // Re-resolve the FileSystem from the current URI and configuration (stored back into the field)
        // before appending raw bytes to the log file.
        this.fs = FSUtils.getFs(this.fs.getUri().toString(), this.fs.getConf());
        // Hand-write a block: magic, AVRO_DATA_BLOCK type ordinal, the ints 1000 and 1, the header bytes,
        // then an arbitrary payload.
        // NOTE(review): the two ints presumably stand in for a block length / format field the reader checks;
        // the payload is not a valid block body, so this block is expected to be unreadable — confirm against
        // the log block layout.
        try (FSDataOutputStream out = this.fs.append(writer.getLogFile().getPath())) {
            out.write(HoodieLogFormat.MAGIC);
            out.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            out.writeInt(1000);
            out.writeInt(1);
            out.write(HoodieLogBlock.getLogMetadataBytes(header));
            out.write("something-random".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            out.flush();
        }

        HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
        secondWriter.appendBlock(getDataBlock(SchemaTestUtil.generateTestRecords(0, 100), header));
        secondWriter.close();

        // Last constructor argument `true`: the reader is driven backwards via hasPrev()/prev() below.
        HoodieLogFileReader reader = new HoodieLogFileReader(this.fs, secondWriter.getLogFile(), simpleSchema, this.bufferSize, z, true);
        try {
            Assertions.assertTrue(reader.hasPrev(), "Last block should be available");
            Assertions.assertTrue(reader.prev() instanceof HoodieDataBlock, "Last block should be datablock");
            // The block preceding the last valid one is the hand-crafted, corrupted block.
            Assertions.assertTrue(reader.hasPrev(), "Corrupted block should be available");
            Assertions.assertThrows(CorruptedLogFileException.class, reader::prev);
        } finally {
            // Release the reader even when an assertion above fails.
            reader.close();
        }
    }

    /**
     * Writes three data blocks of 100 generated records each, each through its own writer on the same
     * file id and base commit. Then walks the log file backwards, skipping the two newest blocks with
     * {@code moveToPrev()}. It checks that the oldest block, read with {@code prev()}, holds the first
     * block's records (rewritten to the simple schema) in order, and that nothing precedes it.
     *
     * @param z forwarded as the fifth argument of the {@link HoodieLogFileReader} constructor
     *          (presumably the "read blocks lazily" flag — confirm against that constructor)
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testBasicAppendAndTraverseInReverse(boolean z) throws IOException, URISyntaxException, InterruptedException {
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, simpleSchema.toString());

        List<IndexedRecord> expectedFirstBlock = null;
        HoodieLogFormat.Writer lastWriter = null;
        for (int blockIndex = 0; blockIndex < 3; blockIndex++) {
            HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withFs(this.fs).build();
            List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
            if (blockIndex == 0) {
                // Only the oldest block's contents are verified below; copy them before handing them to the writer.
                expectedFirstBlock = records.stream()
                    .<IndexedRecord>map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, simpleSchema))
                    .collect(Collectors.toList());
            }
            writer.appendBlock(getDataBlock(records, header));
            writer.close();
            lastWriter = writer;
        }
        FileCreateUtils.createDeltaCommit(this.basePath, "100", this.fs);

        // Last constructor argument `true`: the reader is driven backwards via hasPrev()/moveToPrev()/prev() below.
        HoodieLogFileReader reader = new HoodieLogFileReader(this.fs, lastWriter.getLogFile(), simpleSchema, this.bufferSize, z, true);
        try {
            Assertions.assertTrue(reader.hasPrev(), "Third block should be available");
            reader.moveToPrev();
            Assertions.assertTrue(reader.hasPrev(), "Second block should be available");
            reader.moveToPrev();
            Assertions.assertTrue(reader.hasPrev(), "First block should be available");
            HoodieDataBlock firstBlock = (HoodieDataBlock) reader.prev();
            Assertions.assertEquals(expectedFirstBlock.size(), firstBlock.getRecords().size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedFirstBlock, firstBlock.getRecords(), "Both records lists should be the same. (ordering guaranteed)");
            Assertions.assertFalse(reader.hasPrev());
        } finally {
            // Release the reader even when an assertion above fails.
            reader.close();
        }
    }

    /**
     * Serializes 100 generated records with the {@link HoodieAvroDataBlock} constructor that takes a schema
     * (the test name indicates this is the legacy V0 layout). It then decodes the bytes twice with
     * {@code HoodieAvroDataBlock.getBlock}: once with the simple schema and once with a {@code null} schema.
     * Each decode must report AVRO_DATA_BLOCK and return records equal to the originals, in order.
     */
    @Test
    public void testV0Format() throws IOException, URISyntaxException {
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        // Independent copy of the input taken before serialization; used as the expected value below.
        List<IndexedRecord> expected = new ArrayList<>(records);
        Assertions.assertEquals(100, records.size());
        Assertions.assertEquals(100, expected.size());

        byte[] content = new HoodieAvroDataBlock(records, simpleSchema).getBytes(simpleSchema);
        Assertions.assertTrue(content.length > 0);

        HoodieAvroDataBlock decodedWithSchema = HoodieAvroDataBlock.getBlock(content, simpleSchema);
        Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, decodedWithSchema.getBlockType());
        List<IndexedRecord> readWithSchema = decodedWithSchema.getRecords();
        Assertions.assertEquals(expected.size(), readWithSchema.size());
        for (int i = 0; i < expected.size(); i++) {
            Assertions.assertEquals(expected.get(i), readWithSchema.get(i));
        }

        // Decode again without a reader schema (presumably falling back to the schema carried in the
        // serialized bytes — confirm against getBlock).
        HoodieAvroDataBlock decodedWithoutSchema = HoodieAvroDataBlock.getBlock(content, (Schema) null);
        Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, decodedWithoutSchema.getBlockType());
        List<IndexedRecord> readWithoutSchema = decodedWithoutSchema.getRecords();
        Assertions.assertEquals(expected.size(), readWithoutSchema.size());
        for (int i = 0; i < expected.size(); i++) {
            Assertions.assertEquals(expected.get(i), readWithoutSchema.get(i));
        }
    }

    /**
     * Builds a data block of the type held in {@code this.dataBlockType}.
     *
     * @param list records to place in the block
     * @param map  block header metadata
     * @return the block produced by the type-dispatching {@code getDataBlock} overload
     */
    private HoodieDataBlock getDataBlock(List<IndexedRecord> list, Map<HoodieLogBlock.HeaderMetadataType, String> map) {
        final HoodieLogBlock.HoodieLogBlockType configuredType = this.dataBlockType;
        return getDataBlock(configuredType, list, map);
    }

    /**
     * Builds a data block of the requested type.
     *
     * <p>Switches directly on the enum constant rather than on a decompiler-generated ordinal table, whose
     * case labels (such as a log-version constant that happened to equal 1) were unrelated to the block type.
     *
     * @param hoodieLogBlockType block type to build; only AVRO_DATA_BLOCK and HFILE_DATA_BLOCK are supported
     * @param list               records to place in the block
     * @param map                block header metadata
     * @return a new, not-yet-written data block
     * @throws RuntimeException if {@code hoodieLogBlockType} is any other block type
     */
    private HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType hoodieLogBlockType, List<IndexedRecord> list, Map<HoodieLogBlock.HeaderMetadataType, String> map) {
        switch (hoodieLogBlockType) {
            case AVRO_DATA_BLOCK:
                return new HoodieAvroDataBlock(list, map);
            case HFILE_DATA_BLOCK:
                return new HoodieHFileDataBlock(list, map);
            default:
                throw new RuntimeException("Unknown data block type " + hoodieLogBlockType);
        }
    }

    /**
     * Parameter matrix for the {@code @MethodSource("testArguments")} tests: every combination of disk map
     * type (BITCASK / ROCKS_DB) with two boolean flags.
     *
     * <p>Argument order: disk map type, first flag, second flag. In the merging tests these flags appear to
     * feed {@code withBitCaskDiskMapCompressionEnabled} and {@code withReadBlocksLazily} respectively —
     * confirm against the helper's signature.
     *
     * <p>The previous form, {@code Stream.of((Object[]) new Arguments[]{...})}, could not infer
     * {@code Stream<Arguments>} because of the {@code Object[]} cast.
     *
     * @return eight argument sets, in a fixed order
     */
    private static Stream<Arguments> testArguments() {
        return Stream.of(
            Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
            Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
            Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false),
            Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false),
            Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true),
            Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true),
            Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true),
            Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true));
    }
}
