package io.trino.execution.scheduler;

import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.SizeOf;
import io.airlift.units.DataSize;
import io.trino.client.NodeVersion;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.StageTaskSourceFactory;
import io.trino.execution.scheduler.TestingExchange;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.spi.HostAddress;
import io.trino.spi.QueryId;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingHandles;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import org.assertj.core.api.Assertions;
import org.openjdk.jol.info.ClassLayout;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:io/trino/execution/scheduler/TestStageTaskSourceFactory.class */
public class TestStageTaskSourceFactory {
    // Host address that the hash-distribution scenarios expect in a task's node requirements
    // whenever a bucket-to-node map is supplied.
    private static final HostAddress NODE_ADDRESS = HostAddress.fromString("testaddress");
    // Plan node ids used as keys for exchange source handles and splits throughout the scenarios.
    private static final PlanNodeId PLAN_NODE_1 = new PlanNodeId("planNode1");
    private static final PlanNodeId PLAN_NODE_2 = new PlanNodeId("planNode2");
    private static final PlanNodeId PLAN_NODE_3 = new PlanNodeId("planNode3");
    private static final PlanNodeId PLAN_NODE_4 = new PlanNodeId("planNode4");
    private static final PlanNodeId PLAN_NODE_5 = new PlanNodeId("planNode5");
    // Raw value of the standard split weight; scenarios pass weight arguments as multiples of it.
    public static final long STANDARD_WEIGHT = SplitWeight.standard().getRawValue();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/TestStageTaskSourceFactory$TestingConnectorSplit.class */
    /**
     * Minimal {@link ConnectorSplit} for scheduler tests.
     *
     * <p>A split is identified by an integer id and may carry a bucket number, a list of
     * preferred host addresses and a split weight. Instances are immutable and compare by value.
     */
    public static class TestingConnectorSplit implements ConnectorSplit {
        private static final int INSTANCE_SIZE = ClassLayout.parseClass(TestingConnectorSplit.class).instanceSize();
        private final int id;
        private final OptionalInt bucket;
        private final Optional<List<HostAddress>> addresses;
        private final SplitWeight weight;

        /**
         * Creates a split that carries the standard split weight.
         *
         * @param i split id
         * @param optionalInt bucket of the split, if any; must not be null
         * @param optional host addresses of the split, if any; must not be null
         */
        public TestingConnectorSplit(int i, OptionalInt optionalInt, Optional<List<HostAddress>> optional) {
            this(i, optionalInt, optional, SplitWeight.standard().getRawValue());
        }

        /**
         * Creates a split with an explicit weight.
         *
         * @param i split id
         * @param optionalInt bucket of the split, if any; must not be null
         * @param optional host addresses of the split, if any; must not be null (the list is copied)
         * @param j raw split weight, converted with {@code SplitWeight.fromRawValue}
         * @throws NullPointerException if {@code optionalInt} or {@code optional} is null
         */
        public TestingConnectorSplit(int i, OptionalInt optionalInt, Optional<List<HostAddress>> optional, long j) {
            this.id = i;
            this.bucket = Objects.requireNonNull(optionalInt, "bucket is null");
            // Defensive copy so later changes to the caller's list cannot affect this split.
            this.addresses = Objects.requireNonNull(optional, "addresses is null").map(ImmutableList::copyOf);
            this.weight = SplitWeight.fromRawValue(j);
        }

        /** Returns the id given at construction. */
        public int getId() {
            return this.id;
        }

        /** Returns the bucket given at construction, possibly empty. */
        public OptionalInt getBucket() {
            return this.bucket;
        }

        /** Returns {@code true} only when no address list was supplied. */
        public boolean isRemotelyAccessible() {
            return this.addresses.isEmpty();
        }

        /** Returns the supplied addresses, or an empty list when none were supplied. */
        public List<HostAddress> getAddresses() {
            return this.addresses.orElse(ImmutableList.of());
        }

        /** Returns the weight derived from the raw value given at construction. */
        public SplitWeight getSplitWeight() {
            return this.weight;
        }

        /** This testing split exposes no info object. */
        public Object getInfo() {
            return null;
        }

        /** Returns the instance size plus the estimated size of the bucket and the address list. */
        public long getRetainedSizeInBytes() {
            return INSTANCE_SIZE
                    + SizeOf.sizeOf(this.bucket)
                    + SizeOf.sizeOf(this.addresses, hosts -> SizeOf.estimatedSizeOf(hosts, HostAddress::getRetainedSizeInBytes));
        }

        /**
         * Value equality over id, bucket, addresses and weight.
         *
         * <p>The weight is compared with {@code equals} rather than by reference, so two splits
         * built from the same raw weight are equal even when they hold distinct
         * {@code SplitWeight} instances. This keeps {@code equals} aligned with {@link #hashCode()}.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TestingConnectorSplit that = (TestingConnectorSplit) obj;
            return this.id == that.id
                    && Objects.equals(this.weight, that.weight)
                    && Objects.equals(this.bucket, that.bucket)
                    && Objects.equals(this.addresses, that.addresses);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.id, this.bucket, this.addresses, this.weight);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("id", this.id)
                    .add("bucket", this.bucket)
                    .add("addresses", this.addresses)
                    .add("weight", this.weight)
                    .toString();
        }
    }

    /**
     * A single-distribution source created with the last constructor flag set to {@code false}
     * must yield exactly one task (partition 0) that carries every exchange source handle, the
     * configured memory, no splits, and neither a catalog nor an address requirement.
     */
    @Test
    public void testSingleDistributionTaskSource() {
        DataSize taskMemory = DataSize.of(4L, DataSize.Unit.GIGABYTE);
        ImmutableListMultimap sources = ImmutableListMultimap.builder()
                .put(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 123L))
                .put(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 321L))
                .put(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 222L))
                .build();
        StageTaskSourceFactory.SingleDistributionTaskSource taskSource = new StageTaskSourceFactory.SingleDistributionTaskSource(
                sources,
                taskMemory,
                new InMemoryNodeManager(new InternalNode[0]),
                false);

        // The source reports finished only after its single batch of tasks has been handed out.
        Assert.assertFalse(taskSource.isFinished());
        List<TaskDescriptor> tasks = MoreFutures.getFutureValue(taskSource.getMoreTasks());
        Assertions.assertThat(tasks).hasSize(1);
        Assert.assertTrue(taskSource.isFinished());

        TaskDescriptor task = tasks.get(0);
        Assertions.assertThat(task.getNodeRequirements().getCatalogHandle()).isEmpty();
        Assertions.assertThat(task.getNodeRequirements().getAddresses()).isEmpty();
        Assert.assertEquals(task.getNodeRequirements().getMemory(), taskMemory);
        Assert.assertEquals(task.getPartitionId(), 0);
        Assert.assertEquals(task.getExchangeSourceHandles(), sources);
        Assert.assertEquals(task.getSplits(), ImmutableListMultimap.of());
    }

    /**
     * Same setup as the single-distribution test, but with the last constructor flag set to
     * {@code true}: the single task must additionally be pinned to the address of the node
     * manager's current node.
     */
    @Test
    public void testCoordinatorDistributionTaskSource() {
        DataSize taskMemory = DataSize.of(4L, DataSize.Unit.GIGABYTE);
        ImmutableListMultimap sources = ImmutableListMultimap.builder()
                .put(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 123L))
                .put(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 321L))
                .put(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 222L))
                .build();
        InMemoryNodeManager nodeManager = new InMemoryNodeManager(new InternalNode[0]);
        StageTaskSourceFactory.SingleDistributionTaskSource taskSource = new StageTaskSourceFactory.SingleDistributionTaskSource(
                sources,
                taskMemory,
                nodeManager,
                true);

        // The source reports finished only after its single batch of tasks has been handed out.
        Assert.assertFalse(taskSource.isFinished());
        List<TaskDescriptor> tasks = MoreFutures.getFutureValue(taskSource.getMoreTasks());
        Assertions.assertThat(tasks).hasSize(1);
        Assert.assertTrue(taskSource.isFinished());

        TaskDescriptor task = tasks.get(0);
        Assertions.assertThat(task.getNodeRequirements().getCatalogHandle()).isEmpty();
        Assertions.assertThat(task.getNodeRequirements().getAddresses())
                .containsExactly(nodeManager.getCurrentNode().getHostAndPort());
        Assert.assertEquals(task.getNodeRequirements().getMemory(), taskMemory);
        Assert.assertEquals(task.getPartitionId(), 0);
        Assert.assertEquals(task.getExchangeSourceHandles(), sources);
        Assert.assertEquals(task.getSplits(), ImmutableListMultimap.of());
    }

    /**
     * Walks the arbitrary-distribution task source through seven inputs, all built with a
     * 3-byte third argument and 4 GB task memory:
     * no handles; one handle; one oversized handle; handles on two plan nodes; handles that fit
     * together; handles that do not fit together; and handles combined with a second-multimap
     * handle that is expected in every resulting task.
     */
    @Test
    public void testArbitraryDistributionTaskSource() {
        DataSize threeBytes = DataSize.of(3L, DataSize.Unit.BYTE);
        DataSize taskMemory = DataSize.of(4L, DataSize.Unit.GIGABYTE);
        // Every expected task has no catalog, no address constraint and the configured memory.
        NodeRequirements anyNode = new NodeRequirements(Optional.empty(), ImmutableSet.of(), taskMemory);

        // 1. No input at all: no tasks, and the source finishes after the first poll.
        StageTaskSourceFactory.ArbitraryDistributionTaskSource emptySource = new StageTaskSourceFactory.ArbitraryDistributionTaskSource(
                ImmutableListMultimap.of(),
                ImmutableListMultimap.of(),
                threeBytes,
                taskMemory);
        Assert.assertFalse(emptySource.isFinished());
        List<TaskDescriptor> noTasks = MoreFutures.getFutureValue(emptySource.getMoreTasks());
        Assertions.assertThat(noTasks).isEmpty();
        Assert.assertTrue(emptySource.isFinished());

        // Handles are named after their second constructor argument.
        TestingExchange.TestingExchangeSourceHandle handle1 = new TestingExchange.TestingExchangeSourceHandle(0, 1L);
        TestingExchange.TestingExchangeSourceHandle handle2 = new TestingExchange.TestingExchangeSourceHandle(0, 2L);
        TestingExchange.TestingExchangeSourceHandle handle3 = new TestingExchange.TestingExchangeSourceHandle(0, 3L);
        TestingExchange.TestingExchangeSourceHandle handle4 = new TestingExchange.TestingExchangeSourceHandle(0, 4L);
        TestingExchange.TestingExchangeSourceHandle handle123 = new TestingExchange.TestingExchangeSourceHandle(0, 123L);
        TestingExchange.TestingExchangeSourceHandle handle321 = new TestingExchange.TestingExchangeSourceHandle(0, 321L);

        // 2. One handle of value 3: a single task.
        StageTaskSourceFactory.ArbitraryDistributionTaskSource singleHandleSource = new StageTaskSourceFactory.ArbitraryDistributionTaskSource(
                ImmutableListMultimap.of(PLAN_NODE_1, handle3),
                ImmutableListMultimap.of(),
                threeBytes,
                taskMemory);
        List<TaskDescriptor> singleHandleTasks = MoreFutures.getFutureValue(singleHandleSource.getMoreTasks());
        Assert.assertTrue(singleHandleSource.isFinished());
        Assertions.assertThat(singleHandleTasks).hasSize(1);
        Assert.assertEquals(
                singleHandleTasks,
                ImmutableList.of(
                        new TaskDescriptor(
                                0,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 3L)),
                                anyNode)));

        // 3. One handle of value 123: still a single task.
        List<TaskDescriptor> oversizedTasks = MoreFutures.getFutureValue(
                new StageTaskSourceFactory.ArbitraryDistributionTaskSource(
                        ImmutableListMultimap.of(PLAN_NODE_1, handle123),
                        ImmutableListMultimap.of(),
                        threeBytes,
                        taskMemory).getMoreTasks());
        Assert.assertEquals(
                oversizedTasks,
                ImmutableList.of(
                        new TaskDescriptor(
                                0,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 123L)),
                                anyNode)));

        // 4. Handles 123 and 321 on different plan nodes: one task each.
        List<TaskDescriptor> twoNodeTasks = MoreFutures.getFutureValue(
                new StageTaskSourceFactory.ArbitraryDistributionTaskSource(
                        ImmutableListMultimap.of(PLAN_NODE_1, handle123, PLAN_NODE_2, handle321),
                        ImmutableListMultimap.of(),
                        threeBytes,
                        taskMemory).getMoreTasks());
        Assert.assertEquals(
                twoNodeTasks,
                ImmutableList.of(
                        new TaskDescriptor(
                                0,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 123L)),
                                anyNode),
                        new TaskDescriptor(
                                1,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 321L)),
                                anyNode)));

        // 5. Handles 1 and 2 share a task; handle 4 gets its own.
        List<TaskDescriptor> groupedTasks = MoreFutures.getFutureValue(
                new StageTaskSourceFactory.ArbitraryDistributionTaskSource(
                        ImmutableListMultimap.of(PLAN_NODE_1, handle1, PLAN_NODE_1, handle2, PLAN_NODE_2, handle4),
                        ImmutableListMultimap.of(),
                        threeBytes,
                        taskMemory).getMoreTasks());
        Assert.assertEquals(
                groupedTasks,
                ImmutableList.of(
                        new TaskDescriptor(
                                0,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(
                                        PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L),
                                        PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 2L)),
                                anyNode),
                        new TaskDescriptor(
                                1,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 4L)),
                                anyNode)));

        // 6. Handles 1 and 3 are not grouped: three tasks.
        List<TaskDescriptor> separatedTasks = MoreFutures.getFutureValue(
                new StageTaskSourceFactory.ArbitraryDistributionTaskSource(
                        ImmutableListMultimap.of(PLAN_NODE_1, handle1, PLAN_NODE_1, handle3, PLAN_NODE_2, handle4),
                        ImmutableListMultimap.of(),
                        threeBytes,
                        taskMemory).getMoreTasks());
        Assert.assertEquals(
                separatedTasks,
                ImmutableList.of(
                        new TaskDescriptor(
                                0,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L)),
                                anyNode),
                        new TaskDescriptor(
                                1,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 3L)),
                                anyNode),
                        new TaskDescriptor(
                                2,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 4L)),
                                anyNode)));

        // 7. The handle passed in the second multimap is expected in every produced task.
        List<TaskDescriptor> withSecondMultimapTasks = MoreFutures.getFutureValue(
                new StageTaskSourceFactory.ArbitraryDistributionTaskSource(
                        ImmutableListMultimap.of(PLAN_NODE_1, handle1, PLAN_NODE_1, handle2, PLAN_NODE_1, handle4),
                        ImmutableListMultimap.of(PLAN_NODE_2, handle321),
                        threeBytes,
                        taskMemory).getMoreTasks());
        Assert.assertEquals(
                withSecondMultimapTasks,
                ImmutableList.of(
                        new TaskDescriptor(
                                0,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(
                                        PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L),
                                        PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 2L),
                                        PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 321L)),
                                anyNode),
                        new TaskDescriptor(
                                1,
                                ImmutableListMultimap.of(),
                                ImmutableListMultimap.of(
                                        PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 4L),
                                        PLAN_NODE_2, handle321),
                                anyNode)));
    }

    /**
     * Exercises the hash-distribution task source through seven scenarios of increasing
     * complexity. Each scenario builds a source via {@code createHashDistributionTaskSource},
     * checks that it is not finished, compares the tasks from a single {@code getMoreTasks()}
     * call against an explicit expectation, and then checks that the source is finished.
     */
    @Test
    public void testHashDistributionTaskSource() {
        // Scenario 1: no split sources and no exchange handles -> no tasks.
        StageTaskSourceFactory.HashDistributionTaskSource createHashDistributionTaskSource = createHashDistributionTaskSource(ImmutableMap.of(), ImmutableListMultimap.of(), ImmutableListMultimap.of(), 1, new int[]{0, 1, 2, 3}, Optional.empty(), 0L, DataSize.of(3L, DataSize.Unit.BYTE));
        Assert.assertFalse(createHashDistributionTaskSource.isFinished());
        Assert.assertEquals((Collection) MoreFutures.getFutureValue(createHashDistributionTaskSource.getMoreTasks()), ImmutableList.of());
        Assert.assertTrue(createHashDistributionTaskSource.isFinished());
        // Scenario 2: exchange handles only, no bucket node map. Handles whose first argument is
        // 0, 1 and 3 end up in tasks 0, 1 and 2 respectively; the PLAN_NODE_3 handle from the
        // second multimap is expected in every task, and no address is required.
        StageTaskSourceFactory.HashDistributionTaskSource createHashDistributionTaskSource2 = createHashDistributionTaskSource(ImmutableMap.of(), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(3, 1L)), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), 1, new int[]{0, 1, 2, 3}, Optional.empty(), 0L, DataSize.of(0L, DataSize.Unit.BYTE));
        Assert.assertFalse(createHashDistributionTaskSource2.isFinished());
        Assert.assertEquals((Collection) MoreFutures.getFutureValue(createHashDistributionTaskSource2.getMoreTasks()), ImmutableList.of(new TaskDescriptor(0, ImmutableListMultimap.of(), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(1, ImmutableListMultimap.of(), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(2, ImmutableListMultimap.of(), ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(3, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4L, DataSize.Unit.GIGABYTE)))));
        Assert.assertTrue(createHashDistributionTaskSource2.isFinished());
        // Bucketed splits reused by the remaining scenarios. NOTE(review): the second argument of
        // createBucketedSplit is presumably the bucket number (helper not visible here); the
        // expectations below place the splits in tasks accordingly.
        Split createBucketedSplit = createBucketedSplit(0, 0);
        Split createBucketedSplit2 = createBucketedSplit(0, 2);
        Split createBucketedSplit3 = createBucketedSplit(0, 3);
        Split createBucketedSplit4 = createBucketedSplit(0, 1);
        // Scenario 3: split sources plus a bucket node map, no handles in the first multimap.
        // One task per split, each requiring NODE_ADDRESS and carrying the PLAN_NODE_3 handle.
        StageTaskSourceFactory.HashDistributionTaskSource createHashDistributionTaskSource3 = createHashDistributionTaskSource(ImmutableMap.of(PLAN_NODE_4, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit, createBucketedSplit2, createBucketedSplit3)), PLAN_NODE_5, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit4))), ImmutableListMultimap.of(), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), 1, new int[]{0, 1, 2, 3}, Optional.of(getTestingBucketNodeMap(4)), 0L, DataSize.of(0L, DataSize.Unit.BYTE));
        Assert.assertFalse(createHashDistributionTaskSource3.isFinished());
        Assert.assertEquals((Collection) MoreFutures.getFutureValue(createHashDistributionTaskSource3.getMoreTasks()), ImmutableList.of(new TaskDescriptor(0, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(1, ImmutableListMultimap.of(PLAN_NODE_5, createBucketedSplit4), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(2, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit2), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(3, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit3), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE)))));
        Assert.assertTrue(createHashDistributionTaskSource3.isFinished());
        // Scenario 4: the splits of scenario 3 combined with the first-multimap handles of
        // scenario 2; each task carries its split together with the matching handles.
        StageTaskSourceFactory.HashDistributionTaskSource createHashDistributionTaskSource4 = createHashDistributionTaskSource(ImmutableMap.of(PLAN_NODE_4, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit, createBucketedSplit2, createBucketedSplit3)), PLAN_NODE_5, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit4))), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(3, 1L)), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), 1, new int[]{0, 1, 2, 3}, Optional.of(getTestingBucketNodeMap(4)), 0L, DataSize.of(0L, DataSize.Unit.BYTE));
        Assert.assertFalse(createHashDistributionTaskSource4.isFinished());
        Assert.assertEquals((Collection) MoreFutures.getFutureValue(createHashDistributionTaskSource4.getMoreTasks()), ImmutableList.of(new TaskDescriptor(0, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(1, ImmutableListMultimap.of(PLAN_NODE_5, createBucketedSplit4), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(2, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit2), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(3, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit3), ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(3, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE)))));
        Assert.assertTrue(createHashDistributionTaskSource4.isFinished());
        // Scenario 5: the int array {0, 1, 0, 1} and a fourth argument of 2 instead of 1.
        // The four splits are expected to collapse into two tasks.
        StageTaskSourceFactory.HashDistributionTaskSource createHashDistributionTaskSource5 = createHashDistributionTaskSource(ImmutableMap.of(PLAN_NODE_4, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit, createBucketedSplit2, createBucketedSplit3)), PLAN_NODE_5, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit4))), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), 2, new int[]{0, 1, 0, 1}, Optional.of(getTestingBucketNodeMap(4)), 0L, DataSize.of(0L, DataSize.Unit.BYTE));
        Assert.assertFalse(createHashDistributionTaskSource5.isFinished());
        Assert.assertEquals((Collection) MoreFutures.getFutureValue(createHashDistributionTaskSource5.getMoreTasks()), ImmutableList.of(new TaskDescriptor(0, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit, PLAN_NODE_4, createBucketedSplit2), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(1, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit3, PLAN_NODE_5, createBucketedSplit4), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE)))));
        Assert.assertTrue(createHashDistributionTaskSource5.isFinished());
        // Scenario 6: non-zero limits (2 * STANDARD_WEIGHT and 100 GB) with the identity int
        // array. Expected outcome: splits and handles are paired up into two tasks.
        StageTaskSourceFactory.HashDistributionTaskSource createHashDistributionTaskSource6 = createHashDistributionTaskSource(ImmutableMap.of(PLAN_NODE_4, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit, createBucketedSplit2, createBucketedSplit3)), PLAN_NODE_5, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit4))), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(2, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(3, 1L)), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(17, 1L)), 2, new int[]{0, 1, 2, 3}, Optional.of(getTestingBucketNodeMap(4)), 2 * STANDARD_WEIGHT, DataSize.of(100L, DataSize.Unit.GIGABYTE));
        Assert.assertFalse(createHashDistributionTaskSource6.isFinished());
        Assert.assertEquals((Collection) MoreFutures.getFutureValue(createHashDistributionTaskSource6.getMoreTasks()), ImmutableList.of(new TaskDescriptor(0, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit, PLAN_NODE_5, createBucketedSplit4), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 1L), PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(1, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(17, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(1, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit2, PLAN_NODE_4, createBucketedSplit3), ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(2, 1L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(3, 1L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(17, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE)))));
        Assert.assertTrue(createHashDistributionTaskSource6.isFinished());
        // Scenario 7: 100 * STANDARD_WEIGHT with a 100-byte DataSize and handles whose second
        // argument is 20, 30, 20, 99 and 30. Expected outcome: the 20/30/20 handles share task 0,
        // while the 99 and the trailing 30 handle each get their own task.
        StageTaskSourceFactory.HashDistributionTaskSource createHashDistributionTaskSource7 = createHashDistributionTaskSource(ImmutableMap.of(PLAN_NODE_4, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit, createBucketedSplit2, createBucketedSplit3)), PLAN_NODE_5, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, ImmutableList.of(createBucketedSplit4))), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 20L), PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 30L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(1, 20L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(2, 99L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(3, 30L)), ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(17, 1L)), 2, new int[]{0, 1, 2, 3}, Optional.of(getTestingBucketNodeMap(4)), 100 * STANDARD_WEIGHT, DataSize.of(100L, DataSize.Unit.BYTE));
        Assert.assertFalse(createHashDistributionTaskSource7.isFinished());
        Assert.assertEquals((Collection) MoreFutures.getFutureValue(createHashDistributionTaskSource7.getMoreTasks()), ImmutableList.of(new TaskDescriptor(0, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit, PLAN_NODE_5, createBucketedSplit4), ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(0, 20L), PLAN_NODE_1, new TestingExchange.TestingExchangeSourceHandle(1, 30L), PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(1, 20L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(17, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(1, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit2), ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(2, 99L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(17, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE))), new TaskDescriptor(2, ImmutableListMultimap.of(PLAN_NODE_4, createBucketedSplit3), ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(3, 30L), PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(17, 1L)), new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4L, DataSize.Unit.GIGABYTE)))));
        Assert.assertTrue(createHashDistributionTaskSource7.isFinished());
    }

    /**
     * Builds a {@link StageTaskSourceFactory.HashDistributionTaskSource} with the settings shared
     * by all hash-distribution scenarios: a no-op callback, the test catalog handle, 4 GB of task
     * memory and a direct executor. All other arguments are forwarded unchanged, in order.
     *
     * @param map split sources keyed by plan node
     * @param multimap exchange source handles keyed by plan node (first handle multimap)
     * @param multimap2 exchange source handles keyed by plan node (second handle multimap;
     *        the scenarios expect these handles in every produced task)
     * @param i forwarded integer setting (NOTE(review): presumably the split batch size; confirm
     *        against the constructor)
     * @param iArr forwarded int array (NOTE(review): presumably bucket-to-partition mapping; confirm)
     * @param optional optional bucket-to-node map
     * @param j forwarded weight limit, expressed by callers as multiples of {@code STANDARD_WEIGHT}
     * @param dataSize forwarded size limit
     * @return the configured task source
     */
    private static StageTaskSourceFactory.HashDistributionTaskSource createHashDistributionTaskSource(Map<PlanNodeId, SplitSource> map, Multimap<PlanNodeId, ExchangeSourceHandle> multimap, Multimap<PlanNodeId, ExchangeSourceHandle> multimap2, int i, int[] iArr, Optional<BucketNodeMap> optional, long j, DataSize dataSize) {
        DataSize taskMemory = DataSize.of(4L, DataSize.Unit.GIGABYTE);
        return new StageTaskSourceFactory.HashDistributionTaskSource(
                map,
                multimap,
                multimap2,
                i,
                ignored -> {
                    // The tests do not observe this callback.
                },
                iArr,
                optional,
                Optional.of(TestingHandles.TEST_CATALOG_HANDLE),
                j,
                dataSize,
                taskMemory,
                MoreExecutors.directExecutor());
    }

    /**
     * Covers {@link StageTaskSourceFactory.SourceDistributionTaskSource} with five inputs: no splits, a
     * single split, three splits without exchange sources, three splits with one exchange source handle,
     * and seven splits that carry host addresses.
     */
    @Test
    public void testSourceDistributionTaskSource() {
        ListMultimap<PlanNodeId, ExchangeSourceHandle> noExchangeSources = ImmutableListMultimap.of();
        // Requirements expected for every task that is built from splits without addresses.
        NodeRequirements unpinnedRequirements = new NodeRequirements(Optional.of(TestingHandles.TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4L, DataSize.Unit.GIGABYTE));

        // No splits: the source is not finished until polled, then yields an empty batch.
        StageTaskSourceFactory.SourceDistributionTaskSource emptySource = createSourceDistributionTaskSource(ImmutableList.<Split>of(), noExchangeSources, 2, 0, 3 * STANDARD_WEIGHT, 1000);
        Assert.assertFalse(emptySource.isFinished());
        Assert.assertEquals((Collection<?>) MoreFutures.getFutureValue(emptySource.getMoreTasks()), ImmutableList.of());
        Assert.assertTrue(emptySource.isFinished());

        Split split1 = createSplit(1);
        Split split2 = createSplit(2);
        Split split3 = createSplit(3);
        List<Split> threeSplits = ImmutableList.of(split1, split2, split3);
        Multimap<PlanNodeId, Split> expectedThreeSplits = ImmutableMultimap.of(PLAN_NODE_1, split1, PLAN_NODE_1, split2, PLAN_NODE_1, split3);

        // Single split: exactly one task (partition 0) holding that split.
        List<Split> oneSplit = ImmutableList.of(split1);
        StageTaskSourceFactory.SourceDistributionTaskSource singleSplitSource = createSourceDistributionTaskSource(oneSplit, noExchangeSources, 2, 0, 2 * STANDARD_WEIGHT, 1000);
        Assert.assertEquals(
                (Collection<?>) MoreFutures.getFutureValue(singleSplitSource.getMoreTasks()),
                ImmutableList.of(new TaskDescriptor(0, ImmutableListMultimap.of(PLAN_NODE_1, split1), ImmutableListMultimap.of(), unpinnedRequirements)));
        Assert.assertTrue(singleSplitSource.isFinished());

        // Three splits, no exchange sources: two tasks holding 2 and 1 splits.
        StageTaskSourceFactory.SourceDistributionTaskSource plainSource = createSourceDistributionTaskSource(threeSplits, noExchangeSources, 3, 0, 2 * STANDARD_WEIGHT, 1000);
        List<TaskDescriptor> plainTasks = readAllTasks(plainSource);
        Assertions.assertThat(plainTasks).hasSize(2);
        Assertions.assertThat(plainTasks.get(0).getSplits().values()).hasSize(2);
        Assertions.assertThat(plainTasks.get(1).getSplits().values()).hasSize(1);
        Assertions.assertThat(plainTasks).allMatch(task -> task.getNodeRequirements().equals(unpinnedRequirements));
        Assertions.assertThat(plainTasks).allMatch(task -> task.getExchangeSourceHandles().isEmpty());
        org.assertj.guava.api.Assertions.assertThat(flattenSplits(plainTasks)).hasSameEntriesAs(expectedThreeSplits);
        Assert.assertTrue(plainSource.isFinished());

        // Three splits plus one exchange source handle: every task must carry the same handles.
        ListMultimap<PlanNodeId, ExchangeSourceHandle> exchangeSources = ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchange.TestingExchangeSourceHandle(0, 1L));
        StageTaskSourceFactory.SourceDistributionTaskSource exchangeSource = createSourceDistributionTaskSource(threeSplits, exchangeSources, 2, 0, 2 * STANDARD_WEIGHT, 1000);
        List<TaskDescriptor> exchangeTasks = readAllTasks(exchangeSource);
        Assertions.assertThat(exchangeTasks).hasSize(2);
        Assertions.assertThat(exchangeTasks.get(0).getSplits().values()).hasSize(2);
        Assertions.assertThat(exchangeTasks.get(1).getSplits().values()).hasSize(1);
        Assertions.assertThat(exchangeTasks).allMatch(task -> task.getNodeRequirements().equals(unpinnedRequirements));
        Assertions.assertThat(exchangeTasks).allMatch(task -> task.getExchangeSourceHandles().equals(exchangeSources));
        org.assertj.guava.api.Assertions.assertThat(flattenSplits(exchangeTasks)).hasSameEntriesAs(expectedThreeSplits);
        Assert.assertTrue(exchangeSource.isFinished());

        // Splits with host addresses: four tasks, each requiring a single address shared by all of its splits.
        List<Split> addressedSplits = ImmutableList.of(
                createSplit(1, "host1:8080", "host2:8080"),
                createSplit(2, "host2:8080"),
                createSplit(3, "host1:8080", "host3:8080"),
                createSplit(4, "host3:8080", "host1:8080"),
                createSplit(5, "host1:8080", "host2:8080"),
                createSplit(6, "host2:8080", "host3:8080"),
                createSplit(7, "host3:8080", "host4:8080"));
        StageTaskSourceFactory.SourceDistributionTaskSource addressedSource = createSourceDistributionTaskSource(addressedSplits, noExchangeSources, 3, 0, 2 * STANDARD_WEIGHT, 1000);
        List<TaskDescriptor> addressedTasks = readAllTasks(addressedSource);
        Assertions.assertThat(addressedTasks).hasSize(4);
        Assertions.assertThat(addressedTasks.stream()).allMatch(task -> task.getExchangeSourceHandles().isEmpty());
        org.assertj.guava.api.Assertions.assertThat(flattenSplits(addressedTasks)).hasSameEntriesAs(Multimaps.index(addressedSplits, split -> PLAN_NODE_1));
        Assertions.assertThat(addressedTasks).allMatch(task ->
                task.getSplits().values().stream().allMatch(split ->
                        split.getAddresses().contains(Iterables.getOnlyElement(task.getNodeRequirements().getAddresses()))));
        Assert.assertTrue(addressedSource.isFinished());
    }

    /**
     * Checks how splits of different weights are grouped into tasks by
     * {@link StageTaskSourceFactory.SourceDistributionTaskSource}. Splits weigh {@code STANDARD_WEIGHT},
     * twice that ("heavy") or half that ("light"); the last two scenarios also attach host addresses.
     * NOTE(review): the meaning of the numeric helper arguments comes from a constructor outside this
     * chunk; the scenario comments below only restate which positional argument differs.
     */
    @Test
    public void testSourceDistributionTaskSourceWithWeights() {
        ListMultimap<PlanNodeId, ExchangeSourceHandle> noExchangeSources = ImmutableListMultimap.of();
        // Every task produced in this test must reference PLAN_NODE_1 and no other plan node.
        java.util.function.Predicate<TaskDescriptor> readsOnlyPlanNode1 =
                task -> Iterables.getOnlyElement(task.getSplits().keySet()).equals(PLAN_NODE_1);

        Split standardSplit = createWeightedSplit(1, STANDARD_WEIGHT);
        long heavyWeight = 2 * STANDARD_WEIGHT;
        Split heavySplit1 = createWeightedSplit(11, heavyWeight);
        Split heavySplit2 = createWeightedSplit(12, heavyWeight);
        Split heavySplit3 = createWeightedSplit(13, heavyWeight);
        long lightWeight = (long) (0.5d * STANDARD_WEIGHT);
        Split lightSplit1 = createWeightedSplit(21, lightWeight);
        Split lightSplit2 = createWeightedSplit(22, lightWeight);
        Split lightSplit3 = createWeightedSplit(23, lightWeight);
        Split lightSplit4 = createWeightedSplit(24, lightWeight);

        // Scenario 1: weight argument of 1.9 * STANDARD_WEIGHT, fourth argument 0, last argument 1000.
        List<Split> mixedSplits = ImmutableList.of(lightSplit1, lightSplit2, standardSplit, heavySplit1, heavySplit2, lightSplit4);
        StageTaskSourceFactory.SourceDistributionTaskSource weightLimitedSource = createSourceDistributionTaskSource(mixedSplits, noExchangeSources, 1, 0, (long) (1.9d * STANDARD_WEIGHT), 1000);
        List<TaskDescriptor> weightLimitedTasks = readAllTasks(weightLimitedSource);
        Assertions.assertThat(weightLimitedTasks).hasSize(4);
        Assertions.assertThat(weightLimitedTasks).allMatch(readsOnlyPlanNode1);
        Assertions.assertThat(weightLimitedTasks.get(0).getSplits().values()).containsExactlyInAnyOrder(lightSplit1, lightSplit2, standardSplit);
        Assertions.assertThat(weightLimitedTasks.get(1).getSplits().values()).containsExactlyInAnyOrder(heavySplit1);
        Assertions.assertThat(weightLimitedTasks.get(2).getSplits().values()).containsExactlyInAnyOrder(heavySplit2);
        Assertions.assertThat(weightLimitedTasks.get(3).getSplits().values()).containsExactlyInAnyOrder(lightSplit4);
        Assert.assertTrue(weightLimitedSource.isFinished());

        // Scenario 2: fourth argument raised to 2, so even heavy splits end up paired.
        List<Split> heavyThenLight = ImmutableList.of(heavySplit1, heavySplit2, heavySplit3, lightSplit1, lightSplit2, lightSplit3, lightSplit4);
        StageTaskSourceFactory.SourceDistributionTaskSource pairedSource = createSourceDistributionTaskSource(heavyThenLight, noExchangeSources, 1, 2, 2 * STANDARD_WEIGHT, 1000);
        List<TaskDescriptor> pairedTasks = readAllTasks(pairedSource);
        Assertions.assertThat(pairedTasks).hasSize(3);
        Assertions.assertThat(pairedTasks).allMatch(readsOnlyPlanNode1);
        Assertions.assertThat(pairedTasks.get(0).getSplits().values()).containsExactlyInAnyOrder(heavySplit1, heavySplit2);
        Assertions.assertThat(pairedTasks.get(1).getSplits().values()).containsExactlyInAnyOrder(heavySplit3, lightSplit1);
        Assertions.assertThat(pairedTasks.get(2).getSplits().values()).containsExactlyInAnyOrder(lightSplit2, lightSplit3, lightSplit4);
        Assert.assertTrue(pairedSource.isFinished());

        // Scenario 3: last argument lowered to 3, so three light splits already close a task.
        List<Split> lightThenHeavy = ImmutableList.of(lightSplit1, lightSplit2, lightSplit3, heavySplit1, lightSplit4);
        StageTaskSourceFactory.SourceDistributionTaskSource cappedSource = createSourceDistributionTaskSource(lightThenHeavy, noExchangeSources, 1, 0, 2 * STANDARD_WEIGHT, 3);
        List<TaskDescriptor> cappedTasks = readAllTasks(cappedSource);
        Assertions.assertThat(cappedTasks).hasSize(3);
        Assertions.assertThat(cappedTasks).allMatch(readsOnlyPlanNode1);
        Assertions.assertThat(cappedTasks.get(0).getSplits().values()).containsExactlyInAnyOrder(lightSplit1, lightSplit2, lightSplit3);
        Assertions.assertThat(cappedTasks.get(1).getSplits().values()).containsExactlyInAnyOrder(heavySplit1);
        Assertions.assertThat(cappedTasks.get(2).getSplits().values()).containsExactlyInAnyOrder(lightSplit4);
        Assert.assertTrue(cappedSource.isFinished());

        // Splits that carry host addresses.
        Split split1OnHost1 = createWeightedSplit(1, STANDARD_WEIGHT, "host1:8080");
        Split split2OnHost2 = createWeightedSplit(2, STANDARD_WEIGHT, "host2:8080");
        Split split3OnHost1 = createWeightedSplit(3, STANDARD_WEIGHT, "host1:8080");
        Split split3OnBothHosts = createWeightedSplit(3, STANDARD_WEIGHT, "host1:8080", "host2:8080");
        Split heavySplitOnHost2 = createWeightedSplit(12, heavyWeight, "host2:8080");
        Split lightSplitOnHost1 = createWeightedSplit(21, lightWeight, "host1:8080");

        // Scenario 4: single-address splits; the heavy host2 split is expected in the first task.
        List<Split> singleAddressSplits = ImmutableList.of(split1OnHost1, heavySplitOnHost2, split3OnHost1, lightSplitOnHost1);
        StageTaskSourceFactory.SourceDistributionTaskSource singleAddressSource = createSourceDistributionTaskSource(singleAddressSplits, noExchangeSources, 1, 0, 2 * STANDARD_WEIGHT, 3);
        List<TaskDescriptor> singleAddressTasks = readAllTasks(singleAddressSource);
        Assertions.assertThat(singleAddressTasks).hasSize(3);
        Assertions.assertThat(singleAddressTasks).allMatch(readsOnlyPlanNode1);
        Assertions.assertThat(singleAddressTasks.get(0).getSplits().values()).containsExactlyInAnyOrder(heavySplitOnHost2);
        Assertions.assertThat(singleAddressTasks.get(1).getSplits().values()).containsExactlyInAnyOrder(split1OnHost1, split3OnHost1);
        Assertions.assertThat(singleAddressTasks.get(2).getSplits().values()).containsExactlyInAnyOrder(lightSplitOnHost1);
        Assert.assertTrue(singleAddressSource.isFinished());

        // Scenario 5: a split reachable on both hosts is expected to join the host1 split.
        List<Split> multiAddressSplits = ImmutableList.of(split1OnHost1, split3OnBothHosts, split2OnHost2);
        StageTaskSourceFactory.SourceDistributionTaskSource multiAddressSource = createSourceDistributionTaskSource(multiAddressSplits, noExchangeSources, 1, 0, 2 * STANDARD_WEIGHT, 3);
        List<TaskDescriptor> multiAddressTasks = readAllTasks(multiAddressSource);
        Assertions.assertThat(multiAddressTasks).hasSize(2);
        Assertions.assertThat(multiAddressTasks).allMatch(readsOnlyPlanNode1);
        Assertions.assertThat(multiAddressTasks.get(0).getSplits().values()).containsExactlyInAnyOrder(split1OnHost1, split3OnBothHosts);
        Assertions.assertThat(multiAddressTasks.get(1).getSplits().values()).containsExactlyInAnyOrder(split2OnHost2);
        Assert.assertTrue(multiAddressSource.isFinished());
    }

    /**
     * Feeds {@code n + 1} standard-weight splits (n = 1..21) into a task source whose fourth and sixth
     * helper arguments are both {@code n} and whose weight argument is {@code STANDARD_WEIGHT * n}, and
     * asserts that exactly two tasks come out, the last one holding a single split.
     *
     * <p>The check is repeated for every combination of the third {@code TestingSplitSource} constructor
     * argument (1..19) and the third helper argument (1..5).
     */
    @Test
    public void testSourceDistributionTaskSourceLastIncompleteTaskAlwaysCreated() {
        for (int splitsPerTask = 1; splitsPerTask <= 21; splitsPerTask++) {
            // One split more than splitsPerTask, so a lone split is left over for the final task.
            List<Split> splits = new ArrayList<>();
            for (int splitId = 0; splitId < splitsPerTask + 1; splitId++) {
                splits.add(createWeightedSplit(splitId, STANDARD_WEIGHT));
            }
            // NOTE(review): presumably the number of polls the split source waits before reporting
            // finished; confirm against TestingSplitSource.
            for (int finishDelay = 1; finishDelay < 20; finishDelay++) {
                // NOTE(review): presumably the split batch size; confirm against the task source constructor.
                for (int batchSize = 1; batchSize <= 5; batchSize++) {
                    SplitSource splitSource = new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, splits, finishDelay);
                    ListMultimap<PlanNodeId, ExchangeSourceHandle> noExchangeSources = ImmutableListMultimap.of();
                    List<TaskDescriptor> tasks = readAllTasks(createSourceDistributionTaskSource(splitSource, noExchangeSources, batchSize, splitsPerTask, STANDARD_WEIGHT * splitsPerTask, splitsPerTask));
                    Assertions.assertThat(tasks).hasSize(2);
                    TaskDescriptor lastTask = Streams.findLast(tasks.stream()).orElseThrow();
                    org.assertj.guava.api.Assertions.assertThat(lastTask.getSplits()).hasSize(1);
                }
            }
        }
    }

    /**
     * Verifies that {@link StageTaskSourceFactory.SourceDistributionTaskSource} waits for a split source
     * whose splits arrive asynchronously: the first task future stays pending until the splits future is
     * completed, then yields one task with two splits; the next call is already done and yields one task
     * with the remaining split, after which the source is finished.
     */
    @Test
    public void testSourceDistributionTaskSourceWithAsyncSplitSource() {
        SettableFuture<List<Split>> splitsFuture = SettableFuture.create();
        SplitSource splitSource = new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, splitsFuture, 0);
        ListMultimap<PlanNodeId, ExchangeSourceHandle> noExchangeSources = ImmutableListMultimap.of();
        StageTaskSourceFactory.SourceDistributionTaskSource taskSource = createSourceDistributionTaskSource(splitSource, noExchangeSources, 2, 0, 2 * STANDARD_WEIGHT, 1000);

        // Nothing can be produced before the split source has delivered its splits.
        ListenableFuture<List<TaskDescriptor>> firstBatch = taskSource.getMoreTasks();
        Assertions.assertThat(firstBatch).isNotDone();

        splitsFuture.set(ImmutableList.of(createSplit(1), createSplit(2), createSplit(3)));
        List<TaskDescriptor> firstTasks = MoreFutures.getDone(firstBatch);
        Assertions.assertThat(firstTasks).hasSize(1);
        org.assertj.guava.api.Assertions.assertThat(firstTasks.get(0).getSplits()).hasSize(2);

        // The splits are available now, so the second batch must complete immediately.
        ListenableFuture<List<TaskDescriptor>> secondBatch = taskSource.getMoreTasks();
        Assertions.assertThat(secondBatch).isDone();
        List<TaskDescriptor> secondTasks = MoreFutures.getDone(secondBatch);
        Assertions.assertThat(secondTasks).hasSize(1);
        org.assertj.guava.api.Assertions.assertThat(secondTasks.get(0).getSplits()).hasSize(1);
        Assertions.assertThat(taskSource.isFinished()).isTrue();
    }

    /**
     * Verifies that {@link StageTaskSourceFactory.HashDistributionTaskSource} waits for all of its split
     * sources: with two sources backed by pending futures, the task future stays pending after only the
     * first one completes and is done once the second completes as well. The result must be four tasks
     * with one split each, and the source must then be finished.
     */
    @Test
    public void testHashDistributionTaskSourceWithAsyncSplitSource() {
        SettableFuture<List<Split>> firstSplitsFuture = SettableFuture.create();
        SettableFuture<List<Split>> secondSplitsFuture = SettableFuture.create();
        StageTaskSourceFactory.HashDistributionTaskSource taskSource = createHashDistributionTaskSource(
                ImmutableMap.of(
                        PLAN_NODE_1, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, firstSplitsFuture, 0),
                        PLAN_NODE_2, new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, secondSplitsFuture, 0)),
                ImmutableListMultimap.of(),
                ImmutableListMultimap.of(PLAN_NODE_3, new TestingExchange.TestingExchangeSourceHandle(0, 1L)),
                1,
                new int[] {0, 1, 2, 3},
                Optional.of(getTestingBucketNodeMap(4)),
                0L,
                DataSize.of(0L, DataSize.Unit.BYTE));

        ListenableFuture<List<TaskDescriptor>> tasksFuture = taskSource.getMoreTasks();
        Assertions.assertThat(tasksFuture).isNotDone();

        // Only one of the two split sources has delivered: still pending.
        firstSplitsFuture.set(ImmutableList.of(createBucketedSplit(0, 0), createBucketedSplit(0, 2), createBucketedSplit(0, 3)));
        Assertions.assertThat(tasksFuture).isNotDone();

        secondSplitsFuture.set(ImmutableList.of(createBucketedSplit(0, 1)));
        // Typed as List<TaskDescriptor>: iterating a raw List would expose the elements as Object,
        // on which getSplits() cannot be called.
        List<TaskDescriptor> tasks = MoreFutures.getDone(tasksFuture);
        Assertions.assertThat(tasks).hasSize(4);
        for (TaskDescriptor task : tasks) {
            org.assertj.guava.api.Assertions.assertThat(task.getSplits()).hasSize(1);
        }
        Assertions.assertThat(taskSource.isFinished()).isTrue();
    }

    /**
     * Convenience overload: wraps {@code list} in a {@code TestingSplitSource} bound to
     * {@code TestingHandles.TEST_CATALOG_HANDLE} and delegates to the {@link SplitSource} overload with
     * all other arguments unchanged.
     *
     * @param list splits the split source will serve
     * @param listMultimap exchange source handles, forwarded as-is
     * @param i forwarded as-is
     * @param i2 forwarded as-is
     * @param j forwarded as-is
     * @param i3 forwarded as-is
     * @return the task source created by the delegate overload
     */
    private static StageTaskSourceFactory.SourceDistributionTaskSource createSourceDistributionTaskSource(List<Split> list, ListMultimap<PlanNodeId, ExchangeSourceHandle> listMultimap, int i, int i2, long j, int i3) {
        SplitSource splitSource = new TestingSplitSource(TestingHandles.TEST_CATALOG_HANDLE, list);
        return createSourceDistributionTaskSource(splitSource, listMultimap, i, i2, j, i3);
    }

    /**
     * Creates a {@link StageTaskSourceFactory.SourceDistributionTaskSource} for {@code PLAN_NODE_1}.
     *
     * <p>Fixed by this helper: a {@code QueryId} named "query", a fresh {@link TableExecuteContextManager},
     * a no-op callback, {@code Optional.of(TestingHandles.TEST_CATALOG_HANDLE)}, a 4 GB {@link DataSize}
     * and a direct executor. Everything else is passed through positionally.
     * NOTE(review): the constructor declaration is outside this chunk; confirm the meaning of
     * {@code i}, {@code i2}, {@code j} and {@code i3} against it.
     *
     * @param splitSource source of the splits to distribute
     * @param listMultimap exchange source handles keyed by plan node id
     * @param i integer passed right after the exchange source handles
     * @param i2 first integer passed after the catalog handle
     * @param j long value passed after {@code i2}
     * @param i3 integer passed after {@code j}
     * @return the configured task source
     */
    private static StageTaskSourceFactory.SourceDistributionTaskSource createSourceDistributionTaskSource(SplitSource splitSource, ListMultimap<PlanNodeId, ExchangeSourceHandle> listMultimap, int i, int i2, long j, int i3) {
        QueryId queryId = new QueryId("query");
        TableExecuteContextManager tableExecuteContextManager = new TableExecuteContextManager();
        // Same 4 GB value the expected NodeRequirements in the tests are built with.
        DataSize fixedDataSize = DataSize.of(4L, DataSize.Unit.GIGABYTE);
        return new StageTaskSourceFactory.SourceDistributionTaskSource(
                queryId,
                PLAN_NODE_1,
                tableExecuteContextManager,
                splitSource,
                listMultimap,
                i,
                ignored -> { },
                Optional.of(TestingHandles.TEST_CATALOG_HANDLE),
                i2,
                j,
                i3,
                fixedDataSize,
                MoreExecutors.directExecutor());
    }

    /**
     * Creates a non-bucketed test split in the testing catalog.
     *
     * @param i first argument of the {@code TestingConnectorSplit} constructor (used by the tests as a split id)
     * @param strArr host addresses in the form accepted by {@code HostAddress.fromString}; none means
     *         the split carries an empty {@code Optional} instead of an address list
     * @return the wrapping {@link Split}
     */
    private static Split createSplit(int i, String... strArr) {
        Optional<List<HostAddress>> addresses = addressesList(strArr);
        TestingConnectorSplit connectorSplit = new TestingConnectorSplit(i, OptionalInt.empty(), addresses);
        return new Split(TestingHandles.TEST_CATALOG_HANDLE, connectorSplit);
    }

    /**
     * Creates a non-bucketed test split in the testing catalog with an explicit weight.
     *
     * @param i first argument of the {@code TestingConnectorSplit} constructor (used by the tests as a split id)
     * @param j weight value handed to the {@code TestingConnectorSplit} constructor
     * @param strArr host addresses in the form accepted by {@code HostAddress.fromString}; none means
     *         the split carries an empty {@code Optional} instead of an address list
     * @return the wrapping {@link Split}
     */
    private static Split createWeightedSplit(int i, long j, String... strArr) {
        Optional<List<HostAddress>> addresses = addressesList(strArr);
        TestingConnectorSplit connectorSplit = new TestingConnectorSplit(i, OptionalInt.empty(), addresses, j);
        return new Split(TestingHandles.TEST_CATALOG_HANDLE, connectorSplit);
    }

    /**
     * Creates a test split in the testing catalog that belongs to a bucket and has no host addresses.
     *
     * @param i first argument of the {@code TestingConnectorSplit} constructor (used by the tests as a split id)
     * @param i2 bucket number stored in the split
     * @return the wrapping {@link Split}
     */
    private static Split createBucketedSplit(int i, int i2) {
        OptionalInt bucket = OptionalInt.of(i2);
        TestingConnectorSplit connectorSplit = new TestingConnectorSplit(i, bucket, Optional.empty());
        return new Split(TestingHandles.TEST_CATALOG_HANDLE, connectorSplit);
    }

    /**
     * Drains a task source: keeps requesting batches, blocking on each returned future, until the
     * source reports that it is finished.
     *
     * @param taskSource source to drain
     * @return all task descriptors in the order the batches were returned
     */
    private List<TaskDescriptor> readAllTasks(TaskSource taskSource) {
        ImmutableList.Builder<TaskDescriptor> tasks = ImmutableList.builder();
        while (!taskSource.isFinished()) {
            tasks.addAll(MoreFutures.getFutureValue(taskSource.getMoreTasks()));
        }
        return tasks.build();
    }

    /**
     * Merges the splits of all given tasks into one multimap keyed by plan node id.
     *
     * <p>The result is a {@link HashMultimap}, so task boundaries and entry order are not preserved and
     * duplicate key/value pairs collapse into one entry.
     *
     * @param list task descriptors whose splits should be combined
     * @return every (plan node id, split) pair that occurs in any of the tasks
     */
    private Multimap<PlanNodeId, Split> flattenSplits(List<TaskDescriptor> list) {
        Multimap<PlanNodeId, Split> flattened = HashMultimap.create();
        for (TaskDescriptor task : list) {
            flattened.putAll(task.getSplits());
        }
        return flattened;
    }

    /**
     * Converts address strings into the optional address list expected by the test split constructors.
     *
     * @param strArr addresses to parse with {@code HostAddress.fromString}; must not be {@code null}
     * @return {@code Optional.empty()} when no addresses are given, otherwise an immutable list of the
     *         parsed addresses in argument order
     * @throws NullPointerException if {@code strArr} is {@code null}
     */
    private static Optional<List<HostAddress>> addressesList(String... strArr) {
        Objects.requireNonNull(strArr, "addresses is null");
        if (strArr.length == 0) {
            return Optional.empty();
        }
        List<HostAddress> addresses = Arrays.stream(strArr)
                .map(HostAddress::fromString)
                .collect(ImmutableList.toImmutableList());
        return Optional.of(addresses);
    }

    /**
     * Builds a {@link BucketNodeMap} in which all {@code i} list entries are the same "local" node.
     *
     * <p>The function handed to the map reads the bucket number stored in the split's
     * {@code TestingConnectorSplit} and fails if the split has no bucket.
     *
     * @param i number of copies of the node placed in the node list
     * @return the bucket node map used by the hash distribution tests
     */
    private static BucketNodeMap getTestingBucketNodeMap(int i) {
        InternalNode localNode = new InternalNode("local", URI.create("local://" + NODE_ADDRESS), NodeVersion.UNKNOWN, true);
        return new BucketNodeMap(
                split -> ((TestingConnectorSplit) split.getConnectorSplit()).getBucket().orElseThrow(),
                Collections.nCopies(i, localNode));
    }
}
