package io.trino.execution.scheduler;

import com.google.common.base.Verify;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.execution.ForQueryExecution;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.scheduler.EventDrivenTaskSource;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.spi.exchange.Exchange;
import io.trino.sql.planner.MergePartitioningHandle;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.SplitSourceFactory;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.TableWriterNode;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.LongConsumer;
import javax.inject.Inject;

/* loaded from: input_file:io/trino/execution/scheduler/EventDrivenTaskSourceFactory.class */
/**
 * Factory that assembles {@link EventDrivenTaskSource} instances for a plan fragment.
 *
 * <p>For every fragment it wires together the fragment's remote sources, a lazily evaluated
 * split-source supplier and a {@link SplitAssigner} selected from the fragment's partitioning handle.
 */
public class EventDrivenTaskSourceFactory {
    private final SplitSourceFactory splitSourceFactory;
    private final Executor executor;
    private final InternalNodeManager nodeManager;
    private final int splitBatchSize;

    /**
     * Injection constructor; the split batch size is read from
     * {@link QueryManagerConfig#getScheduleSplitBatchSize()}.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public EventDrivenTaskSourceFactory(SplitSourceFactory splitSourceFactory, @ForQueryExecution ExecutorService executorService, InternalNodeManager internalNodeManager, QueryManagerConfig queryManagerConfig) {
        this(
                splitSourceFactory,
                executorService,
                internalNodeManager,
                Objects.requireNonNull(queryManagerConfig, "queryManagerConfig is null").getScheduleSplitBatchSize());
    }

    /**
     * Creates a factory with an explicitly supplied split batch size.
     *
     * @param splitSourceFactory used to create the split sources of a fragment
     * @param executor executor handed to every created {@link EventDrivenTaskSource}
     * @param internalNodeManager used to look up the current node for coordinator-only fragments
     * @param i split batch size handed to every created {@link EventDrivenTaskSource}; stored without validation
     * @throws NullPointerException if any reference argument is {@code null}
     */
    public EventDrivenTaskSourceFactory(SplitSourceFactory splitSourceFactory, Executor executor, InternalNodeManager internalNodeManager, int i) {
        this.splitSourceFactory = Objects.requireNonNull(splitSourceFactory, "splitSourceFactory is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.nodeManager = Objects.requireNonNull(internalNodeManager, "nodeManager is null");
        this.splitBatchSize = i;
    }

    /**
     * Builds an {@link EventDrivenTaskSource} for the given fragment.
     *
     * @param callback callback forwarded to the task source
     * @param session session from which the fault-tolerant execution sizing properties are read
     * @param planFragment fragment the task source is created for
     * @param map source exchanges keyed by source fragment ID, forwarded to the task source
     * @param faultTolerantPartitioningScheme partitioning scheme forwarded to the task source and,
     *         for hash-distributed fragments, to the split assigner
     * @param longConsumer consumer forwarded to the task source
     * @param map2 output data size estimates keyed by plan node ID, used by the hash distribution assigner
     * @return the newly created task source
     * @throws IllegalArgumentException if the fragment's partitioning is not one of the supported kinds
     */
    public EventDrivenTaskSource create(EventDrivenTaskSource.Callback callback, Session session, PlanFragment planFragment, Map<PlanFragmentId, Exchange> map, FaultTolerantPartitioningScheme faultTolerantPartitioningScheme, LongConsumer longConsumer, Map<PlanNodeId, OutputDataSizeEstimate> map2) {
        // Map every source fragment to the remote source node that consumes it.
        ImmutableMap.Builder<PlanFragmentId, PlanNodeId> remoteSources = ImmutableMap.builder();
        for (RemoteSourceNode remoteSource : planFragment.getRemoteSourceNodes()) {
            for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) {
                remoteSources.put(sourceFragmentId, remoteSource.getId());
            }
        }

        long targetTaskInputSizeInBytes = SystemSessionProperties.getFaultTolerantExecutionTargetTaskInputSize(session).toBytes();
        // Per-split size derived from the task-level target: target task input size / target task split count.
        long targetSplitSizeInBytes = targetTaskInputSizeInBytes / SystemSessionProperties.getFaultTolerantExecutionTargetTaskSplitCount(session);
        int maxTaskSplitCount = SystemSessionProperties.getFaultTolerantExecutionMaxTaskSplitCount(session);

        SplitAssigner splitAssigner = createSplitAssigner(
                session,
                planFragment,
                map2,
                faultTolerantPartitioningScheme,
                targetTaskInputSizeInBytes,
                targetSplitSizeInBytes,
                maxTaskSplitCount);

        return new EventDrivenTaskSource(
                map,
                remoteSources.buildOrThrow(),
                // Split sources are only created when the task source asks for them.
                () -> this.splitSourceFactory.createSplitSources(session, planFragment),
                splitAssigner,
                callback,
                this.executor,
                this.splitBatchSize,
                targetSplitSizeInBytes,
                faultTolerantPartitioningScheme,
                longConsumer);
    }

    /**
     * Chooses the {@link SplitAssigner} implementation that matches the fragment's partitioning.
     *
     * <p>Remote sources with exchange type {@code REPLICATE} are treated as replicated sources; all other
     * remote sources plus the fragment's partitioned sources are treated as partitioned sources.
     *
     * @throws IllegalArgumentException if the partitioning is not handled by any branch below
     */
    private SplitAssigner createSplitAssigner(Session session, PlanFragment planFragment, Map<PlanNodeId, OutputDataSizeEstimate> outputDataSizeEstimates, FaultTolerantPartitioningScheme sourcePartitioningScheme, long targetTaskInputSizeInBytes, long targetSplitSizeInBytes, int maxTaskSplitCount) {
        PartitioningHandle partitioning = planFragment.getPartitioning();

        Set<PlanNodeId> partitionedSources = ImmutableSet.<PlanNodeId>builder()
                .addAll(remoteSourceIds(planFragment, false))
                .addAll(planFragment.getPartitionedSources())
                .build();
        Set<PlanNodeId> replicatedSources = remoteSourceIds(planFragment, true);

        boolean coordinatorOnly = partitioning.equals(SystemPartitioningHandle.COORDINATOR_DISTRIBUTION);
        if (coordinatorOnly || partitioning.equals(SystemPartitioningHandle.SINGLE_DISTRIBUTION)) {
            Set<PlanNodeId> allSources = ImmutableSet.<PlanNodeId>builder()
                    .addAll(partitionedSources)
                    .addAll(replicatedSources)
                    .build();
            if (coordinatorOnly) {
                // Coordinator-only fragments are pinned to the current node, which must be a coordinator.
                InternalNode currentNode = this.nodeManager.getCurrentNode();
                Verify.verify(currentNode.isCoordinator(), "current node is expected to be a coordinator");
                return new SingleDistributionSplitAssigner(ImmutableSet.of(currentNode.getHostAndPort()), allSources);
            }
            // Plain single distribution carries no host requirement.
            return new SingleDistributionSplitAssigner(ImmutableSet.of(), allSources);
        }

        if (partitioning.equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)
                || partitioning.equals(SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION)
                || partitioning.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)) {
            return new ArbitraryDistributionSplitAssigner(
                    partitioning.getCatalogHandle(),
                    partitionedSources,
                    replicatedSources,
                    targetTaskInputSizeInBytes,
                    targetSplitSizeInBytes,
                    maxTaskSplitCount);
        }

        if (partitioning.equals(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION)
                || partitioning.getCatalogHandle().isPresent()
                || (partitioning.getConnectorHandle() instanceof MergePartitioningHandle)) {
            boolean preserveInputPartitions = SystemSessionProperties.getFaultTolerantPreserveInputPartitionsInWriteStage(session)
                    && isWriteFragment(planFragment);
            return new HashDistributionSplitAssigner(
                    partitioning.getCatalogHandle(),
                    partitionedSources,
                    replicatedSources,
                    // Same value the caller already read from the session; no need to look it up again.
                    targetTaskInputSizeInBytes,
                    outputDataSizeEstimates,
                    sourcePartitioningScheme,
                    preserveInputPartitions);
        }

        throw new IllegalArgumentException("Unexpected partitioning: " + partitioning);
    }

    /**
     * Collects the IDs of the fragment's remote source nodes, keeping either only the nodes whose
     * exchange type is {@code REPLICATE} ({@code replicated == true}) or only the others.
     */
    private static Set<PlanNodeId> remoteSourceIds(PlanFragment planFragment, boolean replicated) {
        return planFragment.getRemoteSourceNodes().stream()
                .filter(remoteSource -> (remoteSource.getExchangeType() == ExchangeNode.Type.REPLICATE) == replicated)
                .map(RemoteSourceNode::getId)
                .collect(ImmutableSet.toImmutableSet());
    }

    /**
     * Returns {@code true} when the fragment's plan tree contains a {@link TableWriterNode}.
     */
    private static boolean isWriteFragment(PlanFragment planFragment) {
        PlanVisitor<Boolean, Void> writerFinder = new PlanVisitor<Boolean, Void>() {
            @Override
            public Boolean visitPlan(PlanNode planNode, Void context) {
                // Depth-first search through the sources; stop at the first subtree containing a writer.
                for (PlanNode source : planNode.getSources()) {
                    if (source.accept(this, context)) {
                        return true;
                    }
                }
                return false;
            }

            @Override
            public Boolean visitTableWriter(TableWriterNode tableWriterNode, Void context) {
                return true;
            }
        };
        return planFragment.getRoot().accept(writerFinder, null);
    }
}
