package io.trino.plugin.deltalake;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.plugin.hive.util.AsyncQueue;
import io.trino.plugin.hive.util.ThrottledAsyncQueue;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/* loaded from: input_file:io/trino/plugin/deltalake/DeltaLakeSplitSource.class */
public class DeltaLakeSplitSource implements ConnectorSplitSource {
    private static final ConnectorSplitSource.ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitSource.ConnectorSplitBatch(ImmutableList.of(), false);
    private static final Logger LOG = Logger.get(DeltaLakeSplitSource.class);
    private final SchemaTableName tableName;
    private final AsyncQueue<ConnectorSplit> queue;
    private final boolean recordScannedFiles;
    private final DynamicFilter dynamicFilter;
    private final long dynamicFilteringWaitTimeoutMillis;
    private volatile TrinoException trinoException;
    private final ImmutableSet.Builder<String> scannedFilePaths = ImmutableSet.builder();
    private final Stopwatch dynamicFilterWaitStopwatch = Stopwatch.createStarted();

    public DeltaLakeSplitSource(SchemaTableName schemaTableName, Stream<DeltaLakeSplit> stream, ExecutorService executorService, int i, int i2, DynamicFilter dynamicFilter, Duration duration, boolean z) {
        this.tableName = (SchemaTableName) Objects.requireNonNull(schemaTableName, "tableName is null");
        this.queue = new ThrottledAsyncQueue(i, i2, executorService);
        this.recordScannedFiles = z;
        this.dynamicFilter = (DynamicFilter) Objects.requireNonNull(dynamicFilter, "dynamicFilter is null");
        this.dynamicFilteringWaitTimeoutMillis = duration.toMillis();
        queueSplits(stream, this.queue, executorService).exceptionally(th -> {
            this.trinoException = new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to generate splits for " + this.tableName, th);
            try {
                this.queue.finish();
                return null;
            } catch (Exception e) {
                LOG.error(e, "Could not communicate split generation error for %s to query; this may cause it to be blocked", new Object[]{schemaTableName});
                return null;
            }
        });
    }

    public CompletableFuture<ConnectorSplitSource.ConnectorSplitBatch> getNextBatch(int i) {
        long elapsed = this.dynamicFilteringWaitTimeoutMillis - this.dynamicFilterWaitStopwatch.elapsed(TimeUnit.MILLISECONDS);
        if (this.dynamicFilter.isAwaitable() && elapsed > 0) {
            return this.dynamicFilter.isBlocked().thenApply(obj -> {
                return EMPTY_BATCH;
            }).completeOnTimeout(EMPTY_BATCH, elapsed, TimeUnit.MILLISECONDS);
        }
        boolean isFinished = isFinished();
        return this.trinoException != null ? MoreFutures.toCompletableFuture(Futures.immediateFailedFuture(this.trinoException)) : MoreFutures.toCompletableFuture(Futures.transform(this.queue.getBatchAsync(i), list -> {
            TupleDomain currentPredicate = this.dynamicFilter.getCurrentPredicate();
            Class<DeltaLakeColumnHandle> cls = DeltaLakeColumnHandle.class;
            Objects.requireNonNull(DeltaLakeColumnHandle.class);
            TupleDomain transformKeys = currentPredicate.transformKeys((v1) -> {
                return r1.cast(v1);
            });
            if (transformKeys.isNone()) {
                return new ConnectorSplitSource.ConnectorSplitBatch(ImmutableList.of(), isFinished);
            }
            Map map = (Map) ((Map) transformKeys.getDomains().orElseThrow()).entrySet().stream().filter(entry -> {
                return ((DeltaLakeColumnHandle) entry.getKey()).getColumnType() == DeltaLakeColumnType.PARTITION_KEY;
            }).collect(ImmutableMap.toImmutableMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
            Stream stream = list.stream();
            Class<DeltaLakeSplit> cls2 = DeltaLakeSplit.class;
            Objects.requireNonNull(DeltaLakeSplit.class);
            List list = (List) stream.map((v1) -> {
                return r1.cast(v1);
            }).filter(deltaLakeSplit -> {
                return deltaLakeSplit.getStatisticsPredicate().overlaps(transformKeys) && DeltaLakeSplitManager.partitionMatchesPredicate(deltaLakeSplit.getPartitionKeys(), map);
            }).collect(ImmutableList.toImmutableList());
            if (this.recordScannedFiles) {
                list.forEach(connectorSplit -> {
                    this.scannedFilePaths.add(((DeltaLakeSplit) connectorSplit).getPath());
                });
            }
            return new ConnectorSplitSource.ConnectorSplitBatch(list, isFinished);
        }, MoreExecutors.directExecutor()));
    }

    public Optional<List<Object>> getTableExecuteSplitsInfo() {
        Preconditions.checkState(isFinished(), "Split source must be finished before TableExecuteSplitsInfo is read");
        return !this.recordScannedFiles ? Optional.empty() : Optional.of(ImmutableList.copyOf(this.scannedFilePaths.build()));
    }

    public void close() {
        this.queue.finish();
    }

    public boolean isFinished() {
        return this.queue.isFinished() && this.trinoException == null;
    }

    private static CompletableFuture<Void> queueSplits(Stream<DeltaLakeSplit> stream, AsyncQueue<ConnectorSplit> asyncQueue, ExecutorService executorService) {
        Objects.requireNonNull(stream, "splits is null");
        return CompletableFuture.runAsync(() -> {
            Objects.requireNonNull(asyncQueue);
            stream.map((v1) -> {
                return r1.offer(v1);
            }).forEachOrdered((v0) -> {
                MoreFutures.getFutureValue(v0);
            });
            asyncQueue.finish();
        }, executorService);
    }
}
