package io.trino.spiller;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.metadata.Metadata;
import io.trino.operator.SpillContext;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.type.Type;
import io.trino.sql.analyzer.FeaturesConfig;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/* loaded from: input_file:io/trino/spiller/FileSingleStreamSpillerFactory.class */
public class FileSingleStreamSpillerFactory implements SingleStreamSpillerFactory {

    @VisibleForTesting
    static final String SPILL_FILE_PREFIX = "spill";

    @VisibleForTesting
    static final String SPILL_FILE_SUFFIX = ".bin";
    private static final String SPILL_FILE_GLOB = "spill*.bin";
    private final ListeningExecutorService executor;
    private final PagesSerdeFactory serdeFactory;
    private final List<Path> spillPaths;
    private final SpillerStats spillerStats;
    private final double maxUsedSpaceThreshold;
    private final boolean spillEncryptionEnabled;
    private int roundRobinIndex;
    private final LoadingCache<Path, Boolean> spillPathHealthCache;
    private static final Logger log = Logger.get(FileSingleStreamSpillerFactory.class);
    private static final Duration SPILL_PATH_HEALTH_EXPIRY_INTERVAL = Duration.ofMinutes(5);

    @Inject
    public FileSingleStreamSpillerFactory(Metadata metadata, SpillerStats spillerStats, FeaturesConfig featuresConfig, NodeSpillConfig nodeSpillConfig) {
        this(MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(((FeaturesConfig) Objects.requireNonNull(featuresConfig, "featuresConfig is null")).getSpillerThreads(), Threads.daemonThreadsNamed("binary-spiller-%s"))), ((Metadata) Objects.requireNonNull(metadata, "metadata is null")).getBlockEncodingSerde(), spillerStats, ((FeaturesConfig) Objects.requireNonNull(featuresConfig, "featuresConfig is null")).getSpillerSpillPaths(), ((FeaturesConfig) Objects.requireNonNull(featuresConfig, "featuresConfig is null")).getSpillMaxUsedSpaceThreshold(), ((NodeSpillConfig) Objects.requireNonNull(nodeSpillConfig, "nodeSpillConfig is null")).isSpillCompressionEnabled(), ((NodeSpillConfig) Objects.requireNonNull(nodeSpillConfig, "nodeSpillConfig is null")).isSpillEncryptionEnabled());
    }

    @VisibleForTesting
    public FileSingleStreamSpillerFactory(ListeningExecutorService listeningExecutorService, BlockEncodingSerde blockEncodingSerde, SpillerStats spillerStats, List<Path> list, double d, boolean z, boolean z2) {
        this.serdeFactory = new PagesSerdeFactory(blockEncodingSerde, z);
        this.executor = (ListeningExecutorService) Objects.requireNonNull(listeningExecutorService, "executor is null");
        this.spillerStats = (SpillerStats) Objects.requireNonNull(spillerStats, "spillerStats cannot be null");
        Objects.requireNonNull(list, "spillPaths is null");
        this.spillPaths = ImmutableList.copyOf(list);
        list.forEach(path -> {
            try {
                Files.createDirectories(path, new FileAttribute[0]);
                if (!isAccessible(path)) {
                    throw new IllegalArgumentException(String.format("spill path %s is not accessible, it must be +rwx; adjust %s config property or filesystem permissions", path, FeaturesConfig.SPILLER_SPILL_PATH));
                }
            } catch (IOException e) {
                throw new IllegalArgumentException(String.format("could not create spill path %s; adjust %s config property or filesystem permissions", path, FeaturesConfig.SPILLER_SPILL_PATH), e);
            }
        });
        this.maxUsedSpaceThreshold = d;
        this.spillEncryptionEnabled = z2;
        this.roundRobinIndex = 0;
        this.spillPathHealthCache = CacheBuilder.newBuilder().expireAfterWrite(SPILL_PATH_HEALTH_EXPIRY_INTERVAL).build(CacheLoader.from(path2 -> {
            return Boolean.valueOf(isAccessible(path2) && isSeeminglyHealthy(path2));
        }));
    }

    @PostConstruct
    public void cleanupOldSpillFiles() {
        this.spillPaths.forEach(FileSingleStreamSpillerFactory::cleanupOldSpillFiles);
    }

    @PreDestroy
    public void destroy() {
        this.executor.shutdownNow();
    }

    private static void cleanupOldSpillFiles(Path path) {
        try {
            DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(path, SPILL_FILE_GLOB);
            try {
                newDirectoryStream.forEach(path2 -> {
                    try {
                        log.info("Deleting old spill file: " + path2);
                        Files.delete(path2);
                    } catch (Exception e) {
                        log.warn("Could not cleanup old spill file: " + path2);
                    }
                });
                if (newDirectoryStream != null) {
                    newDirectoryStream.close();
                }
            } finally {
            }
        } catch (IOException e) {
            log.warn(e, "Error cleaning spill files");
        }
    }

    @Override // io.trino.spiller.SingleStreamSpillerFactory
    public SingleStreamSpiller create(List<Type> list, SpillContext spillContext, LocalMemoryContext localMemoryContext) {
        Optional<SpillCipher> empty = Optional.empty();
        if (this.spillEncryptionEnabled) {
            empty = Optional.of(new AesSpillCipher());
        }
        LoadingCache<Path, Boolean> loadingCache = this.spillPathHealthCache;
        Objects.requireNonNull(loadingCache);
        return new FileSingleStreamSpiller(this.serdeFactory.createPagesSerdeForSpill(empty), this.executor, getNextSpillPath(), this.spillerStats, spillContext, localMemoryContext, empty, loadingCache::invalidateAll);
    }

    private synchronized Path getNextSpillPath() {
        int size = this.spillPaths.size();
        for (int i = 0; i < size; i++) {
            Path path = this.spillPaths.get((this.roundRobinIndex + i) % size);
            if (hasEnoughDiskSpace(path) && ((Boolean) this.spillPathHealthCache.getUnchecked(path)).booleanValue()) {
                this.roundRobinIndex = ((this.roundRobinIndex + i) + 1) % size;
                return path;
            }
        }
        if (this.spillPaths.isEmpty()) {
            throw new TrinoException(StandardErrorCode.OUT_OF_SPILL_SPACE, "No spill paths configured");
        }
        throw new TrinoException(StandardErrorCode.OUT_OF_SPILL_SPACE, "No free or healthy space available for spill");
    }

    private boolean hasEnoughDiskSpace(Path path) {
        try {
            FileStore fileStore = Files.getFileStore(path);
            return ((double) fileStore.getUsableSpace()) > ((double) fileStore.getTotalSpace()) * (1.0d - this.maxUsedSpaceThreshold);
        } catch (IOException e) {
            throw new TrinoException(StandardErrorCode.OUT_OF_SPILL_SPACE, "Cannot determine free space for spill", e);
        }
    }

    private boolean isAccessible(Path path) {
        return Files.isReadable(path) && Files.isWritable(path) && Files.isExecutable(path);
    }

    private boolean isSeeminglyHealthy(Path path) {
        try {
            return Files.deleteIfExists(Files.createTempFile(path, SPILL_FILE_PREFIX, "healthcheck", new FileAttribute[0]));
        } catch (IOException e) {
            log.warn(e, "Health check failed for spill %s", new Object[]{path});
            return false;
        }
    }

    @VisibleForTesting
    long getSpillPathCacheSize() {
        return this.spillPathHealthCache.size();
    }
}
