package io.trino.exchange;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.slice.Slice;
import io.trino.execution.TaskFailureListener;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.DirectExchangeClientSupplier;
import io.trino.operator.OperatorInfo;
import io.trino.operator.RetryPolicy;
import io.trino.spi.QueryId;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeManager;
import io.trino.spi.exchange.ExchangeSourceHandle;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/trino/exchange/LazyExchangeDataSource.class */
public class LazyExchangeDataSource implements ExchangeDataSource {
    private final QueryId queryId;
    private final ExchangeId exchangeId;
    private final DirectExchangeClientSupplier directExchangeClientSupplier;
    private final LocalMemoryContext systemMemoryContext;
    private final TaskFailureListener taskFailureListener;
    private final RetryPolicy retryPolicy;
    private final ExchangeManagerRegistry exchangeManagerRegistry;
    private final SettableFuture<Void> initializationFuture = SettableFuture.create();
    private final AtomicReference<ExchangeDataSource> delegate = new AtomicReference<>();
    private final AtomicBoolean closed = new AtomicBoolean();

    public LazyExchangeDataSource(QueryId queryId, ExchangeId exchangeId, DirectExchangeClientSupplier directExchangeClientSupplier, LocalMemoryContext localMemoryContext, TaskFailureListener taskFailureListener, RetryPolicy retryPolicy, ExchangeManagerRegistry exchangeManagerRegistry) {
        this.queryId = (QueryId) Objects.requireNonNull(queryId, "queryId is null");
        this.exchangeId = (ExchangeId) Objects.requireNonNull(exchangeId, "exchangeId is null");
        this.directExchangeClientSupplier = (DirectExchangeClientSupplier) Objects.requireNonNull(directExchangeClientSupplier, "directExchangeClientSupplier is null");
        this.systemMemoryContext = (LocalMemoryContext) Objects.requireNonNull(localMemoryContext, "systemMemoryContext is null");
        this.taskFailureListener = (TaskFailureListener) Objects.requireNonNull(taskFailureListener, "taskFailureListener is null");
        this.retryPolicy = (RetryPolicy) Objects.requireNonNull(retryPolicy, "retryPolicy is null");
        this.exchangeManagerRegistry = (ExchangeManagerRegistry) Objects.requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public Slice pollPage() {
        ExchangeDataSource exchangeDataSource = this.delegate.get();
        if (exchangeDataSource == null) {
            return null;
        }
        return exchangeDataSource.pollPage();
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public boolean isFinished() {
        if (this.closed.get()) {
            return true;
        }
        ExchangeDataSource exchangeDataSource = this.delegate.get();
        if (exchangeDataSource == null) {
            return false;
        }
        return exchangeDataSource.isFinished();
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public ListenableFuture<Void> isBlocked() {
        if (this.closed.get()) {
            return Futures.immediateVoidFuture();
        }
        if (!this.initializationFuture.isDone()) {
            return this.initializationFuture;
        }
        ExchangeDataSource exchangeDataSource = this.delegate.get();
        return exchangeDataSource == null ? Futures.immediateVoidFuture() : exchangeDataSource.isBlocked();
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public void addInput(ExchangeInput exchangeInput) {
        boolean z = false;
        synchronized (this) {
            if (this.closed.get()) {
                return;
            }
            ExchangeDataSource exchangeDataSource = this.delegate.get();
            if (exchangeDataSource == null) {
                if (exchangeInput instanceof DirectExchangeInput) {
                    exchangeDataSource = new DirectExchangeDataSource(this.directExchangeClientSupplier.get(this.queryId, this.exchangeId, this.systemMemoryContext, this.taskFailureListener, this.retryPolicy));
                } else {
                    if (!(exchangeInput instanceof SpoolingExchangeInput)) {
                        throw new IllegalArgumentException("Unexpected input: " + exchangeInput);
                    }
                    ExchangeManager exchangeManager = this.exchangeManagerRegistry.getExchangeManager();
                    List<ExchangeSourceHandle> exchangeSourceHandles = ((SpoolingExchangeInput) exchangeInput).getExchangeSourceHandles();
                    exchangeDataSource = new SpoolingExchangeDataSource(exchangeManager.createSource(exchangeSourceHandles), exchangeSourceHandles, this.systemMemoryContext);
                }
                this.delegate.set(exchangeDataSource);
                z = true;
            }
            exchangeDataSource.addInput(exchangeInput);
            if (z) {
                this.initializationFuture.set((Object) null);
            }
        }
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public synchronized void noMoreInputs() {
        if (this.closed.get()) {
            return;
        }
        ExchangeDataSource exchangeDataSource = this.delegate.get();
        if (exchangeDataSource != null) {
            exchangeDataSource.noMoreInputs();
        } else {
            close();
        }
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public OperatorInfo getInfo() {
        ExchangeDataSource exchangeDataSource = this.delegate.get();
        if (exchangeDataSource == null) {
            return null;
        }
        return exchangeDataSource.getInfo();
    }

    @Override // io.trino.exchange.ExchangeDataSource, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        synchronized (this) {
            if (this.closed.compareAndSet(false, true)) {
                ExchangeDataSource exchangeDataSource = this.delegate.get();
                if (exchangeDataSource != null) {
                    exchangeDataSource.close();
                }
                this.initializationFuture.set((Object) null);
            }
        }
    }
}
