package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.client.NodeVersion;
import io.trino.connector.CatalogName;
import io.trino.cost.StatsAndCosts;
import io.trino.execution.Lifespan;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTaskFactory;
import io.trino.execution.SqlStage;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.TaskState;
import io.trino.execution.TestingRemoteTaskFactory;
import io.trino.execution.scheduler.TestingExchange;
import io.trino.execution.scheduler.TestingNodeSelectorFactory;
import io.trino.failuredetector.NoOpFailureDetector;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.operator.RetryPolicy;
import io.trino.operator.StageExecutionDescriptor;
import io.trino.spi.QueryId;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.Partitioning;
import io.trino.sql.planner.PartitioningScheme;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingMetadata;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingSplit;
import io.trino.util.FinalizerService;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/execution/scheduler/TestFaultTolerantStageScheduler.class */
public class TestFaultTolerantStageScheduler {
    private static final QueryId QUERY_ID = new QueryId("query");
    private static final Session SESSION = TestingSession.testSessionBuilder().setQueryId(QUERY_ID).build();
    private static final StageId STAGE_ID = new StageId(QUERY_ID, 0);
    private static final PlanFragmentId FRAGMENT_ID = new PlanFragmentId("0");
    private static final PlanFragmentId SOURCE_FRAGMENT_ID_1 = new PlanFragmentId("1");
    private static final PlanFragmentId SOURCE_FRAGMENT_ID_2 = new PlanFragmentId("2");
    private static final PlanNodeId TABLE_SCAN_NODE_ID = new PlanNodeId("table_scan_id");
    private static final CatalogName CATALOG = new CatalogName("catalog");
    private static final InternalNode NODE_1 = new InternalNode("node-1", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, false);
    private static final InternalNode NODE_2 = new InternalNode("node-2", URI.create("local://127.0.0.1:8081"), NodeVersion.UNKNOWN, false);
    private static final InternalNode NODE_3 = new InternalNode("node-3", URI.create("local://127.0.0.1:8082"), NodeVersion.UNKNOWN, false);
    private FinalizerService finalizerService;
    private NodeTaskMap nodeTaskMap;

    @BeforeClass
    public void beforeClass() {
        this.finalizerService = new FinalizerService();
        this.finalizerService.start();
        this.nodeTaskMap = new NodeTaskMap(this.finalizerService);
    }

    @AfterClass(alwaysRun = true)
    public void afterClass() {
        this.nodeTaskMap = null;
        if (this.finalizerService != null) {
            this.finalizerService.destroy();
            this.finalizerService = null;
        }
    }

    @Test
    public void testHappyPath() throws Exception {
        TestingRemoteTaskFactory testingRemoteTaskFactory = new TestingRemoteTaskFactory();
        TaskSourceFactory createTaskSourceFactory = createTaskSourceFactory(5, 2);
        TestingNodeSelectorFactory.TestingNodeSupplier create = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(NODE_1, ImmutableList.of(CATALOG), NODE_2, ImmutableList.of(CATALOG), NODE_3, ImmutableList.of(CATALOG)));
        TestingExchange testingExchange = new TestingExchange(false);
        TestingExchange testingExchange2 = new TestingExchange(false);
        TestingExchange testingExchange3 = new TestingExchange(false);
        FaultTolerantStageScheduler createFaultTolerantTaskScheduler = createFaultTolerantTaskScheduler(testingRemoteTaskFactory, createTaskSourceFactory, createNodeAllocator(create), TaskLifecycleListener.NO_OP, Optional.of(testingExchange), ImmutableMap.of(SOURCE_FRAGMENT_ID_1, testingExchange2, SOURCE_FRAGMENT_ID_2, testingExchange3), 2);
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        createFaultTolerantTaskScheduler.schedule();
        ListenableFuture isBlocked = createFaultTolerantTaskScheduler.isBlocked();
        assertBlocked(isBlocked);
        testingExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        assertBlocked(isBlocked);
        assertBlocked(createFaultTolerantTaskScheduler.isBlocked());
        testingExchange3.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        assertUnblocked(isBlocked);
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        createFaultTolerantTaskScheduler.schedule();
        ListenableFuture isBlocked2 = createFaultTolerantTaskScheduler.isBlocked();
        assertBlocked(isBlocked2);
        Assert.assertFalse(testingExchange.isNoMoreSinks());
        Map<TaskId, TestingRemoteTaskFactory.TestingRemoteTask> tasks = testingRemoteTaskFactory.getTasks();
        Assertions.assertThat(tasks).hasSize(3);
        Assertions.assertThat(tasks).containsKey(getTaskId(0, 0));
        Assertions.assertThat(tasks).containsKey(getTaskId(1, 0));
        Assertions.assertThat(tasks).containsKey(getTaskId(2, 0));
        tasks.get(getTaskId(0, 0)).fail(new RuntimeException("some failure"));
        assertUnblocked(isBlocked2);
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        createFaultTolerantTaskScheduler.schedule();
        Map<TaskId, TestingRemoteTaskFactory.TestingRemoteTask> tasks2 = testingRemoteTaskFactory.getTasks();
        Assertions.assertThat(tasks2).hasSize(4);
        Assertions.assertThat(tasks2).containsKey(getTaskId(3, 0));
        ListenableFuture isBlocked3 = createFaultTolerantTaskScheduler.isBlocked();
        assertBlocked(isBlocked3);
        Assertions.assertThat(tasks2).containsKey(getTaskId(1, 0));
        tasks2.get(getTaskId(1, 0)).finish();
        assertUnblocked(isBlocked3);
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        Assertions.assertThat(testingExchange.getFinishedSinkHandles()).contains(new TestingExchange.TestingExchangeSinkHandle[]{new TestingExchange.TestingExchangeSinkHandle(1)});
        createFaultTolerantTaskScheduler.schedule();
        ListenableFuture isBlocked4 = createFaultTolerantTaskScheduler.isBlocked();
        assertBlocked(isBlocked4);
        Map<TaskId, TestingRemoteTaskFactory.TestingRemoteTask> tasks3 = testingRemoteTaskFactory.getTasks();
        Assertions.assertThat(tasks3).hasSize(5);
        Assertions.assertThat(tasks3).containsKey(getTaskId(0, 1));
        Map<TaskId, TestingRemoteTaskFactory.TestingRemoteTask> tasks4 = testingRemoteTaskFactory.getTasks();
        Assertions.assertThat(tasks4).containsKey(getTaskId(3, 0));
        tasks4.get(getTaskId(3, 0)).finish();
        Assertions.assertThat(testingExchange.getFinishedSinkHandles()).contains(new TestingExchange.TestingExchangeSinkHandle[]{new TestingExchange.TestingExchangeSinkHandle(1), new TestingExchange.TestingExchangeSinkHandle(3)});
        assertUnblocked(isBlocked4);
        createFaultTolerantTaskScheduler.schedule();
        Map<TaskId, TestingRemoteTaskFactory.TestingRemoteTask> tasks5 = testingRemoteTaskFactory.getTasks();
        Assertions.assertThat(tasks5).hasSize(6);
        Assertions.assertThat(tasks5).containsKey(getTaskId(4, 0));
        Assert.assertFalse(createFaultTolerantTaskScheduler.isFinished());
        ListenableFuture isBlocked5 = createFaultTolerantTaskScheduler.isBlocked();
        assertBlocked(isBlocked5);
        Map<TaskId, TestingRemoteTaskFactory.TestingRemoteTask> tasks6 = testingRemoteTaskFactory.getTasks();
        Assertions.assertThat(tasks6).containsKey(getTaskId(4, 0));
        tasks6.get(getTaskId(0, 1)).finish();
        tasks6.get(getTaskId(2, 0)).finish();
        tasks6.get(getTaskId(4, 0)).finish();
        assertUnblocked(isBlocked5);
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        Assertions.assertThat(testingExchange.getFinishedSinkHandles()).contains(new TestingExchange.TestingExchangeSinkHandle[]{new TestingExchange.TestingExchangeSinkHandle(0), new TestingExchange.TestingExchangeSinkHandle(1), new TestingExchange.TestingExchangeSinkHandle(2), new TestingExchange.TestingExchangeSinkHandle(3), new TestingExchange.TestingExchangeSinkHandle(4)});
        Assert.assertTrue(createFaultTolerantTaskScheduler.isFinished());
    }

    @Test
    public void testTaskLifecycleListener() throws Exception {
        TestingRemoteTaskFactory testingRemoteTaskFactory = new TestingRemoteTaskFactory();
        TaskSourceFactory createTaskSourceFactory = createTaskSourceFactory(2, 1);
        TestingNodeSelectorFactory.TestingNodeSupplier create = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(NODE_1, ImmutableList.of(CATALOG), NODE_2, ImmutableList.of(CATALOG)));
        TestingTaskLifecycleListener testingTaskLifecycleListener = new TestingTaskLifecycleListener();
        TestingExchange testingExchange = new TestingExchange(false);
        TestingExchange testingExchange2 = new TestingExchange(false);
        FaultTolerantStageScheduler createFaultTolerantTaskScheduler = createFaultTolerantTaskScheduler(testingRemoteTaskFactory, createTaskSourceFactory, createNodeAllocator(create), testingTaskLifecycleListener, Optional.empty(), ImmutableMap.of(SOURCE_FRAGMENT_ID_1, testingExchange, SOURCE_FRAGMENT_ID_2, testingExchange2), 2);
        testingExchange.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        testingExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        createFaultTolerantTaskScheduler.schedule();
        assertBlocked(createFaultTolerantTaskScheduler.isBlocked());
        Assertions.assertThat(testingTaskLifecycleListener.getTasks().get(FRAGMENT_ID)).contains(new TaskId[]{getTaskId(0, 0), getTaskId(1, 0)});
        testingRemoteTaskFactory.getTasks().get(getTaskId(0, 0)).fail(new RuntimeException("some exception"));
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        createFaultTolerantTaskScheduler.schedule();
        assertBlocked(createFaultTolerantTaskScheduler.isBlocked());
        Assertions.assertThat(testingTaskLifecycleListener.getTasks().get(FRAGMENT_ID)).contains(new TaskId[]{getTaskId(0, 0), getTaskId(1, 0), getTaskId(0, 1)});
    }

    @Test
    public void testTaskFailure() throws Exception {
        TestingRemoteTaskFactory testingRemoteTaskFactory = new TestingRemoteTaskFactory();
        TaskSourceFactory createTaskSourceFactory = createTaskSourceFactory(3, 1);
        TestingNodeSelectorFactory.TestingNodeSupplier create = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(NODE_1, ImmutableList.of(CATALOG), NODE_2, ImmutableList.of(CATALOG)));
        TestingExchange testingExchange = new TestingExchange(false);
        TestingExchange testingExchange2 = new TestingExchange(false);
        NodeAllocator createNodeAllocator = createNodeAllocator(create);
        FaultTolerantStageScheduler createFaultTolerantTaskScheduler = createFaultTolerantTaskScheduler(testingRemoteTaskFactory, createTaskSourceFactory, createNodeAllocator, TaskLifecycleListener.NO_OP, Optional.empty(), ImmutableMap.of(SOURCE_FRAGMENT_ID_1, testingExchange, SOURCE_FRAGMENT_ID_2, testingExchange2), 0);
        testingExchange.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        testingExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        createFaultTolerantTaskScheduler.schedule();
        ListenableFuture isBlocked = createFaultTolerantTaskScheduler.isBlocked();
        assertBlocked(isBlocked);
        ListenableFuture acquire = createNodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
        ListenableFuture acquire2 = createNodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
        testingRemoteTaskFactory.getTasks().get(getTaskId(0, 0)).fail(new RuntimeException("some failure"));
        assertUnblocked(isBlocked);
        assertUnblocked(acquire);
        assertUnblocked(acquire2);
        Assert.assertTrue(acquire.isDone());
        Assert.assertTrue(acquire2.isDone());
        Objects.requireNonNull(createFaultTolerantTaskScheduler);
        Assertions.assertThatThrownBy(createFaultTolerantTaskScheduler::schedule).hasMessageContaining("some failure");
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        Assert.assertFalse(createFaultTolerantTaskScheduler.isFinished());
    }

    @Test
    public void testReportTaskFailure() throws Exception {
        TestingRemoteTaskFactory testingRemoteTaskFactory = new TestingRemoteTaskFactory();
        TaskSourceFactory createTaskSourceFactory = createTaskSourceFactory(2, 1);
        TestingNodeSelectorFactory.TestingNodeSupplier create = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(NODE_1, ImmutableList.of(CATALOG), NODE_2, ImmutableList.of(CATALOG)));
        TestingExchange testingExchange = new TestingExchange(false);
        TestingExchange testingExchange2 = new TestingExchange(false);
        FaultTolerantStageScheduler createFaultTolerantTaskScheduler = createFaultTolerantTaskScheduler(testingRemoteTaskFactory, createTaskSourceFactory, createNodeAllocator(create), TaskLifecycleListener.NO_OP, Optional.empty(), ImmutableMap.of(SOURCE_FRAGMENT_ID_1, testingExchange, SOURCE_FRAGMENT_ID_2, testingExchange2), 1);
        testingExchange.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        testingExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        createFaultTolerantTaskScheduler.schedule();
        ListenableFuture isBlocked = createFaultTolerantTaskScheduler.isBlocked();
        assertBlocked(isBlocked);
        createFaultTolerantTaskScheduler.reportTaskFailure(getTaskId(0, 0), new RuntimeException("some failure"));
        Assert.assertEquals(testingRemoteTaskFactory.getTasks().get(getTaskId(0, 0)).getTaskStatus().getState(), TaskState.FAILED);
        assertUnblocked(isBlocked);
        createFaultTolerantTaskScheduler.schedule();
        Assertions.assertThat(testingRemoteTaskFactory.getTasks()).containsKey(getTaskId(0, 1));
        testingRemoteTaskFactory.getTasks().get(getTaskId(0, 1)).finish();
        testingRemoteTaskFactory.getTasks().get(getTaskId(1, 0)).finish();
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        Assert.assertTrue(createFaultTolerantTaskScheduler.isFinished());
    }

    @Test
    public void testCancellation() throws Exception {
        testCancellation(true);
        testCancellation(false);
    }

    private void testCancellation(boolean z) throws Exception {
        TestingRemoteTaskFactory testingRemoteTaskFactory = new TestingRemoteTaskFactory();
        TestingTaskSourceFactory createTaskSourceFactory = createTaskSourceFactory(3, 1);
        TestingNodeSelectorFactory.TestingNodeSupplier create = TestingNodeSelectorFactory.TestingNodeSupplier.create(ImmutableMap.of(NODE_1, ImmutableList.of(CATALOG), NODE_2, ImmutableList.of(CATALOG)));
        TestingExchange testingExchange = new TestingExchange(false);
        TestingExchange testingExchange2 = new TestingExchange(false);
        NodeAllocator createNodeAllocator = createNodeAllocator(create);
        FaultTolerantStageScheduler createFaultTolerantTaskScheduler = createFaultTolerantTaskScheduler(testingRemoteTaskFactory, createTaskSourceFactory, createNodeAllocator, TaskLifecycleListener.NO_OP, Optional.empty(), ImmutableMap.of(SOURCE_FRAGMENT_ID_1, testingExchange, SOURCE_FRAGMENT_ID_2, testingExchange2), 0);
        testingExchange.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        testingExchange2.setSourceHandles(ImmutableList.of(new TestingExchange.TestingExchangeSourceHandle(0, 1L)));
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        createFaultTolerantTaskScheduler.schedule();
        ListenableFuture isBlocked = createFaultTolerantTaskScheduler.isBlocked();
        assertBlocked(isBlocked);
        ListenableFuture acquire = createNodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
        ListenableFuture acquire2 = createNodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
        if (z) {
            createFaultTolerantTaskScheduler.abort();
        } else {
            createFaultTolerantTaskScheduler.cancel();
        }
        assertUnblocked(isBlocked);
        assertUnblocked(acquire);
        assertUnblocked(acquire2);
        createFaultTolerantTaskScheduler.schedule();
        assertUnblocked(createFaultTolerantTaskScheduler.isBlocked());
        Assert.assertFalse(createFaultTolerantTaskScheduler.isFinished());
    }

    private FaultTolerantStageScheduler createFaultTolerantTaskScheduler(RemoteTaskFactory remoteTaskFactory, TaskSourceFactory taskSourceFactory, NodeAllocator nodeAllocator, TaskLifecycleListener taskLifecycleListener, Optional<Exchange> optional, Map<PlanFragmentId, Exchange> map, int i) {
        TaskDescriptorStorage taskDescriptorStorage = new TaskDescriptorStorage(DataSize.of(10L, DataSize.Unit.MEGABYTE));
        taskDescriptorStorage.initialize(SESSION.getQueryId());
        return new FaultTolerantStageScheduler(SESSION, createSqlStage(remoteTaskFactory), new NoOpFailureDetector(), taskSourceFactory, nodeAllocator, taskDescriptorStorage, taskLifecycleListener, optional, Optional.empty(), map, Optional.empty(), Optional.empty(), i);
    }

    private SqlStage createSqlStage(RemoteTaskFactory remoteTaskFactory) {
        return SqlStage.createSqlStage(STAGE_ID, createPlanFragment(), ImmutableMap.of(), remoteTaskFactory, SESSION, false, this.nodeTaskMap, MoreExecutors.directExecutor(), new SplitSchedulerStats());
    }

    private PlanFragment createPlanFragment() {
        Symbol symbol = new Symbol("probe_column");
        Symbol symbol2 = new Symbol("build_column");
        TableScanNode tableScanNode = new TableScanNode(TABLE_SCAN_NODE_ID, TestingHandles.TEST_TABLE_HANDLE, ImmutableList.of(symbol), ImmutableMap.of(symbol, new TestingMetadata.TestingColumnHandle("column")), TupleDomain.none(), Optional.empty(), false, Optional.empty());
        RemoteSourceNode remoteSourceNode = new RemoteSourceNode(new PlanNodeId("remote_source_id"), ImmutableList.of(SOURCE_FRAGMENT_ID_1, SOURCE_FRAGMENT_ID_2), ImmutableList.of(symbol2), Optional.empty(), ExchangeNode.Type.REPLICATE, RetryPolicy.TASK);
        return new PlanFragment(FRAGMENT_ID, new JoinNode(new PlanNodeId("join_id"), JoinNode.Type.INNER, tableScanNode, remoteSourceNode, ImmutableList.of(), tableScanNode.getOutputSymbols(), remoteSourceNode.getOutputSymbols(), false, Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(JoinNode.DistributionType.REPLICATED), Optional.empty(), ImmutableMap.of(), Optional.empty()), ImmutableMap.of(symbol, VarcharType.VARCHAR, symbol2, VarcharType.VARCHAR), SystemPartitioningHandle.SOURCE_DISTRIBUTION, ImmutableList.of(TABLE_SCAN_NODE_ID), new PartitioningScheme(Partitioning.create(SystemPartitioningHandle.SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol, symbol2)), StageExecutionDescriptor.ungroupedExecution(), StatsAndCosts.empty(), Optional.empty());
    }

    private static TestingTaskSourceFactory createTaskSourceFactory(int i, int i2) {
        return new TestingTaskSourceFactory(Optional.of(CATALOG), createSplits(i), i2);
    }

    private static List<Split> createSplits(int i) {
        return ImmutableList.copyOf(Iterables.limit(Iterables.cycle(new Split[]{new Split(CATALOG, TestingSplit.createRemoteSplit(), Lifespan.taskWide())}), i));
    }

    private NodeAllocator createNodeAllocator(TestingNodeSelectorFactory.TestingNodeSupplier testingNodeSupplier) {
        return new FixedCountNodeAllocator(new NodeScheduler(new TestingNodeSelectorFactory(NODE_1, testingNodeSupplier)), SESSION, 1);
    }

    private static TaskId getTaskId(int i, int i2) {
        return new TaskId(STAGE_ID, i, i2);
    }

    private static void assertBlocked(ListenableFuture<?> listenableFuture) {
        Assert.assertFalse(listenableFuture.isDone());
    }

    private static void assertUnblocked(ListenableFuture<?> listenableFuture) {
        Assert.assertTrue(listenableFuture.isDone());
    }
}
