package io.trino.execution.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.trino.Session;
import io.trino.connector.CatalogHandle;
import io.trino.execution.scheduler.NodeAllocator;
import io.trino.metadata.InternalNode;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;

@ThreadSafe
/* loaded from: input_file:io/trino/execution/scheduler/FixedCountNodeAllocatorService.class */
public class FixedCountNodeAllocatorService implements NodeAllocatorService {
    private static final Logger log = Logger.get(FixedCountNodeAllocatorService.class);
    private static final int MAXIMUM_ALLOCATIONS_PER_NODE = 1;
    private final NodeScheduler nodeScheduler;
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, Threads.daemonThreadsNamed("fixed-count-node-allocator"));
    private final Set<FixedCountNodeAllocator> allocators = Sets.newConcurrentHashSet();
    private final AtomicBoolean started = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/FixedCountNodeAllocatorService$FixedCountNodeAllocator.class */
    public class FixedCountNodeAllocator implements NodeAllocator {
        private final Session session;
        private final int maximumAllocationsPerNode;

        @GuardedBy("this")
        private final Map<Optional<CatalogHandle>, NodeSelector> nodeSelectorCache = new HashMap();

        @GuardedBy("this")
        private final Map<InternalNode, Integer> allocationCountMap = new HashMap();

        @GuardedBy("this")
        private final List<PendingAcquire> pendingAcquires = new LinkedList();

        /* loaded from: input_file:io/trino/execution/scheduler/FixedCountNodeAllocatorService$FixedCountNodeAllocator$FixedCountNodeLease.class */
        private class FixedCountNodeLease implements NodeAllocator.NodeLease {
            private final ListenableFuture<InternalNode> node;
            private final AtomicBoolean released = new AtomicBoolean();

            private FixedCountNodeLease(ListenableFuture<InternalNode> listenableFuture) {
                this.node = (ListenableFuture) Objects.requireNonNull(listenableFuture, "node is null");
            }

            @Override // io.trino.execution.scheduler.NodeAllocator.NodeLease
            public ListenableFuture<InternalNode> getNode() {
                return this.node;
            }

            @Override // io.trino.execution.scheduler.NodeAllocator.NodeLease
            public void release() {
                if (!this.released.compareAndSet(false, true)) {
                    throw new IllegalStateException("Node " + this.node + " already released");
                }
                this.node.cancel(true);
                if (!this.node.isDone() || this.node.isCancelled()) {
                    return;
                }
                FixedCountNodeAllocator.this.releaseNode((InternalNode) MoreFutures.getFutureValue(this.node));
            }
        }

        public FixedCountNodeAllocator(Session session, int i) {
            this.session = (Session) Objects.requireNonNull(session, "session is null");
            this.maximumAllocationsPerNode = i;
        }

        @Override // io.trino.execution.scheduler.NodeAllocator
        public synchronized NodeAllocator.NodeLease acquire(NodeRequirements nodeRequirements) {
            try {
                Optional<InternalNode> tryAcquireNode = tryAcquireNode(nodeRequirements);
                if (tryAcquireNode.isPresent()) {
                    return new FixedCountNodeLease(Futures.immediateFuture(tryAcquireNode.get()));
                }
                SettableFuture create = SettableFuture.create();
                this.pendingAcquires.add(new PendingAcquire(nodeRequirements, create));
                return new FixedCountNodeLease(create);
            } catch (RuntimeException e) {
                return new FixedCountNodeLease(Futures.immediateFailedFuture(e));
            }
        }

        public void updateNodes() {
            processPendingAcquires();
        }

        private synchronized Optional<InternalNode> tryAcquireNode(NodeRequirements nodeRequirements) {
            List<InternalNode> allNodes = this.nodeSelectorCache.computeIfAbsent(nodeRequirements.getCatalogHandle(), optional -> {
                return FixedCountNodeAllocatorService.this.nodeScheduler.createNodeSelector(this.session, optional);
            }).allNodes();
            if (allNodes.isEmpty()) {
                throw new TrinoException(StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query");
            }
            List list = (List) allNodes.stream().filter(internalNode -> {
                return nodeRequirements.getAddresses().isEmpty() || nodeRequirements.getAddresses().contains(internalNode.getHostAndPort());
            }).collect(ImmutableList.toImmutableList());
            if (list.isEmpty()) {
                throw new TrinoException(StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query");
            }
            Optional<InternalNode> min = list.stream().filter(internalNode2 -> {
                return this.allocationCountMap.getOrDefault(internalNode2, 0).intValue() < this.maximumAllocationsPerNode;
            }).min(Comparator.comparing(internalNode3 -> {
                return this.allocationCountMap.getOrDefault(internalNode3, 0);
            }));
            if (min.isEmpty()) {
                return Optional.empty();
            }
            this.allocationCountMap.compute(min.get(), (internalNode4, num) -> {
                return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
            });
            return min;
        }

        private void releaseNode(InternalNode internalNode) {
            synchronized (this) {
                int intValue = this.allocationCountMap.compute(internalNode, (internalNode2, num) -> {
                    return Integer.valueOf(num == null ? 0 : num.intValue() - 1);
                }).intValue();
                Preconditions.checkState(intValue >= 0, "allocation count for node %s is expected to be greater than or equal to zero: %s", internalNode, intValue);
            }
            processPendingAcquires();
        }

        private void processPendingAcquires() {
            Verify.verify(!Thread.holdsLock(this));
            IdentityHashMap identityHashMap = new IdentityHashMap();
            IdentityHashMap identityHashMap2 = new IdentityHashMap();
            synchronized (this) {
                Iterator<PendingAcquire> it = this.pendingAcquires.iterator();
                while (it.hasNext()) {
                    PendingAcquire next = it.next();
                    if (next.getFuture().isCancelled()) {
                        it.remove();
                    } else {
                        try {
                            Optional<InternalNode> tryAcquireNode = tryAcquireNode(next.getNodeRequirements());
                            if (tryAcquireNode.isPresent()) {
                                it.remove();
                                identityHashMap.put(next, tryAcquireNode.get());
                            }
                        } catch (RuntimeException e) {
                            it.remove();
                            identityHashMap2.put(next, e);
                        }
                    }
                }
            }
            identityHashMap.forEach((pendingAcquire, internalNode) -> {
                SettableFuture<InternalNode> future = pendingAcquire.getFuture();
                future.set(internalNode);
                if (future.isCancelled()) {
                    releaseNode(internalNode);
                }
            });
            identityHashMap2.forEach((pendingAcquire2, runtimeException) -> {
                pendingAcquire2.getFuture().setException(runtimeException);
            });
        }

        @Override // io.trino.execution.scheduler.NodeAllocator, java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() {
            FixedCountNodeAllocatorService.this.allocators.remove(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/FixedCountNodeAllocatorService$PendingAcquire.class */
    public static class PendingAcquire {
        private final NodeRequirements nodeRequirements;
        private final SettableFuture<InternalNode> future;

        private PendingAcquire(NodeRequirements nodeRequirements, SettableFuture<InternalNode> settableFuture) {
            this.nodeRequirements = (NodeRequirements) Objects.requireNonNull(nodeRequirements, "nodeRequirements is null");
            this.future = (SettableFuture) Objects.requireNonNull(settableFuture, "future is null");
        }

        public NodeRequirements getNodeRequirements() {
            return this.nodeRequirements;
        }

        public SettableFuture<InternalNode> getFuture() {
            return this.future;
        }
    }

    @Inject
    public FixedCountNodeAllocatorService(NodeScheduler nodeScheduler) {
        this.nodeScheduler = (NodeScheduler) Objects.requireNonNull(nodeScheduler, "nodeScheduler is null");
    }

    @PostConstruct
    public void start() {
        if (this.started.compareAndSet(false, true)) {
            this.executor.scheduleWithFixedDelay(() -> {
                try {
                    updateNodes();
                } catch (Throwable th) {
                    log.warn(th, "Error updating nodes");
                }
            }, 5L, 5L, TimeUnit.SECONDS);
        }
    }

    @PreDestroy
    public void stop() {
        this.executor.shutdownNow();
    }

    @VisibleForTesting
    void updateNodes() {
        this.allocators.forEach((v0) -> {
            v0.updateNodes();
        });
    }

    @Override // io.trino.execution.scheduler.NodeAllocatorService
    public NodeAllocator getNodeAllocator(Session session) {
        Objects.requireNonNull(session, "session is null");
        return getNodeAllocator(session, 1);
    }

    @VisibleForTesting
    NodeAllocator getNodeAllocator(Session session, int i) {
        FixedCountNodeAllocator fixedCountNodeAllocator = new FixedCountNodeAllocator(session, i);
        this.allocators.add(fixedCountNodeAllocator);
        return fixedCountNodeAllocator;
    }
}
