package org.apache.flink.runtime.fs.hdfs;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.fs.viewfs.ViewFileSystemTestSetup;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/fs/hdfs/HadoopViewFileSystemTruncateTest.class */
class HadoopViewFileSystemTruncateTest {

    @TempDir
    static File tempFolder;
    private final FileSystemTestHelper fileSystemTestHelper = new FileSystemTestHelper("/tests");
    private static MiniDFSCluster hdfsCluster;
    private static FileSystem fHdfs;
    private static org.apache.flink.core.fs.FileSystem fSystem;
    private Configuration fsViewConf;
    private FileSystem fsTarget;
    private Path targetTestRoot;

    HadoopViewFileSystemTruncateTest() {
    }

    @BeforeAll
    static void testHadoopVersion() {
        Assumptions.assumeThat(HadoopUtils.isMinHadoopVersion(2, 7)).isTrue();
    }

    @BeforeAll
    static void verifyOS() {
        ((AbstractBooleanAssert) Assumptions.assumeThat(OperatingSystem.isWindows()).describedAs("HDFS cluster cannot be started on Windows without extensions.", new Object[0])).isFalse();
    }

    @BeforeAll
    static void createHDFS() throws Exception {
        File file = tempFolder;
        Configuration configuration = new Configuration();
        configuration.set("hdfs.minidfs.basedir", file.getAbsolutePath());
        hdfsCluster = new MiniDFSCluster.Builder(configuration).nnTopology(MiniDFSNNTopology.simpleFederatedTopology(1)).build();
        hdfsCluster.waitClusterUp();
        fHdfs = hdfsCluster.getFileSystem(0);
    }

    @BeforeEach
    void setUp() throws Exception {
        this.fsTarget = fHdfs;
        this.targetTestRoot = this.fileSystemTestHelper.getAbsoluteTestRootPath(this.fsTarget);
        this.fsTarget.delete(this.targetTestRoot, true);
        this.fsTarget.mkdirs(this.targetTestRoot);
        this.fsViewConf = ViewFileSystemTestSetup.createConfig();
        setupMountPoints();
        fSystem = new HadoopFileSystem(FileSystem.get(FsConstants.VIEWFS_URI, this.fsViewConf));
    }

    private void setupMountPoints() {
        ConfigUtil.addLink(this.fsViewConf, new Path("/mountOnNn1").toString(), this.targetTestRoot.toUri());
    }

    @AfterAll
    static void shutdownCluster() {
        hdfsCluster.shutdown();
    }

    @AfterEach
    void tearDown() throws Exception {
        this.fsTarget.delete(this.fileSystemTestHelper.getTestRootPath(this.fsTarget), true);
    }

    @Test
    void testViewFileSystemRecoverWorks() throws IOException {
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(fSystem.getUri() + "mountOnNn1/test-1");
        RecoverableWriter createRecoverableWriter = fSystem.createRecoverableWriter();
        fSystem.createRecoverableWriter().recoverForCommit(createRecoverableWriter.recover(getOpenStreamToFileWithContent(createRecoverableWriter, path, "test_line").persist()).closeForCommit().getRecoverable()).commitAfterRecovery();
        verifyFileContent(path, "test_line");
    }

    private RecoverableFsDataOutputStream getOpenStreamToFileWithContent(RecoverableWriter recoverableWriter, org.apache.flink.core.fs.Path path, String str) throws IOException {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        RecoverableFsDataOutputStream open = recoverableWriter.open(path);
        open.write(bytes);
        return open;
    }

    private static void verifyFileContent(org.apache.flink.core.fs.Path path, String str) throws IOException {
        FSDataInputStream open = fSystem.open(path);
        try {
            InputStreamReader inputStreamReader = new InputStreamReader((InputStream) open, StandardCharsets.UTF_8);
            try {
                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                try {
                    Assertions.assertThat(bufferedReader.readLine()).isEqualTo(str);
                    bufferedReader.close();
                    inputStreamReader.close();
                    if (open != null) {
                        open.close();
                    }
                } catch (Throwable th) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (open != null) {
                try {
                    open.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }
}
