package io.trino.plugin.hudi.split;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.trino.plugin.hive.HivePartitionKey;
import io.trino.plugin.hive.util.AsyncQueue;
import io.trino.plugin.hudi.HudiTableHandle;
import io.trino.plugin.hudi.partition.HudiPartitionInfo;
import io.trino.plugin.hudi.partition.HudiPartitionInfoLoader;
import io.trino.plugin.hudi.query.HudiDirectoryLister;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.class */
public class HudiBackgroundSplitLoader {
    private final ConnectorSession session;
    private final HudiDirectoryLister hudiDirectoryLister;
    private final AsyncQueue<ConnectorSplit> asyncQueue;
    private final ExecutorService executor;
    private final Consumer<Throwable> errorListener;
    private final HudiSplitFactory hudiSplitFactory;

    public HudiBackgroundSplitLoader(ConnectorSession connectorSession, HudiTableHandle hudiTableHandle, HudiDirectoryLister hudiDirectoryLister, AsyncQueue<ConnectorSplit> asyncQueue, ExecutorService executorService, HudiSplitWeightProvider hudiSplitWeightProvider, Consumer<Throwable> consumer) {
        this.session = (ConnectorSession) Objects.requireNonNull(connectorSession, "session is null");
        this.hudiDirectoryLister = (HudiDirectoryLister) Objects.requireNonNull(hudiDirectoryLister, "hudiDirectoryLister is null");
        this.asyncQueue = (AsyncQueue) Objects.requireNonNull(asyncQueue, "asyncQueue is null");
        this.executor = (ExecutorService) Objects.requireNonNull(executorService, "executor is null");
        this.errorListener = (Consumer) Objects.requireNonNull(consumer, "errorListener is null");
        this.hudiSplitFactory = new HudiSplitFactory(hudiTableHandle, hudiSplitWeightProvider);
    }

    public void start() {
        ListenableFuture submit = Futures.submit(this::loadPartitions, this.executor);
        hookErrorListener(submit);
        hookErrorListener(Futures.transform(submit, collection -> {
            Futures.FutureCombiner whenAllComplete = Futures.whenAllComplete((List) collection.stream().map(hudiPartitionInfo -> {
                return Futures.submit(() -> {
                    loadSplits(hudiPartitionInfo);
                }, this.executor);
            }).peek(this::hookErrorListener).collect(Collectors.toList()));
            AsyncQueue<ConnectorSplit> asyncQueue = this.asyncQueue;
            Objects.requireNonNull(asyncQueue);
            whenAllComplete.run(asyncQueue::finish, MoreExecutors.directExecutor());
            return null;
        }, MoreExecutors.directExecutor()));
    }

    private Collection<HudiPartitionInfo> loadPartitions() {
        HudiPartitionInfoLoader hudiPartitionInfoLoader = new HudiPartitionInfoLoader(this.session, this.hudiDirectoryLister);
        hudiPartitionInfoLoader.run();
        return hudiPartitionInfoLoader.getPartitionQueue();
    }

    private void loadSplits(HudiPartitionInfo hudiPartitionInfo) {
        List<HivePartitionKey> hivePartitionKeys = hudiPartitionInfo.getHivePartitionKeys();
        Stream<R> flatMap = this.hudiDirectoryLister.listStatus(hudiPartitionInfo).stream().flatMap(fileStatus -> {
            return this.hudiSplitFactory.createSplits(hivePartitionKeys, fileStatus);
        });
        AsyncQueue<ConnectorSplit> asyncQueue = this.asyncQueue;
        Objects.requireNonNull(asyncQueue);
        flatMap.map((v1) -> {
            return r1.offer(v1);
        }).forEachOrdered((v0) -> {
            MoreFutures.getFutureValue(v0);
        });
    }

    private <T> void hookErrorListener(ListenableFuture<T> listenableFuture) {
        Futures.addCallback(listenableFuture, new FutureCallback<T>() { // from class: io.trino.plugin.hudi.split.HudiBackgroundSplitLoader.1
            public void onSuccess(T t) {
            }

            public void onFailure(Throwable th) {
                HudiBackgroundSplitLoader.this.errorListener.accept(th);
            }
        }, MoreExecutors.directExecutor());
    }
}
