package org.apache.flink.runtime.fs.hdfs;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.Random;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/fs/hdfs/AbstractHadoopRecoverableWriterITCase.class */
public abstract class AbstractHadoopRecoverableWriterITCase {
    protected static Path basePath;
    private static FileSystem fileSystem;
    protected Path basePathForTest;
    private static final String testData1 = "THIS IS A TEST 1.";
    private static final String testData2 = "THIS IS A TEST 2.";
    private static final String testData3 = "THIS IS A TEST 3.";
    protected static final String BIG_CHUNK_DATA_PATTERN = "THIS IS A TEST 1.";
    protected static String bigDataChunk;

    @TempDir
    protected static File tempFolder;
    private static final String INIT_EMPTY_PERSIST = "EMPTY";
    private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
    private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMEDIATE";
    private static final String FINAL_WITH_EXTRA_STATE = "FINAL";
    private static final Random RND = new Random();
    protected static boolean skipped = true;

    @AfterAll
    static void cleanUp() throws Exception {
        if (!skipped) {
            getFileSystem().delete(basePath, true);
        }
        FileSystem.initialize(new Configuration());
    }

    @BeforeEach
    void prepare() throws Exception {
        this.basePathForTest = new Path(basePath, StringUtils.getRandomString(RND, 16, 16, 'a', 'z'));
        cleanupLocalDir();
    }

    protected abstract String getLocalTmpDir() throws Exception;

    protected abstract String getIncompleteObjectName(RecoverableWriter.ResumeRecoverable resumeRecoverable);

    private void cleanupLocalDir() throws Exception {
        java.nio.file.Path path = Paths.get(getLocalTmpDir(), new String[0]);
        if (!Files.exists(path, new LinkOption[0])) {
            Files.createDirectory(path, new FileAttribute[0]);
            return;
        }
        Stream<java.nio.file.Path> list = Files.list(path);
        try {
            list.forEach(path2 -> {
                try {
                    Files.delete(path2);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            if (list != null) {
                list.close();
            }
        } catch (Throwable th) {
            if (list != null) {
                try {
                    list.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @AfterEach
    void cleanupAndCheckTmpCleanup() throws Exception {
        java.nio.file.Path path = Paths.get(getLocalTmpDir(), new String[0]);
        Assertions.assertThat(Files.exists(path, new LinkOption[0])).isTrue();
        Stream<java.nio.file.Path> list = Files.list(path);
        try {
            Assertions.assertThat(list).isEmpty();
            if (list != null) {
                list.close();
            }
            Files.delete(path);
            getFileSystem().delete(this.basePathForTest, true);
        } catch (Throwable th) {
            if (list != null) {
                try {
                    list.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected static FileSystem getFileSystem() throws Exception {
        if (fileSystem == null) {
            fileSystem = FileSystem.get(basePath.toUri());
        }
        return fileSystem;
    }

    @Test
    void testCloseWithNoData() throws Exception {
        getRecoverableWriter().open(new Path(this.basePathForTest, "part-0")).closeForCommit().commit();
    }

    @Test
    void testCommitAfterNormalClose() throws Exception {
        RecoverableWriter recoverableWriter = getRecoverableWriter();
        Path path = new Path(this.basePathForTest, "part-0");
        RecoverableFsDataOutputStream open = recoverableWriter.open(path);
        open.write(bytesOf("THIS IS A TEST 1."));
        open.closeForCommit().commit();
        Assertions.assertThat(getContentsOfFile(path)).isEqualTo("THIS IS A TEST 1.");
    }

    @Test
    void testCommitAfterPersist() throws Exception {
        RecoverableWriter recoverableWriter = getRecoverableWriter();
        Path path = new Path(this.basePathForTest, "part-0");
        RecoverableFsDataOutputStream open = recoverableWriter.open(path);
        open.write(bytesOf("THIS IS A TEST 1."));
        open.persist();
        open.write(bytesOf(testData2));
        open.closeForCommit().commit();
        Assertions.assertThat(getContentsOfFile(path)).isEqualTo("THIS IS A TEST 1.THIS IS A TEST 2.");
    }

    @Test
    void testCleanupRecoverableState() throws Exception {
        RecoverableWriter recoverableWriter = getRecoverableWriter();
        RecoverableFsDataOutputStream open = recoverableWriter.open(new Path(this.basePathForTest, "part-0"));
        open.write(bytesOf("THIS IS A TEST 1."));
        RecoverableWriter.ResumeRecoverable persist = open.persist();
        open.closeForCommit().commit();
        Assertions.assertThat(getContentsOfFile(new Path("/" + getIncompleteObjectName(persist)))).isEqualTo("THIS IS A TEST 1.");
        Assertions.assertThat(recoverableWriter.cleanupRecoverableState(persist)).isTrue();
        Assertions.assertThatThrownBy(() -> {
            int i = 10;
            while (i > 0) {
                getContentsOfFile(new Path("/" + getIncompleteObjectName(persist)));
                i--;
                Thread.sleep(1000L);
            }
        }).isInstanceOf(FileNotFoundException.class);
    }

    @Test
    void testCallingDeleteObjectTwiceDoesNotThroughException() throws Exception {
        RecoverableWriter recoverableWriter = getRecoverableWriter();
        RecoverableFsDataOutputStream open = recoverableWriter.open(new Path(this.basePathForTest, "part-0"));
        open.write(bytesOf("THIS IS A TEST 1."));
        RecoverableWriter.ResumeRecoverable persist = open.persist();
        open.closeForCommit().commit();
        Assertions.assertThat(getContentsOfFile(new Path("/" + getIncompleteObjectName(persist)))).isEqualTo("THIS IS A TEST 1.");
        Assertions.assertThat(recoverableWriter.cleanupRecoverableState(persist)).isTrue();
        Assertions.assertThat(recoverableWriter.cleanupRecoverableState(persist)).isFalse();
    }

    @Test
    void testCommitAfterRecovery() throws Exception {
        Path path = new Path(this.basePathForTest, "part-0");
        RecoverableWriter recoverableWriter = getRecoverableWriter();
        RecoverableFsDataOutputStream open = recoverableWriter.open(path);
        open.write(bytesOf("THIS IS A TEST 1."));
        open.persist();
        open.persist();
        open.write(bytesOf(testData2));
        byte[] serialize = recoverableWriter.getCommitRecoverableSerializer().serialize(open.closeForCommit().getRecoverable());
        RecoverableWriter recoverableWriter2 = getRecoverableWriter();
        SimpleVersionedSerializer commitRecoverableSerializer = recoverableWriter2.getCommitRecoverableSerializer();
        recoverableWriter2.recoverForCommit((RecoverableWriter.CommitRecoverable) commitRecoverableSerializer.deserialize(commitRecoverableSerializer.getVersion(), serialize)).commitAfterRecovery();
        Assertions.assertThat(getContentsOfFile(path)).isEqualTo("THIS IS A TEST 1.THIS IS A TEST 2.");
    }

    @Test
    void testRecoverWithEmptyState() throws Exception {
        testResumeAfterMultiplePersistWithSmallData(INIT_EMPTY_PERSIST, testData3);
    }

    @Test
    void testRecoverWithState() throws Exception {
        testResumeAfterMultiplePersistWithSmallData(INTERM_WITH_STATE_PERSIST, "THIS IS A TEST 1.THIS IS A TEST 3.");
    }

    @Test
    void testRecoverFromIntermWithoutAdditionalState() throws Exception {
        testResumeAfterMultiplePersistWithSmallData(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, "THIS IS A TEST 1.THIS IS A TEST 3.");
    }

    @Test
    void testRecoverAfterMultiplePersistsState() throws Exception {
        testResumeAfterMultiplePersistWithSmallData(FINAL_WITH_EXTRA_STATE, "THIS IS A TEST 1.THIS IS A TEST 2.THIS IS A TEST 3.");
    }

    @Test
    void testRecoverWithStateWithMultiPart() throws Exception {
        testResumeAfterMultiplePersistWithMultiPartUploads(INTERM_WITH_STATE_PERSIST, bigDataChunk + bigDataChunk);
    }

    @Test
    void testRecoverFromIntermWithoutAdditionalStateWithMultiPart() throws Exception {
        testResumeAfterMultiplePersistWithMultiPartUploads(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, bigDataChunk + bigDataChunk);
    }

    @Test
    void testRecoverAfterMultiplePersistsStateWithMultiPart() throws Exception {
        testResumeAfterMultiplePersistWithMultiPartUploads(FINAL_WITH_EXTRA_STATE, bigDataChunk + bigDataChunk + bigDataChunk);
    }

    private void testResumeAfterMultiplePersistWithSmallData(String str, String str2) throws Exception {
        testResumeAfterMultiplePersist(str, str2, "THIS IS A TEST 1.", testData2, testData3);
    }

    private void testResumeAfterMultiplePersistWithMultiPartUploads(String str, String str2) throws Exception {
        testResumeAfterMultiplePersist(str, str2, bigDataChunk, bigDataChunk, bigDataChunk);
    }

    private void testResumeAfterMultiplePersist(String str, String str2, String str3, String str4, String str5) throws Exception {
        Path path = new Path(this.basePathForTest, "part-0");
        RecoverableWriter recoverableWriter = getRecoverableWriter();
        HashMap hashMap = new HashMap(4);
        RecoverableFsDataOutputStream open = recoverableWriter.open(path);
        try {
            hashMap.put(INIT_EMPTY_PERSIST, open.persist());
            open.write(bytesOf(str3));
            hashMap.put(INTERM_WITH_STATE_PERSIST, open.persist());
            hashMap.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, open.persist());
            open.write(bytesOf(str4));
            hashMap.put(FINAL_WITH_EXTRA_STATE, open.persist());
            if (open != null) {
                open.close();
            }
            SimpleVersionedSerializer resumeRecoverableSerializer = recoverableWriter.getResumeRecoverableSerializer();
            byte[] serialize = resumeRecoverableSerializer.serialize((RecoverableWriter.ResumeRecoverable) hashMap.get(str));
            RecoverableWriter recoverableWriter2 = getRecoverableWriter();
            RecoverableFsDataOutputStream recover = recoverableWriter2.recover((RecoverableWriter.ResumeRecoverable) recoverableWriter2.getResumeRecoverableSerializer().deserialize(resumeRecoverableSerializer.getVersion(), serialize));
            recover.write(bytesOf(str5));
            recover.closeForCommit().commit();
            Assertions.assertThat(getContentsOfFile(path)).isEqualTo(str2);
        } catch (Throwable th) {
            if (open != null) {
                try {
                    open.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected String getContentsOfFile(Path path) throws Exception {
        StringBuilder sb = new StringBuilder();
        FSDataInputStream open = getFileSystem().open(path);
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
            while (true) {
                try {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    }
                    sb.append(readLine);
                } finally {
                }
            }
            bufferedReader.close();
            if (open != null) {
                open.close();
            }
            return sb.toString();
        } catch (Throwable th) {
            if (open != null) {
                try {
                    open.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected static String createBigDataChunk(String str, long j) {
        StringBuilder sb = new StringBuilder();
        int checkedDownCast = (MathUtils.checkedDownCast(j) / bytesOf(str).length) + 100;
        for (int i = 0; i < checkedDownCast; i++) {
            sb.append(str);
        }
        return sb.toString();
    }

    protected static byte[] bytesOf(String str) {
        return str.getBytes(StandardCharsets.UTF_8);
    }

    protected RecoverableWriter getRecoverableWriter() throws Exception {
        return getFileSystem().createRecoverableWriter();
    }
}
