package io.trino.plugin.hive.rubix;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import com.qubole.rubix.core.CachingFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoAdlFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoAzureBlobFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoDistributedFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoGoogleHadoopFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoSecureAzureBlobFileSystem;
import com.qubole.rubix.spi.CacheConfig;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.client.NodeVersion;
import io.trino.metadata.InternalNode;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.HdfsConfig;
import io.trino.plugin.hive.HdfsConfigurationInitializer;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.HiveHdfsConfiguration;
import io.trino.plugin.hive.HiveTestUtils;
import io.trino.plugin.hive.authentication.HdfsAuthenticationConfig;
import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.orc.OrcReaderConfig;
import io.trino.plugin.hive.rubix.RubixConfig;
import io.trino.plugin.hive.rubix.RubixModule;
import io.trino.plugin.hive.util.RetryDriver;
import io.trino.spi.Node;
import io.trino.testing.TestingConnectorSession;
import io.trino.testing.TestingNodeManager;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/plugin/hive/rubix/TestRubixCaching.class */
public class TestRubixCaching {
    // Size of the random payload written by testCacheRead.
    private static final DataSize SMALL_FILE_SIZE = DataSize.of(1, DataSize.Unit.MEGABYTE);
    // Size of the random payload written by testLargeFile.
    private static final DataSize LARGE_FILE_SIZE = DataSize.of(100, DataSize.Unit.MEGABYTE);
    // Platform MBean server; the get*Count/getAsyncDownloadedMb helpers read Rubix counters from it.
    private static final MBeanServer BEAN_SERVER = ManagementFactory.getPlatformMBeanServer();
    // Scratch directory created by initializeRubix and deleted again by closeRubix.
    private Path tempDirectory;
    // Hadoop path used to look up the file systems; computed once in setup().
    private org.apache.hadoop.fs.Path cacheStoragePath;
    // HDFS configuration shared by every HdfsEnvironment / initializer built in this class.
    private HdfsConfig config;
    // HDFS context built in setup() from a testing session whose properties come from a cache-enabled RubixEnabledConfig.
    private HdfsEnvironment.HdfsContext context;
    // Rubix lifecycle handle: created in initializeRubix, stopped and cleared in closeRubix.
    private RubixInitializer rubixInitializer;
    // Configuration provider wrapping rubixInitializer; passed to HiveHdfsConfiguration when building caching file systems.
    private RubixConfigurationInitializer rubixConfigInitializer;
    // File system obtained without the Rubix configuration provider; closed in tearDown().
    private FileSystem nonCachingFileSystem;
    // File system obtained with the Rubix configuration provider; closed and cleared in closeRubix().
    private FileSystem cachingFileSystem;

    /**
     * Class-level fixture: computes the storage path, creates the shared HDFS config and
     * context, and opens the non-caching file system.
     *
     * <p>NOTE(review): {@code tempDirectory} has not been assigned yet when
     * {@code getStoragePath("/")} runs here, so the resulting path is formatted from a
     * {@code null} directory. Confirm that only the {@code file:} scheme of this path matters.
     *
     * @throws IOException if the non-caching file system cannot be obtained
     */
    @BeforeClass
    public void setup() throws IOException {
        cacheStoragePath = getStoragePath("/");
        config = new HdfsConfig();

        // Session whose Hive session properties are derived from a config with the cache enabled.
        TestingConnectorSession session = TestingConnectorSession.builder()
                .setPropertyMetadata(HiveTestUtils.getHiveSessionProperties(
                                new HiveConfig(),
                                new RubixEnabledConfig().setCacheEnabled(true),
                                new OrcReaderConfig())
                        .getSessionProperties())
                .build();
        context = new HdfsEnvironment.HdfsContext(session);

        nonCachingFileSystem = getNonCachingFileSystem();
    }

    /**
     * Resets the static state of {@link CachingFileSystem} before and after every test method.
     *
     * <p>{@code alwaysRun = true} matches {@code closeRubix}, so the reset is not skipped
     * when an earlier configuration method failed or the test was skipped.
     */
    @AfterMethod(alwaysRun = true)
    @BeforeMethod
    public void deinitializeRubix() {
        // Presumably reverts static Rubix initialization left behind by other tests.
        CachingFileSystem.deinitialize();
    }

    /**
     * Builds a file system for {@code cacheStoragePath} from a plain HDFS configuration,
     * i.e. without the Rubix configuration provider registered.
     *
     * @return the file system resolved for {@code cacheStoragePath}
     * @throws IOException if the file system cannot be obtained
     */
    private FileSystem getNonCachingFileSystem() throws IOException {
        HiveHdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(
                new HdfsConfigurationInitializer(config),
                ImmutableSet.of());
        HdfsEnvironment environment = new HdfsEnvironment(hdfsConfiguration, config, new NoHdfsAuthentication());
        return environment.getFileSystem(context, cacheStoragePath);
    }

    /**
     * Starts Rubix with the given configuration and then opens the caching file system
     * for {@code cacheStoragePath}, storing it in {@code cachingFileSystem}.
     *
     * @param rubixConfig Rubix settings for this test
     * @throws Exception if Rubix fails to start or the file system cannot be obtained
     */
    private void initializeCachingFileSystem(RubixConfig rubixConfig) throws Exception {
        initializeRubix(rubixConfig);
        cachingFileSystem = getCachingFileSystem(context, cacheStoragePath);
    }

    /**
     * Starts Rubix on a single-node topology: one coordinator named "master" whose URI
     * uses the local host address and port 8080.
     *
     * @param rubixConfig Rubix settings for this test
     * @throws Exception if the local host cannot be resolved or Rubix fails to start
     */
    private void initializeRubix(RubixConfig rubixConfig) throws Exception {
        URI coordinatorUri = URI.create("http://" + InetAddress.getLocalHost().getHostAddress() + ":8080");
        Node coordinator = new InternalNode("master", coordinatorUri, NodeVersion.UNKNOWN, true);
        initializeRubix(rubixConfig, ImmutableList.of(coordinator));
    }

    /**
     * Creates a fresh temp directory with two cache sub-directories, points the Rubix
     * configuration at them, starts Rubix against the given node topology and waits
     * (with retries) until the Rubix server reports it is up.
     *
     * <p>Side effects: assigns {@code tempDirectory}, {@code rubixInitializer} and
     * {@code rubixConfigInitializer}, and mutates {@code rubixConfig}.
     *
     * @param rubixConfig Rubix settings; its cache location and start-server-on-coordinator
     *         flag are overwritten here
     * @param list nodes exposed to Rubix through a {@link TestingNodeManager}
     * @throws Exception if directories cannot be created or Rubix does not start
     */
    private void initializeRubix(RubixConfig rubixConfig, List<Node> list) throws Exception {
        this.tempDirectory = Files.createTempDirectory(getClass().getSimpleName());

        List<Path> cacheDirectories = ImmutableList.of(
                this.tempDirectory.resolve("cache1"),
                this.tempDirectory.resolve("cache2"));
        for (Path cacheDirectory : cacheDirectories) {
            Files.createDirectories(cacheDirectory);
        }

        rubixConfig.setStartServerOnCoordinator(true);
        // The cache location is configured as a comma-separated list of directories.
        rubixConfig.setCacheLocation(Joiner.on(",").join(cacheDirectories.stream()
                .map(Path::toString)
                .collect(ImmutableList.toImmutableList())));

        this.rubixInitializer = new RubixInitializer(
                rubixConfig,
                new TestingNodeManager(list),
                new CatalogName("catalog"),
                new HdfsConfigurationInitializer(this.config, ImmutableSet.of(configuration -> {
                    // Presumably makes remote fetches get processed without delay — TODO confirm.
                    CacheConfig.setRemoteFetchProcessInterval(configuration, 0);
                })),
                new RubixModule.DefaultRubixHdfsInitializer(new HdfsAuthenticationConfig()));
        this.rubixConfigInitializer = new RubixConfigurationInitializer(this.rubixInitializer);
        this.rubixInitializer.initializeRubix();

        RetryDriver.retry().run("wait for rubix to startup", () -> {
            if (!this.rubixInitializer.isServerUp()) {
                throw new IllegalStateException("Rubix server has not started");
            }
            return null;
        });
    }

    /**
     * Convenience overload: opens the caching file system for the class-level
     * {@code context} and {@code cacheStoragePath}.
     *
     * @return the caching file system for {@code cacheStoragePath}
     * @throws IOException if the file system cannot be obtained
     */
    private FileSystem getCachingFileSystem() throws IOException {
        return getCachingFileSystem(context, cacheStoragePath);
    }

    /**
     * Opens a file system for {@code path} from an HDFS configuration that has the Rubix
     * configuration provider registered, plus a test-only provider that maps the
     * {@code file} scheme to {@link CachingLocalFileSystem} and sets placeholder
     * GCS/Azure/ADL properties.
     *
     * @param hdfsContext context used to resolve the file system
     * @param path location whose scheme selects the file system implementation
     * @return the file system resolved for {@code path}
     * @throws IOException if the file system cannot be obtained
     */
    private FileSystem getCachingFileSystem(HdfsEnvironment.HdfsContext hdfsContext, org.apache.hadoop.fs.Path path) throws IOException {
        HiveHdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(
                new HdfsConfigurationInitializer(this.config, ImmutableSet.of()),
                ImmutableSet.of(this.rubixConfigInitializer, (configuration, ignoredContext, ignoredUri) -> {
                    // Route local "file" URIs through the caching wrapper.
                    configuration.set("fs.file.impl", CachingLocalFileSystem.class.getName());
                    // Placeholder settings; presumably only needed so the cloud file systems can be instantiated.
                    configuration.setBoolean("fs.gs.lazy.init.enable", true);
                    configuration.set("fs.azure.account.key", "Zm9vCg==");
                    configuration.set("fs.adl.oauth2.client.id", "test");
                    configuration.set("fs.adl.oauth2.refresh.url", "http://localhost");
                    configuration.set("fs.adl.oauth2.credential", "password");
                }));
        HdfsEnvironment environment = new HdfsEnvironment(hdfsConfiguration, this.config, new NoHdfsAuthentication());
        return environment.getFileSystem(hdfsContext, path);
    }

    /**
     * Closes the non-caching file system opened in {@code setup()}.
     *
     * <p>Because this runs with {@code alwaysRun = true}, it may execute even when
     * {@code setup()} failed before the file system was assigned; the null guard keeps
     * that case from raising a secondary {@link NullPointerException}.
     *
     * @throws IOException if closing the file system fails
     */
    @AfterClass(alwaysRun = true)
    public void tearDown() throws IOException {
        if (this.nonCachingFileSystem != null) {
            this.nonCachingFileSystem.close();
            this.nonCachingFileSystem = null;
        }
    }

    /**
     * Per-test cleanup: stops Rubix, closes the caching file system and deletes the
     * temp directory. Each step is skipped when its field is {@code null}.
     *
     * <p>Guava's {@link Closer} closes registered resources in reverse registration
     * order, so the effective order is: stop Rubix, close the caching file system,
     * delete the temp directory.
     *
     * @throws IOException if closing the file system or deleting the directory fails
     */
    @AfterMethod(alwaysRun = true)
    public void closeRubix() throws IOException {
        try (Closer closer = Closer.create()) {
            closer.register(() -> {
                if (this.tempDirectory != null) {
                    MoreFiles.deleteRecursively(this.tempDirectory, RecursiveDeleteOption.ALLOW_INSECURE);
                    this.tempDirectory = null;
                }
            });
            closer.register(() -> {
                if (this.cachingFileSystem != null) {
                    this.cachingFileSystem.close();
                    this.cachingFileSystem = null;
                }
            });
            closer.register(() -> {
                if (this.rubixInitializer != null) {
                    try {
                        // Stopping is retried; the field is only cleared once stopRubix succeeded.
                        RetryDriver.retry().run("stopRubix", () -> {
                            this.rubixInitializer.stopRubix();
                            return null;
                        });
                        this.rubixInitializer = null;
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }

    /**
     * TestNG data provider supplying each Rubix read mode exercised by the parameterized
     * tests: {@code ASYNC} and {@code READ_THROUGH}.
     *
     * @return one single-element row per read mode
     */
    @DataProvider
    public static Object[][] readMode() {
        // Must be a two-dimensional array: one row per invocation, one column per parameter.
        return new Object[][] {
                {RubixConfig.ReadMode.ASYNC},
                {RubixConfig.ReadMode.READ_THROUGH},
        };
    }

    /**
     * Verifies that initializing Rubix fails with "No coordinator node available" when the
     * node manager only exposes a non-coordinator node. A single retry attempt is used so
     * the failure surfaces immediately.
     */
    @Test
    public void testCoordinatorNotJoining() {
        RubixConfig rubixConfig = new RubixConfig()
                .setCacheLocation("/tmp/not/existing/dir")
                .setStartServerOnCoordinator(true);
        List<Node> workerOnly = ImmutableList.of(
                new InternalNode("worker", URI.create("http://127.0.0.2:8080"), NodeVersion.UNKNOWN, false));

        RubixInitializer initializer = new RubixInitializer(
                RetryDriver.retry().maxAttempts(1),
                rubixConfig,
                new TestingNodeManager(workerOnly),
                new CatalogName("catalog"),
                new HdfsConfigurationInitializer(this.config, ImmutableSet.of()),
                new RubixModule.DefaultRubixHdfsInitializer(new HdfsAuthenticationConfig()));

        Assertions.assertThatThrownBy(initializer::initializeRubix)
                .hasMessage("No coordinator node available");
    }

    /**
     * Starts Rubix with one coordinator and two workers, then checks that the caching
     * file system reports exactly one block location per file and that the file named
     * "aaa" is assigned to host 127.0.0.3 while "zzzz" is assigned to host 127.0.0.2.
     *
     * @throws Exception if Rubix or the file system cannot be initialized
     */
    @Test
    public void testGetBlockLocations() throws Exception {
        Node coordinator = new InternalNode(
                "master",
                URI.create("http://" + InetAddress.getLocalHost().getHostAddress() + ":8080"),
                NodeVersion.UNKNOWN,
                true);
        Node firstWorker = new InternalNode("worker1", URI.create("http://127.0.0.2:8080"), NodeVersion.UNKNOWN, false);
        Node secondWorker = new InternalNode("worker2", URI.create("http://127.0.0.3:8080"), NodeVersion.UNKNOWN, false);
        initializeRubix(new RubixConfig(), ImmutableList.of(coordinator, firstWorker, secondWorker));
        this.cachingFileSystem = getCachingFileSystem();

        FileStatus statusA = new FileStatus(3L, false, 0, 3L, 0L, new org.apache.hadoop.fs.Path("aaa"));
        FileStatus statusZ = new FileStatus(3L, false, 0, 3L, 0L, new org.apache.hadoop.fs.Path("zzzz"));

        BlockLocation[] locationsA = this.cachingFileSystem.getFileBlockLocations(statusA, 0L, 3L);
        BlockLocation[] locationsZ = this.cachingFileSystem.getFileBlockLocations(statusZ, 0L, 3L);

        Assert.assertEquals(locationsA.length, 1);
        Assert.assertEquals(locationsZ.length, 1);

        String hostA = locationsA[0].getHosts()[0];
        Assert.assertEquals(hostA, "127.0.0.3");
        String hostZ = locationsZ[0].getHosts()[0];
        Assert.assertEquals(hostZ, "127.0.0.2");
    }

    /**
     * Writes a small random file through the non-caching file system and reads it twice
     * through the caching one. The first read must raise the remote-read counter and
     * leave the cached-read counter unchanged; a later read must raise the cached-read
     * counter without any further remote reads. In ASYNC mode it also waits for the
     * async-downloaded counter to grow by 1.
     *
     * @param readMode Rubix read mode under test
     * @throws Exception if initialization or file I/O fails
     */
    @Test(dataProvider = "readMode")
    public void testCacheRead(RubixConfig.ReadMode readMode) throws Exception {
        initializeCachingFileSystem(new RubixConfig().setReadMode(readMode));

        byte[] randomData = new byte[(int) SMALL_FILE_SIZE.toBytes()];
        new Random().nextBytes(randomData);
        org.apache.hadoop.fs.Path file = getStoragePath("some_file");
        writeFile(this.nonCachingFileSystem.create(file), randomData);

        // Counter baselines taken before the first cached read.
        long remoteReadsBefore = getRemoteReadsCount();
        long cachedReadsBefore = getCachedReadsCount();
        long asyncDownloadedMbBefore = getAsyncDownloadedMb(readMode);
        Duration timeout = new Duration(10.0d, TimeUnit.SECONDS);

        Assert.assertEquals(readFile(this.cachingFileSystem, file), randomData);

        if (readMode == RubixConfig.ReadMode.ASYNC) {
            // Wait for the background download of the file to be accounted for.
            io.trino.testing.assertions.Assert.assertEventually(timeout, () -> {
                Assert.assertEquals(getAsyncDownloadedMb(readMode), asyncDownloadedMbBefore + 1);
            });
        }

        // The first read went remote and did not hit the cache.
        io.trino.testing.assertions.Assert.assertEventually(timeout, () -> {
            io.airlift.testing.Assertions.assertGreaterThan(Long.valueOf(getRemoteReadsCount()), Long.valueOf(remoteReadsBefore));
            Assert.assertEquals(getCachedReadsCount(), cachedReadsBefore);
        });

        // A repeated read is served from the cache with no additional remote reads.
        io.trino.testing.assertions.Assert.assertEventually(timeout, () -> {
            long remoteReadsBeforeReread = getRemoteReadsCount();
            Assert.assertEquals(readFile(this.cachingFileSystem, file), randomData);
            io.airlift.testing.Assertions.assertGreaterThan(Long.valueOf(getCachedReadsCount()), Long.valueOf(cachedReadsBefore));
            Assert.assertEquals(getRemoteReadsCount(), remoteReadsBeforeReread);
        });
    }

    /**
     * Writes a short UTF-8 string through the caching file system and checks that the
     * non-caching file system reads back the same bytes.
     *
     * @param readMode Rubix read mode under test
     * @throws Exception if initialization or file I/O fails
     */
    @Test(dataProvider = "readMode")
    public void testCacheWrite(RubixConfig.ReadMode readMode) throws Exception {
        RubixConfig rubixConfig = new RubixConfig().setReadMode(readMode);
        initializeCachingFileSystem(rubixConfig);

        org.apache.hadoop.fs.Path file = getStoragePath("some_file_write");
        byte[] expected = "Hello world".getBytes(StandardCharsets.UTF_8);

        FSDataOutputStream output = this.cachingFileSystem.create(file);
        writeFile(output, expected);

        byte[] actual = readFile(this.nonCachingFileSystem, file);
        Assert.assertEquals(actual, expected);
    }

    /**
     * Same scenario as the small-file read test but with a 100 MB payload, followed by
     * three concurrent reads that must all be served from the cache (cached-read counter
     * grows, remote-read counter stays put).
     *
     * @param readMode Rubix read mode under test
     * @throws Exception if initialization, file I/O or any concurrent read fails
     */
    @Test(dataProvider = "readMode")
    public void testLargeFile(RubixConfig.ReadMode readMode) throws Exception {
        initializeCachingFileSystem(new RubixConfig().setReadMode(readMode));

        byte[] randomData = new byte[(int) LARGE_FILE_SIZE.toBytes()];
        new Random().nextBytes(randomData);
        org.apache.hadoop.fs.Path file = getStoragePath("large_file");
        writeFile(this.nonCachingFileSystem.create(file), randomData);

        // Counter baselines taken before the first cached read.
        long remoteReadsBefore = getRemoteReadsCount();
        long cachedReadsBefore = getCachedReadsCount();
        long asyncDownloadedMbBefore = getAsyncDownloadedMb(readMode);
        Duration timeout = new Duration(10.0d, TimeUnit.SECONDS);

        // Arrays.equals + assertTrue avoids a huge failure message for a 100 MB array.
        Assert.assertTrue(Arrays.equals(randomData, readFile(this.cachingFileSystem, file)));

        if (readMode == RubixConfig.ReadMode.ASYNC) {
            // 100 corresponds to LARGE_FILE_SIZE expressed in megabytes.
            io.trino.testing.assertions.Assert.assertEventually(timeout, () -> {
                Assert.assertEquals(getAsyncDownloadedMb(readMode), asyncDownloadedMbBefore + 100);
            });
        }

        // The first read went remote.
        io.trino.testing.assertions.Assert.assertEventually(timeout, () -> {
            io.airlift.testing.Assertions.assertGreaterThan(Long.valueOf(getRemoteReadsCount()), Long.valueOf(remoteReadsBefore));
        });

        // A repeated read is served from the cache with no additional remote reads.
        io.trino.testing.assertions.Assert.assertEventually(timeout, () -> {
            long remoteReadsBeforeReread = getRemoteReadsCount();
            Assert.assertTrue(Arrays.equals(randomData, readFile(this.cachingFileSystem, file)));
            io.airlift.testing.Assertions.assertGreaterThan(Long.valueOf(getCachedReadsCount()), Long.valueOf(cachedReadsBefore));
            Assert.assertEquals(getRemoteReadsCount(), remoteReadsBeforeReread);
        });

        long cachedReadsBeforeConcurrent = getCachedReadsCount();
        long remoteReadsBeforeConcurrent = getRemoteReadsCount();

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        try {
            // Submit all three reads before waiting on any of them so they can overlap.
            ImmutableList.Builder<Future<?>> reads = ImmutableList.builder();
            for (int i = 0; i < 3; i++) {
                // "return null" makes the lambda value-returning, so it binds to submit(Callable).
                Future<?> read = executorService.submit(() -> {
                    Assert.assertTrue(Arrays.equals(randomData, readFile(this.cachingFileSystem, file)));
                    return null;
                });
                reads.add(read);
            }
            for (Future<?> read : reads.build()) {
                // Propagates assertion failures raised inside the worker threads.
                read.get();
            }

            io.trino.testing.assertions.Assert.assertEventually(timeout, () -> {
                io.airlift.testing.Assertions.assertGreaterThan(Long.valueOf(getCachedReadsCount()), Long.valueOf(cachedReadsBeforeConcurrent));
                Assert.assertEquals(getRemoteReadsCount(), remoteReadsBeforeConcurrent);
            });
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * Verifies which caching file system implementation each URI scheme is bound to once
     * Rubix is initialized: s3/s3a/s3n, abfs, abfss, adl, gs and hdfs.
     *
     * @throws Exception if Rubix cannot be initialized or a file system cannot be obtained
     */
    @Test
    public void testFileSystemBindings() throws Exception {
        initializeRubix(new RubixConfig());

        assertCachingFileSystemBinding("s3://bucket_name", CachingTrinoS3FileSystem.class);
        assertCachingFileSystemBinding("s3a://bucket_name", CachingTrinoS3FileSystem.class);
        assertCachingFileSystemBinding("s3n://bucket_name", CachingTrinoS3FileSystem.class);
        assertCachingFileSystemBinding("abfs://fileanalysis@foo-bar.dfs.core.windows.net/tutorials", CachingPrestoAzureBlobFileSystem.class);
        assertCachingFileSystemBinding("abfss://fileanalysis@foo-bar.dfs.core.windows.net/tutorials", CachingPrestoSecureAzureBlobFileSystem.class);
        assertCachingFileSystemBinding("adl://fileanalysis@foo-bar.dfs.core.windows.net/tutorials", CachingPrestoAdlFileSystem.class);
        assertCachingFileSystemBinding("gs://bucket_name", CachingPrestoGoogleHadoopFileSystem.class);
        assertCachingFileSystemBinding("hdfs://localhost:7897", CachingPrestoDistributedFileSystem.class);
    }

    /**
     * Opens the caching file system for {@code location}, asserts the type of its raw
     * (wrapped) file system and closes it again, even when the assertion fails.
     *
     * @param location URI whose scheme selects the file system implementation
     * @param expectedRawFileSystem type the raw file system must be an instance of
     * @throws IOException if the file system cannot be obtained or closed
     */
    private void assertCachingFileSystemBinding(String location, Class<? extends FileSystem> expectedRawFileSystem) throws IOException {
        try (FileSystem fileSystem = getCachingFileSystem(this.context, new org.apache.hadoop.fs.Path(location))) {
            assertRawFileSystemInstanceOf(fileSystem, expectedRawFileSystem);
        }
    }

    /**
     * Asserts that {@code fileSystem} is a {@link FilterFileSystem} and that the raw file
     * system it wraps is an instance of {@code cls}.
     *
     * @param fileSystem file system expected to be a filtering wrapper
     * @param cls expected type of the wrapped raw file system
     */
    private void assertRawFileSystemInstanceOf(FileSystem fileSystem, Class<? extends FileSystem> cls) {
        io.airlift.testing.Assertions.assertInstanceOf(fileSystem, FilterFileSystem.class);
        FilterFileSystem wrapper = (FilterFileSystem) fileSystem;
        FileSystem rawFileSystem = wrapper.getRawFileSystem();
        io.airlift.testing.Assertions.assertInstanceOf(rawFileSystem, cls);
    }

    /**
     * Reads the whole file at {@code path} into memory.
     *
     * <p>The stream is opened in a try-with-resources block so it is closed even when
     * reading fails part-way through.
     *
     * @param fileSystem file system to read from
     * @param path file to read
     * @return the complete file contents
     * @throws RuntimeException wrapping any {@link IOException} raised while opening,
     *         reading or closing the stream (unchecked so the method can be used in lambdas)
     */
    private byte[] readFile(FileSystem fileSystem, org.apache.hadoop.fs.Path path) {
        try (FSDataInputStream inputStream = fileSystem.open(path)) {
            return ByteStreams.toByteArray(inputStream);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes {@code bArr} to the given stream and always closes the stream afterwards.
     *
     * <p>Try-with-resources is used so that, if both the write and the close fail, the
     * write failure is the one reported and the close failure is attached as suppressed
     * instead of replacing it.
     *
     * @param fSDataOutputStream stream to write to; closed by this method
     * @param bArr bytes to write
     * @throws IOException if writing or closing fails
     */
    private void writeFile(FSDataOutputStream fSDataOutputStream, byte[] bArr) throws IOException {
        try (FSDataOutputStream outputStream = fSDataOutputStream) {
            outputStream.write(bArr);
        }
    }

    /**
     * Builds a {@code file:} Hadoop path for {@code str} under the {@code storage}
     * folder of the current {@code tempDirectory}.
     *
     * <p>{@code tempDirectory} is formatted with {@code %s}, so when it is still
     * {@code null} the literal text "null" ends up in the path.
     *
     * @param str file name (or sub-path) below the storage folder
     * @return the Hadoop path for that location
     */
    private org.apache.hadoop.fs.Path getStoragePath(String str) {
        String location = String.format("file:///%s/storage/%s", this.tempDirectory, str);
        return new org.apache.hadoop.fs.Path(location);
    }

    /**
     * Returns the sum of the {@code Direct_rrc_requests} and {@code Remote_rrc_requests}
     * attributes of the Rubix detailed-stats MBean registered for catalog "catalog".
     *
     * @return direct plus remote read-request count
     * @throws RuntimeException wrapping any failure to look up or read the MBean attributes
     */
    private long getRemoteReadsCount() {
        try {
            ObjectName statsBean = new ObjectName("rubix:name=stats,type=detailed,catalog=catalog");
            long directReads = (Long) BEAN_SERVER.getAttribute(statsBean, "Direct_rrc_requests");
            long remoteReads = (Long) BEAN_SERVER.getAttribute(statsBean, "Remote_rrc_requests");
            return directReads + remoteReads;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the {@code Cached_rrc_requests} attribute of the Rubix detailed-stats
     * MBean registered for catalog "catalog".
     *
     * @return cached read-request count
     * @throws RuntimeException wrapping any failure to look up or read the MBean attribute
     */
    private long getCachedReadsCount() {
        try {
            ObjectName statsBean = new ObjectName("rubix:name=stats,type=detailed,catalog=catalog");
            Object cachedReads = BEAN_SERVER.getAttribute(statsBean, "Cached_rrc_requests");
            return ((Long) cachedReads).longValue();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the {@code Count} attribute of the
     * {@code rubix.bookkeeper.count.async_downloaded_mb} metrics MBean, or 0 without
     * touching the MBean server when the read mode is {@code READ_THROUGH}.
     *
     * @param readMode Rubix read mode of the current test
     * @return async-downloaded megabyte count, or 0 for {@code READ_THROUGH}
     * @throws RuntimeException wrapping any failure to look up or read the MBean attribute
     */
    private long getAsyncDownloadedMb(RubixConfig.ReadMode readMode) {
        if (readMode != RubixConfig.ReadMode.READ_THROUGH) {
            try {
                ObjectName metricBean = new ObjectName("metrics:name=rubix.bookkeeper.count.async_downloaded_mb");
                Object downloadedMb = BEAN_SERVER.getAttribute(metricBean, "Count");
                return ((Long) downloadedMb).longValue();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return 0L;
    }
}
