package io.trino.hdfs.rubix;

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.MoreCollectors;
import com.google.common.io.Closer;
import com.google.inject.Inject;
import com.qubole.rubix.bookkeeper.BookKeeper;
import com.qubole.rubix.bookkeeper.BookKeeperServer;
import com.qubole.rubix.bookkeeper.LocalDataTransferServer;
import com.qubole.rubix.common.metrics.MetricsReporterType;
import com.qubole.rubix.core.CachingFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoAdlFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoAzureBlobFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoGoogleHadoopFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoNativeAzureFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoSecureAzureBlobFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoSecureNativeAzureFileSystem;
import com.qubole.rubix.spi.CacheConfig;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeExecutor;
import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import io.airlift.log.Logger;
import io.trino.hdfs.ConfigurationUtils;
import io.trino.hdfs.DynamicConfigurationProvider;
import io.trino.hdfs.HdfsConfigurationInitializer;
import io.trino.plugin.base.CatalogName;
import io.trino.spi.HostAddress;
import io.trino.spi.Node;
import io.trino.spi.NodeManager;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import org.apache.hadoop.conf.Configuration;

/* loaded from: input_file:io/trino/hdfs/rubix/RubixInitializer.class */
public class RubixInitializer {
    private static final String FILESYSTEM_OWNED_BY_RUBIX_CONFIG_PROPETY = "presto.fs.owned.by.rubix";
    private final FailsafeExecutor<?> coordinatorFailsafeExecutor;
    private final boolean startServerOnCoordinator;
    private final boolean parallelWarmupEnabled;
    private final Optional<String> cacheLocation;
    private final long cacheTtlMillis;
    private final int diskUsagePercentage;
    private final int bookKeeperServerPort;
    private final int dataTransferServerPort;
    private final NodeManager nodeManager;
    private final CatalogName catalogName;
    private final HdfsConfigurationInitializer hdfsConfigurationInitializer;
    private final RubixHdfsInitializer rubixHdfsInitializer;
    private volatile boolean cacheReady;

    @Nullable
    private HostAddress masterAddress;

    @Nullable
    private BookKeeperServer bookKeeperServer;
    private static final String RUBIX_S3_FS_CLASS_NAME = CachingTrinoS3FileSystem.class.getName();
    private static final String RUBIX_NATIVE_AZURE_FS_CLASS_NAME = CachingPrestoNativeAzureFileSystem.class.getName();
    private static final String RUBIX_SECURE_NATIVE_AZURE_FS_CLASS_NAME = CachingPrestoSecureNativeAzureFileSystem.class.getName();
    private static final String RUBIX_AZURE_BLOB_FS_CLASS_NAME = CachingPrestoAzureBlobFileSystem.class.getName();
    private static final String RUBIX_SECURE_AZURE_BLOB_FS_CLASS_NAME = CachingPrestoSecureAzureBlobFileSystem.class.getName();
    private static final String RUBIX_SECURE_ADL_CLASS_NAME = CachingPrestoAdlFileSystem.class.getName();
    private static final String RUBIX_GS_FS_CLASS_NAME = CachingPrestoGoogleHadoopFileSystem.class.getName();
    private static final FailsafeExecutor<?> DEFAULT_COORDINATOR_FAILSAFE_EXECUTOR = Failsafe.with(((RetryPolicyBuilder) RetryPolicy.builder().handle(TrinoException.class)).withMaxAttempts(-1).withMaxDuration(Duration.ofMinutes(10)).withDelay(Duration.ofSeconds(1)).build(), new RetryPolicy[0]);
    private static final Logger log = Logger.get(RubixInitializer.class);

    /* loaded from: input_file:io/trino/hdfs/rubix/RubixInitializer$Owner.class */
    public enum Owner {
        PRESTO,
        RUBIX
    }

    @Inject
    public RubixInitializer(RubixConfig rubixConfig, NodeManager nodeManager, CatalogName catalogName, HdfsConfigurationInitializer hdfsConfigurationInitializer, RubixHdfsInitializer rubixHdfsInitializer) {
        this(DEFAULT_COORDINATOR_FAILSAFE_EXECUTOR, rubixConfig, nodeManager, catalogName, hdfsConfigurationInitializer, rubixHdfsInitializer);
    }

    @VisibleForTesting
    RubixInitializer(FailsafeExecutor<?> failsafeExecutor, RubixConfig rubixConfig, NodeManager nodeManager, CatalogName catalogName, HdfsConfigurationInitializer hdfsConfigurationInitializer, RubixHdfsInitializer rubixHdfsInitializer) {
        this.coordinatorFailsafeExecutor = failsafeExecutor;
        this.startServerOnCoordinator = rubixConfig.isStartServerOnCoordinator();
        this.parallelWarmupEnabled = rubixConfig.getReadMode().isParallelWarmupEnabled();
        this.cacheLocation = rubixConfig.getCacheLocation();
        this.cacheTtlMillis = rubixConfig.getCacheTtl().toMillis();
        this.diskUsagePercentage = rubixConfig.getDiskUsagePercentage();
        this.bookKeeperServerPort = rubixConfig.getBookKeeperServerPort();
        this.dataTransferServerPort = rubixConfig.getDataTransferServerPort();
        this.nodeManager = nodeManager;
        this.catalogName = catalogName;
        this.hdfsConfigurationInitializer = hdfsConfigurationInitializer;
        this.rubixHdfsInitializer = rubixHdfsInitializer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initializeRubix() {
        if (this.nodeManager.getCurrentNode().isCoordinator() && !this.startServerOnCoordinator) {
            setupRubixMetrics();
            this.cacheReady = true;
        } else {
            if (this.cacheLocation.isEmpty()) {
                throw new IllegalArgumentException("caching directories were not provided");
            }
            waitForCoordinator();
            startRubix();
        }
    }

    @PreDestroy
    public void stopRubix() throws IOException {
        Closer create = Closer.create();
        try {
            create.register(() -> {
                if (this.bookKeeperServer != null) {
                    this.bookKeeperServer.stopServer();
                    this.bookKeeperServer = null;
                }
            });
            create.register(LocalDataTransferServer::stopServer);
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void enableRubix(Configuration configuration) {
        if (!this.cacheReady) {
            disableRubix(configuration);
        } else {
            updateRubixConfiguration(configuration, Owner.PRESTO);
            DynamicConfigurationProvider.setCacheKey(configuration, "rubix_enabled");
        }
    }

    public void disableRubix(Configuration configuration) {
        CacheConfig.setCacheDataEnabled(configuration, false);
        DynamicConfigurationProvider.setCacheKey(configuration, "rubix_disabled");
    }

    public static Owner getConfigurationOwner(Configuration configuration) {
        return configuration.get(FILESYSTEM_OWNED_BY_RUBIX_CONFIG_PROPETY, "").equals("true") ? Owner.RUBIX : Owner.PRESTO;
    }

    @VisibleForTesting
    boolean isServerUp() {
        return LocalDataTransferServer.isServerUp() && this.bookKeeperServer != null && this.bookKeeperServer.isServerUp();
    }

    private void waitForCoordinator() {
        this.coordinatorFailsafeExecutor.run(() -> {
            if (this.nodeManager.getAllNodes().stream().noneMatch((v0) -> {
                return v0.isCoordinator();
            })) {
                throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "No coordinator node available");
            }
        });
    }

    private void startRubix() {
        Configuration rubixServerConfiguration = getRubixServerConfiguration();
        MetricRegistry metricRegistry = new MetricRegistry();
        this.bookKeeperServer = new BookKeeperServer();
        BookKeeper startServer = this.bookKeeperServer.startServer(rubixServerConfiguration, metricRegistry);
        LocalDataTransferServer.startServer(rubixServerConfiguration, metricRegistry, startServer);
        CachingFileSystem.setLocalBookKeeper(rubixServerConfiguration, startServer, "catalog=" + this.catalogName);
        TrinoClusterManager.setNodeManager(this.nodeManager);
        log.info("Rubix initialized successfully");
        this.cacheReady = true;
    }

    private void setupRubixMetrics() {
        Configuration rubixServerConfiguration = getRubixServerConfiguration();
        new BookKeeperServer().setupServer(rubixServerConfiguration, new MetricRegistry());
        CachingFileSystem.setLocalBookKeeper(rubixServerConfiguration, new DummyBookKeeper(), "catalog=" + this.catalogName);
        TrinoClusterManager.setNodeManager(this.nodeManager);
    }

    private Configuration getRubixServerConfiguration() {
        this.masterAddress = ((Node) this.nodeManager.getAllNodes().stream().filter((v0) -> {
            return v0.isCoordinator();
        }).collect(MoreCollectors.onlyElement())).getHostAndPort();
        Configuration initialConfiguration = ConfigurationUtils.getInitialConfiguration();
        this.hdfsConfigurationInitializer.initializeConfiguration(initialConfiguration);
        updateRubixConfiguration(initialConfiguration, Owner.RUBIX);
        DynamicConfigurationProvider.setCacheKey(initialConfiguration, "rubix_internal");
        return initialConfiguration;
    }

    private void updateRubixConfiguration(Configuration configuration, Owner owner) {
        Preconditions.checkState(this.masterAddress != null, "masterAddress is not set");
        CacheConfig.setCacheDataEnabled(configuration, true);
        CacheConfig.setOnMaster(configuration, this.nodeManager.getCurrentNode().isCoordinator());
        CacheConfig.setCoordinatorHostName(configuration, this.masterAddress.getHostText());
        CacheConfig.setIsParallelWarmupEnabled(configuration, this.parallelWarmupEnabled);
        CacheConfig.setCacheDataExpirationAfterWrite(configuration, this.cacheTtlMillis);
        CacheConfig.setCacheDataFullnessPercentage(configuration, this.diskUsagePercentage);
        CacheConfig.setBookKeeperServerPort(configuration, this.bookKeeperServerPort);
        CacheConfig.setDataTransferServerPort(configuration, this.dataTransferServerPort);
        CacheConfig.setMetricsReporters(configuration, MetricsReporterType.JMX.name());
        CacheConfig.setEmbeddedMode(configuration, true);
        CacheConfig.enableHeartbeat(configuration, false);
        CacheConfig.setClusterNodeRefreshTime(configuration, 10);
        if (!this.nodeManager.getCurrentNode().isCoordinator() || this.startServerOnCoordinator) {
            CacheConfig.setCacheDataDirPrefix(configuration, this.cacheLocation.orElseThrow());
        } else {
            CacheConfig.setCacheDataOnMasterEnabled(configuration, false);
        }
        configuration.set("fs.s3.impl", RUBIX_S3_FS_CLASS_NAME);
        configuration.set("fs.s3a.impl", RUBIX_S3_FS_CLASS_NAME);
        configuration.set("fs.s3n.impl", RUBIX_S3_FS_CLASS_NAME);
        configuration.set("fs.wasb.impl", RUBIX_NATIVE_AZURE_FS_CLASS_NAME);
        configuration.set("fs.wasbs.impl", RUBIX_SECURE_NATIVE_AZURE_FS_CLASS_NAME);
        configuration.set("fs.abfs.impl", RUBIX_AZURE_BLOB_FS_CLASS_NAME);
        configuration.set("fs.abfss.impl", RUBIX_SECURE_AZURE_BLOB_FS_CLASS_NAME);
        configuration.set("fs.adl.impl", RUBIX_SECURE_ADL_CLASS_NAME);
        configuration.set("fs.gs.impl", RUBIX_GS_FS_CLASS_NAME);
        if (owner == Owner.RUBIX) {
            configuration.set(FILESYSTEM_OWNED_BY_RUBIX_CONFIG_PROPETY, "true");
        }
        CacheConfig.setPrestoClusterManager(configuration, TrinoClusterManager.class.getName());
        this.rubixHdfsInitializer.initializeConfiguration(configuration);
    }
}
