package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.Threads;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.SplitAssigner;
import io.trino.metadata.Split;
import io.trino.operator.ExchangeOperator;
import io.trino.spi.QueryId;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceHandleSource;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:io/trino/execution/scheduler/TestEventDrivenTaskSource.class */
public class TestEventDrivenTaskSource {
    private static final int INVOCATION_COUNT = 20;
    private static final long TIMEOUT = 60000;
    private static final PlanNodeId PLAN_NODE_1 = new PlanNodeId("plan-node-1");
    private static final PlanNodeId PLAN_NODE_2 = new PlanNodeId("plan-node-2");
    private static final PlanNodeId PLAN_NODE_3 = new PlanNodeId("plan-node-3");
    private static final PlanNodeId PLAN_NODE_4 = new PlanNodeId("plan-node-3");
    private static final PlanFragmentId FRAGMENT_1 = new PlanFragmentId("fragment-1");
    private static final PlanFragmentId FRAGMENT_2 = new PlanFragmentId("fragment-2");
    private static final PlanFragmentId FRAGMENT_3 = new PlanFragmentId("fragment-3");
    private final AtomicInteger nextId = new AtomicInteger();
    private ListeningScheduledExecutorService executor;

    /* loaded from: input_file:io/trino/execution/scheduler/TestEventDrivenTaskSource$TestingExchange.class */
    private static class TestingExchange implements Exchange {

        @GuardedBy("this")
        private ExchangeSourceHandleSource exchangeSourceHandleSource;

        @GuardedBy("this")
        private boolean closed;

        public TestingExchange(ExchangeSourceHandleSource exchangeSourceHandleSource) {
            this.exchangeSourceHandleSource = (ExchangeSourceHandleSource) Objects.requireNonNull(exchangeSourceHandleSource, "exchangeSourceHandleSource is null");
        }

        public ExchangeId getId() {
            throw new UnsupportedOperationException();
        }

        public ExchangeSinkHandle addSink(int i) {
            throw new UnsupportedOperationException();
        }

        public void noMoreSinks() {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<ExchangeSinkInstanceHandle> instantiateSink(ExchangeSinkHandle exchangeSinkHandle, int i) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<ExchangeSinkInstanceHandle> updateSinkInstanceHandle(ExchangeSinkHandle exchangeSinkHandle, int i) {
            throw new UnsupportedOperationException();
        }

        public void sinkFinished(ExchangeSinkHandle exchangeSinkHandle, int i) {
            throw new UnsupportedOperationException();
        }

        public void allRequiredSinksFinished() {
            throw new UnsupportedOperationException();
        }

        public synchronized ExchangeSourceHandleSource getSourceHandles() {
            Preconditions.checkState(!this.closed, "already closed");
            Preconditions.checkState(this.exchangeSourceHandleSource != null, "already retrieved");
            ExchangeSourceHandleSource exchangeSourceHandleSource = this.exchangeSourceHandleSource;
            this.exchangeSourceHandleSource = null;
            return exchangeSourceHandleSource;
        }

        public synchronized void close() {
            this.closed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/TestEventDrivenTaskSource$TestingExchangeSourceHandleSource.class */
    public static class TestingExchangeSourceHandleSource implements ExchangeSourceHandleSource {
        private final ScheduledExecutorService executor;

        @GuardedBy("this")
        private final Queue<ExchangeSourceHandle> remainingHandles;

        @GuardedBy("this")
        private CompletableFuture<ExchangeSourceHandleSource.ExchangeSourceHandleBatch> currentFuture;

        @GuardedBy("this")
        private boolean closed;

        private TestingExchangeSourceHandleSource(ScheduledExecutorService scheduledExecutorService, List<ExchangeSourceHandle> list) {
            this.executor = (ScheduledExecutorService) Objects.requireNonNull(scheduledExecutorService, "executor is null");
            this.remainingHandles = new LinkedList((Collection) Objects.requireNonNull(list, "handles is null"));
        }

        public synchronized CompletableFuture<ExchangeSourceHandleSource.ExchangeSourceHandleBatch> getNextBatch() {
            Preconditions.checkState(!this.closed, "closed");
            Preconditions.checkState(this.currentFuture == null || this.currentFuture.isDone(), "currentFuture is still running");
            this.currentFuture = new CompletableFuture<>();
            long nextInt = ThreadLocalRandom.current().nextInt(3);
            if (nextInt == 0) {
                setNextBatch();
            } else {
                this.executor.schedule(this::setNextBatch, nextInt, TimeUnit.MILLISECONDS);
            }
            return this.currentFuture;
        }

        private void setNextBatch() {
            CompletableFuture<ExchangeSourceHandleSource.ExchangeSourceHandleBatch> completableFuture;
            ExchangeSourceHandleSource.ExchangeSourceHandleBatch exchangeSourceHandleBatch;
            synchronized (this) {
                completableFuture = this.currentFuture;
                ExchangeSourceHandle poll = this.remainingHandles.poll();
                exchangeSourceHandleBatch = new ExchangeSourceHandleSource.ExchangeSourceHandleBatch(poll == null ? ImmutableList.of() : ImmutableList.of(poll), this.remainingHandles.isEmpty());
            }
            if (completableFuture != null) {
                completableFuture.complete(exchangeSourceHandleBatch);
            }
        }

        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.currentFuture != null) {
                this.currentFuture.cancel(true);
                this.currentFuture = null;
            }
            this.remainingHandles.clear();
        }

        public synchronized boolean isClosed() {
            return this.closed;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/TestEventDrivenTaskSource$TestingSplitAssigner.class */
    public static class TestingSplitAssigner implements SplitAssigner {
        private final Set<PlanNodeId> allSources;
        private final Set<Integer> partitions = new HashSet();
        private final Set<PlanNodeId> finishedSources = new HashSet();
        private boolean finished;

        private TestingSplitAssigner(Set<PlanNodeId> set) {
            this.allSources = ImmutableSet.copyOf((Collection) Objects.requireNonNull(set, "allSources is null"));
        }

        public SplitAssigner.AssignmentResult assign(PlanNodeId planNodeId, ListMultimap<Integer, Split> listMultimap, boolean z) {
            Preconditions.checkState(!this.finished, "finished is set");
            SplitAssigner.AssignmentResult.Builder builder = SplitAssigner.AssignmentResult.builder();
            Multimaps.asMap(listMultimap).forEach((num, list) -> {
                if (this.partitions.add(num)) {
                    builder.addPartition(new SplitAssigner.Partition(num.intValue(), new NodeRequirements(Optional.empty(), ImmutableSet.of())));
                    Iterator<PlanNodeId> it = this.finishedSources.iterator();
                    while (it.hasNext()) {
                        builder.updatePartition(new SplitAssigner.PartitionUpdate(num.intValue(), it.next(), ImmutableList.of(), true));
                    }
                }
                builder.updatePartition(new SplitAssigner.PartitionUpdate(num.intValue(), planNodeId, list, z));
            });
            if (z) {
                this.finishedSources.add(planNodeId);
                Iterator<Integer> it = this.partitions.iterator();
                while (it.hasNext()) {
                    builder.updatePartition(new SplitAssigner.PartitionUpdate(it.next().intValue(), planNodeId, ImmutableList.of(), true));
                }
            }
            if (this.finishedSources.containsAll(this.allSources)) {
                Set<Integer> set = this.partitions;
                Objects.requireNonNull(builder);
                set.forEach((v1) -> {
                    r1.sealPartition(v1);
                });
            }
            return builder.build();
        }

        public SplitAssigner.AssignmentResult finish() {
            SplitAssigner.AssignmentResult.Builder builder = SplitAssigner.AssignmentResult.builder();
            if (this.finished) {
                return builder.build();
            }
            this.finished = true;
            Preconditions.checkState(this.finishedSources.containsAll(this.allSources));
            if (this.partitions.isEmpty()) {
                this.partitions.add(0);
                builder.addPartition(new SplitAssigner.Partition(0, new NodeRequirements(Optional.empty(), ImmutableSet.of()))).sealPartition(0);
            }
            return builder.setNoMorePartitions().build();
        }

        public boolean isFinished() {
            return this.finished;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/TestEventDrivenTaskSource$TestingSplitSource.class */
    public static class TestingSplitSource implements SplitSource {
        private final ScheduledExecutorService executor;

        @GuardedBy("this")
        private final Queue<ConnectorSplit> remainingSplits;

        @GuardedBy("this")
        private SettableFuture<SplitSource.SplitBatch> currentFuture;

        @GuardedBy("this")
        private boolean finished;

        @GuardedBy("this")
        private boolean closed;

        public TestingSplitSource(ScheduledExecutorService scheduledExecutorService, List<ConnectorSplit> list) {
            this.executor = (ScheduledExecutorService) Objects.requireNonNull(scheduledExecutorService, "executor is null");
            this.remainingSplits = new LinkedList(list);
        }

        public CatalogHandle getCatalogHandle() {
            return TestingHandles.TEST_CATALOG_HANDLE;
        }

        public synchronized ListenableFuture<SplitSource.SplitBatch> getNextBatch(int i) {
            Preconditions.checkState(!this.closed, "closed");
            Preconditions.checkState(this.currentFuture == null || this.currentFuture.isDone(), "currentFuture is still running");
            this.currentFuture = SettableFuture.create();
            long nextInt = ThreadLocalRandom.current().nextInt(3);
            if (nextInt == 0) {
                setNextBatch();
            } else {
                this.executor.schedule(this::setNextBatch, nextInt, TimeUnit.MILLISECONDS);
            }
            return this.currentFuture;
        }

        private void setNextBatch() {
            SettableFuture<SplitSource.SplitBatch> settableFuture;
            SplitSource.SplitBatch splitBatch;
            synchronized (this) {
                settableFuture = this.currentFuture;
                ConnectorSplit poll = this.remainingSplits.poll();
                boolean isEmpty = this.remainingSplits.isEmpty();
                splitBatch = new SplitSource.SplitBatch(poll == null ? ImmutableList.of() : ImmutableList.of(new Split(TestingHandles.TEST_CATALOG_HANDLE, poll)), isEmpty);
                if (isEmpty) {
                    this.finished = true;
                }
            }
            if (settableFuture != null) {
                settableFuture.set(splitBatch);
            }
        }

        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.currentFuture != null) {
                this.currentFuture.cancel(true);
                this.currentFuture = null;
            }
            this.remainingSplits.clear();
        }

        public synchronized boolean isFinished() {
            return this.finished || this.closed;
        }

        public Optional<List<Object>> getTableExecuteSplitsInfo() {
            return Optional.empty();
        }

        public synchronized boolean isClosed() {
            return this.closed;
        }
    }

    @BeforeClass
    public void setUp() {
        this.executor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(10, Threads.daemonThreadsNamed("dispatcher-query-%s")));
    }

    @AfterClass(alwaysRun = true)
    public void tearDown() {
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
    }

    @Test(invocationCount = INVOCATION_COUNT, timeOut = TIMEOUT)
    public void testHappyPath() throws Exception {
        testStageTaskSourceSuccess(ImmutableListMultimap.of(), ImmutableMap.of(), ImmutableListMultimap.of());
        testStageTaskSourceSuccess(ImmutableListMultimap.of(), ImmutableMap.of(), ImmutableListMultimap.of(PLAN_NODE_1, createSplit(0)));
        testStageTaskSourceSuccess(ImmutableListMultimap.of(), ImmutableMap.of(), ImmutableListMultimap.builder().putAll(PLAN_NODE_1, new ConnectorSplit[]{createSplit(0), createSplit(0), createSplit(1)}).build());
        testStageTaskSourceSuccess(ImmutableListMultimap.of(), ImmutableMap.of(), ImmutableListMultimap.builder().putAll(PLAN_NODE_1, new ConnectorSplit[]{createSplit(0)}).putAll(PLAN_NODE_2, new ConnectorSplit[]{createSplit(0)}).build());
        testStageTaskSourceSuccess(ImmutableListMultimap.of(), ImmutableMap.of(), ImmutableListMultimap.builder().putAll(PLAN_NODE_1, new ConnectorSplit[]{createSplit(0)}).putAll(PLAN_NODE_2, new ConnectorSplit[]{createSplit(0), createSplit(1)}).build());
        testStageTaskSourceSuccess(ImmutableListMultimap.of(), ImmutableMap.of(), ImmutableListMultimap.builder().putAll(PLAN_NODE_1, new ConnectorSplit[]{createSplit(0), createSplit(3), createSplit(4)}).putAll(PLAN_NODE_2, new ConnectorSplit[]{createSplit(0), createSplit(1)}).build());
        testStageTaskSourceSuccess(ImmutableListMultimap.of(FRAGMENT_1, createSourceHandle(1)), ImmutableMap.of(FRAGMENT_1, PLAN_NODE_1), ImmutableListMultimap.of());
        testStageTaskSourceSuccess(ImmutableListMultimap.builder().putAll(FRAGMENT_1, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(1)}).build(), ImmutableMap.of(FRAGMENT_1, PLAN_NODE_1), ImmutableListMultimap.of());
        testStageTaskSourceSuccess(ImmutableListMultimap.builder().putAll(FRAGMENT_1, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(1)}).build(), ImmutableMap.builder().put(FRAGMENT_1, PLAN_NODE_1).put(FRAGMENT_2, PLAN_NODE_2).buildOrThrow(), ImmutableListMultimap.of());
        testStageTaskSourceSuccess(ImmutableListMultimap.builder().putAll(FRAGMENT_1, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(1)}).putAll(FRAGMENT_2, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(3)}).build(), ImmutableMap.builder().put(FRAGMENT_1, PLAN_NODE_1).put(FRAGMENT_2, PLAN_NODE_2).buildOrThrow(), ImmutableListMultimap.of());
        testStageTaskSourceSuccess(ImmutableListMultimap.builder().putAll(FRAGMENT_1, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(1)}).putAll(FRAGMENT_2, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(3)}).putAll(FRAGMENT_3, new ExchangeSourceHandle[]{createSourceHandle(4)}).build(), ImmutableMap.builder().put(FRAGMENT_1, PLAN_NODE_1).put(FRAGMENT_2, PLAN_NODE_1).put(FRAGMENT_3, PLAN_NODE_2).buildOrThrow(), ImmutableListMultimap.of());
        testStageTaskSourceSuccess(ImmutableListMultimap.builder().putAll(FRAGMENT_1, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(1)}).build(), ImmutableMap.of(FRAGMENT_1, PLAN_NODE_1), ImmutableListMultimap.builder().putAll(PLAN_NODE_3, new ConnectorSplit[]{createSplit(0)}).putAll(PLAN_NODE_4, new ConnectorSplit[]{createSplit(0)}).build());
        testStageTaskSourceSuccess(ImmutableListMultimap.builder().putAll(FRAGMENT_1, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(1)}).putAll(FRAGMENT_2, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(3)}).build(), ImmutableMap.builder().put(FRAGMENT_1, PLAN_NODE_3).put(FRAGMENT_2, PLAN_NODE_4).buildOrThrow(), ImmutableListMultimap.builder().putAll(PLAN_NODE_1, new ConnectorSplit[]{createSplit(0), createSplit(3), createSplit(4)}).putAll(PLAN_NODE_2, new ConnectorSplit[]{createSplit(0), createSplit(1)}).build());
        testStageTaskSourceSuccess(ImmutableListMultimap.builder().putAll(FRAGMENT_1, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(1)}).putAll(FRAGMENT_2, new ExchangeSourceHandle[]{createSourceHandle(1), createSourceHandle(3)}).putAll(FRAGMENT_3, new ExchangeSourceHandle[]{createSourceHandle(4)}).build(), ImmutableMap.builder().put(FRAGMENT_1, PLAN_NODE_1).put(FRAGMENT_2, PLAN_NODE_1).put(FRAGMENT_3, PLAN_NODE_2).buildOrThrow(), ImmutableListMultimap.builder().putAll(PLAN_NODE_3, new ConnectorSplit[]{createSplit(0), createSplit(3), createSplit(4)}).putAll(PLAN_NODE_4, new ConnectorSplit[]{createSplit(0), createSplit(1)}).build());
    }

    @Test(invocationCount = INVOCATION_COUNT, timeOut = TIMEOUT)
    public void stressTest() throws Exception {
        ImmutableSet<PlanFragmentId> of = ImmutableSet.of(FRAGMENT_1, FRAGMENT_2, FRAGMENT_3);
        ImmutableMap of2 = ImmutableMap.of(FRAGMENT_1, PLAN_NODE_1, FRAGMENT_2, PLAN_NODE_1, FRAGMENT_3, PLAN_NODE_2);
        ImmutableSet<PlanNodeId> of3 = ImmutableSet.of(PLAN_NODE_3, PLAN_NODE_4);
        ArrayListMultimap create = ArrayListMultimap.create();
        for (PlanFragmentId planFragmentId : of) {
            int nextInt = ThreadLocalRandom.current().nextInt(100);
            for (int i = 0; i < nextInt; i++) {
                create.put(planFragmentId, createSourceHandle(ThreadLocalRandom.current().nextInt(10)));
            }
        }
        ArrayListMultimap create2 = ArrayListMultimap.create();
        for (PlanNodeId planNodeId : of3) {
            int nextInt2 = ThreadLocalRandom.current().nextInt(100);
            for (int i2 = 0; i2 < nextInt2; i2++) {
                create2.put(planNodeId, createSplit(ThreadLocalRandom.current().nextInt(10)));
            }
        }
        testStageTaskSourceSuccess(create, of2, create2);
    }

    private void testStageTaskSourceSuccess(ListMultimap<PlanFragmentId, ExchangeSourceHandle> listMultimap, Map<PlanFragmentId, PlanNodeId> map, ListMultimap<PlanNodeId, ConnectorSplit> listMultimap2) throws Exception {
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        Multimaps.asMap(listMultimap).forEach((planFragmentId, list) -> {
            TestingExchangeSourceHandleSource testingExchangeSourceHandleSource = new TestingExchangeSourceHandleSource(this.executor, list);
            arrayList.add(testingExchangeSourceHandleSource);
            hashMap.put(planFragmentId, new TestingExchange(testingExchangeSourceHandleSource));
        });
        map.keySet().forEach(planFragmentId2 -> {
            if (hashMap.containsKey(planFragmentId2)) {
                return;
            }
            TestingExchangeSourceHandleSource testingExchangeSourceHandleSource = new TestingExchangeSourceHandleSource(this.executor, ImmutableList.of());
            arrayList.add(testingExchangeSourceHandleSource);
            hashMap.put(planFragmentId2, new TestingExchange(testingExchangeSourceHandleSource));
        });
        HashMap hashMap2 = new HashMap();
        Multimaps.asMap(listMultimap2).forEach((planNodeId, list2) -> {
            hashMap2.put(planNodeId, new TestingSplitSource(this.executor, list2));
        });
        SplitAssignerTester splitAssignerTester = new SplitAssignerTester();
        FaultTolerantPartitioningScheme createPartitioningScheme = createPartitioningScheme(getPartitionCount(listMultimap.values(), listMultimap2.values()));
        AtomicLong atomicLong = new AtomicLong();
        EventDrivenTaskSource eventDrivenTaskSource = new EventDrivenTaskSource(new QueryId("query"), new TableExecuteContextManager(), hashMap, (SetMultimap) map.entrySet().stream().collect(ImmutableSetMultimap.toImmutableSetMultimap((v0) -> {
            return v0.getValue();
        }, (v0) -> {
            return v0.getKey();
        })), () -> {
            return hashMap2;
        }, new TestingSplitAssigner(ImmutableSet.builder().addAll(map.values()).addAll(listMultimap2.keySet()).build()), this.executor, 1, 1L, createPartitioningScheme, j -> {
            atomicLong.incrementAndGet();
        });
        while (splitAssignerTester.getTaskDescriptors().isEmpty()) {
            try {
                splitAssignerTester.update((SplitAssigner.AssignmentResult) eventDrivenTaskSource.process().get(10L, TimeUnit.SECONDS));
            } catch (Throwable th) {
                try {
                    eventDrivenTaskSource.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        List<TaskDescriptor> list3 = splitAssignerTester.getTaskDescriptors().get();
        eventDrivenTaskSource.close();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((TestingExchangeSourceHandleSource) it.next()).isClosed());
        }
        for (SplitSource splitSource : hashMap2.values()) {
            if (splitSource instanceof TestingSplitSource) {
                Assert.assertTrue(((TestingSplitSource) splitSource).isClosed());
            } else {
                Assert.fail("unexpected split source: " + splitSource.getClass());
            }
        }
        Assertions.assertThat(list3).isNotNull().isNotEmpty();
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        for (Map.Entry entry : listMultimap.entries()) {
            TestingExchangeSourceHandle testingExchangeSourceHandle = (TestingExchangeSourceHandle) entry.getValue();
            ((SetMultimap) hashMap3.computeIfAbsent(Integer.valueOf(testingExchangeSourceHandle.getPartitionId()), num -> {
                return HashMultimap.create();
            })).put(map.get(entry.getKey()), testingExchangeSourceHandle);
        }
        for (Map.Entry entry2 : listMultimap2.entries()) {
            TestingConnectorSplit testingConnectorSplit = (TestingConnectorSplit) entry2.getValue();
            ((SetMultimap) hashMap4.computeIfAbsent(Integer.valueOf(testingConnectorSplit.getBucket().orElseThrow()), num2 -> {
                return HashMultimap.create();
            })).put((PlanNodeId) entry2.getKey(), testingConnectorSplit);
        }
        HashMap hashMap5 = new HashMap();
        HashMap hashMap6 = new HashMap();
        for (TaskDescriptor taskDescriptor : list3) {
            int partitionId = taskDescriptor.getPartitionId();
            for (Map.Entry entry3 : taskDescriptor.getSplits().entries()) {
                if (((Split) entry3.getValue()).getCatalogHandle().equals(ExchangeOperator.REMOTE_CATALOG_HANDLE)) {
                    for (ExchangeSourceHandle exchangeSourceHandle : ((Split) entry3.getValue()).getConnectorSplit().getExchangeInput().getExchangeSourceHandles()) {
                        Assert.assertEquals(exchangeSourceHandle.getPartitionId(), partitionId);
                        ((SetMultimap) hashMap5.computeIfAbsent(Integer.valueOf(partitionId), num3 -> {
                            return HashMultimap.create();
                        })).put((PlanNodeId) entry3.getKey(), (TestingExchangeSourceHandle) exchangeSourceHandle);
                    }
                } else {
                    TestingConnectorSplit testingConnectorSplit2 = (TestingConnectorSplit) ((Split) entry3.getValue()).getConnectorSplit();
                    Assert.assertEquals(testingConnectorSplit2.getBucket().orElseThrow(), partitionId);
                    ((SetMultimap) hashMap6.computeIfAbsent(Integer.valueOf(partitionId), num4 -> {
                        return HashMultimap.create();
                    })).put((PlanNodeId) entry3.getKey(), testingConnectorSplit2);
                }
            }
        }
        Assert.assertEquals(hashMap5, hashMap3);
        Assert.assertEquals(hashMap6, hashMap4);
    }

    private static FaultTolerantPartitioningScheme createPartitioningScheme(int i) {
        return new FaultTolerantPartitioningScheme(i, Optional.of(IntStream.range(0, i).toArray()), Optional.of(split -> {
            return ((TestingConnectorSplit) split.getConnectorSplit()).getBucket().orElseThrow();
        }), Optional.empty());
    }

    private static int getPartitionCount(Collection<ExchangeSourceHandle> collection, Collection<ConnectorSplit> collection2) {
        int orElse = collection.stream().mapToInt((v0) -> {
            return v0.getPartitionId();
        }).max().orElse(-1);
        Stream<ConnectorSplit> stream = collection2.stream();
        Class<TestingConnectorSplit> cls = TestingConnectorSplit.class;
        Objects.requireNonNull(TestingConnectorSplit.class);
        return Math.max(Math.max(orElse, stream.map((v1) -> {
            return r2.cast(v1);
        }).map((v0) -> {
            return v0.getBucket();
        }).mapToInt((v0) -> {
            return v0.orElseThrow();
        }).max().orElse(-1)) + 1, 1);
    }

    private TestingExchangeSourceHandle createSourceHandle(int i) {
        return new TestingExchangeSourceHandle(this.nextId.getAndIncrement(), i, 0L);
    }

    private TestingConnectorSplit createSplit(int i) {
        return new TestingConnectorSplit(this.nextId.getAndIncrement(), OptionalInt.of(i), Optional.empty());
    }
}
