package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerClient;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.throughput.BufferDebloater;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.class */
public class InputGateFairnessTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/InputGateFairnessTest$FairnessVerifyingInputGate.class */
    public static class FairnessVerifyingInputGate extends SingleInputGate {
        private static final int BUFFER_SIZE = 32768;
        private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY = NoOpBufferPool::new;
        private final PrioritizedDeque<InputChannel> channelsWithData;
        private final HashSet<InputChannel> uniquenessChecker;

        public FairnessVerifyingInputGate(String str, IntermediateDataSetID intermediateDataSetID, IndexRange indexRange, int i) {
            super(str, 0, intermediateDataSetID, ResultPartitionType.PIPELINED, indexRange, i, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, STUB_BUFFER_POOL_FACTORY, (BufferDecompressor) null, new InputChannelTestUtils.UnpooledMemorySegmentProvider(32768), 32768, new ThroughputCalculator(SystemClock.getInstance()), (BufferDebloater) null, (TieredStorageConsumerClient) null, (TieredStorageNettyServiceImpl) null, (List) null);
            this.channelsWithData = getInputChannelsWithData();
            this.uniquenessChecker = new HashSet<>();
        }

        public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
            synchronized (this.channelsWithData) {
                Assert.assertTrue("too many input channels", this.channelsWithData.size() <= getNumberOfInputChannels());
                ensureUnique(this.channelsWithData.asUnmodifiableCollection());
            }
            return super.getNext();
        }

        private void ensureUnique(Collection<InputChannel> collection) {
            HashSet<InputChannel> hashSet = this.uniquenessChecker;
            for (InputChannel inputChannel : collection) {
                if (!hashSet.add(inputChannel)) {
                    Assert.fail("Duplicate channel in input gate: " + inputChannel);
                }
            }
            Assert.assertTrue("found duplicate input channels", hashSet.size() == collection.size());
            hashSet.clear();
        }
    }

    /**
     * Pre-fills 37 local channels with 27 buffers each, finishes every subpartition, and then
     * drains the gate. After each read the number of queued buffers of any two subpartitions may
     * differ by at most one.
     */
    @Test
    public void testFairConsumptionLocalChannelsPreFilled() throws Exception {
        final int numberOfChannels = 37;
        final int buffersPerChannel = 27;

        // Declared with the concrete type (the original code already cast to it) so that the
        // subclass API used below is reachable without further casts.
        final PipelinedResultPartition[] resultPartitions =
                IntStream.range(0, numberOfChannels)
                        .mapToObj(index -> (PipelinedResultPartition) new ResultPartitionBuilder().build())
                        .toArray(PipelinedResultPartition[]::new);

        // The template consumer is closed when the test ends, as in testFairConsumptionLocalChannels.
        try (BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(42)) {
            final PipelinedSubpartition[] sources =
                    Arrays.stream(resultPartitions)
                            .map(partition -> partition.getAllPartitions()[0])
                            .toArray(PipelinedSubpartition[]::new);

            for (PipelinedSubpartition source : sources) {
                for (int added = 0; added < buffersPerChannel; added++) {
                    source.add(bufferConsumer.copy());
                }
                source.finish();
            }

            for (ResultPartition resultPartition : resultPartitions) {
                resultPartition.setup();
            }

            final SingleInputGate gate = createFairnessVerifyingInputGate(numberOfChannels);
            final InputChannel[] inputChannels =
                    IntStream.range(0, numberOfChannels)
                            .mapToObj(
                                    index ->
                                            InputChannelBuilder.newBuilder()
                                                    .setChannelIndex(index)
                                                    .setPartitionManager(resultPartitions[index].partitionManager)
                                                    .setPartitionId(resultPartitions[index].getPartitionId())
                                                    .buildLocalChannel(gate))
                            .toArray(InputChannel[]::new);
            setupInputGate(gate, inputChannels);

            // 37 * (27 + 1) = 1036 reads: the 27 buffers of every channel plus one further element
            // per channel (presumably the end-of-partition event enqueued by finish() - confirm).
            for (int remaining = numberOfChannels * (buffersPerChannel + 1); remaining > 0; remaining--) {
                Assert.assertNotNull(gate.getNext());

                int min = Integer.MAX_VALUE;
                int max = 0;
                for (PipelinedSubpartition source : sources) {
                    final int queued = source.getNumberOfQueuedBuffers();
                    min = Math.min(min, queued);
                    max = Math.max(max, queued);
                }
                Assert.assertTrue(max == min || max == min + 1);
            }

            // Everything has been consumed, so the gate must report no further element.
            Assert.assertFalse(gate.getNext().isPresent());
        }
    }

    /**
     * Reads 999 times from a gate over 37 local channels while the channels are refilled in
     * random order along the way. After each read the number of queued buffers of any two
     * subpartitions may differ by at most one.
     */
    @Test
    public void testFairConsumptionLocalChannels() throws Exception {
        final int numberOfChannels = 37;
        final int totalReads = 999;
        final int refillInterval = 74;
        final int buffersPerRefill = 3;

        // Declared with the concrete type (the original code already cast to it) so that the
        // subclass API used below is reachable without further casts.
        final PipelinedResultPartition[] resultPartitions =
                IntStream.range(0, numberOfChannels)
                        .mapToObj(index -> (PipelinedResultPartition) new ResultPartitionBuilder().build())
                        .toArray(PipelinedResultPartition[]::new);

        // try-with-resources closes the template consumer on both the normal and the failing path.
        try (BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(42)) {
            final PipelinedSubpartition[] sources =
                    Arrays.stream(resultPartitions)
                            .map(partition -> partition.getAllPartitions()[0])
                            .toArray(PipelinedSubpartition[]::new);

            final SingleInputGate gate = createFairnessVerifyingInputGate(numberOfChannels);
            final InputChannel[] inputChannels =
                    IntStream.range(0, numberOfChannels)
                            .mapToObj(
                                    index ->
                                            InputChannelBuilder.newBuilder()
                                                    .setChannelIndex(index)
                                                    .setPartitionManager(resultPartitions[index].partitionManager)
                                                    .setPartitionId(resultPartitions[index].getPartitionId())
                                                    .buildLocalChannel(gate))
                            .toArray(InputChannel[]::new);

            for (ResultPartition resultPartition : resultPartitions) {
                resultPartition.setup();
            }

            // Seed a single channel with one buffer before the gate is set up.
            sources[12].add(bufferConsumer.copy());

            setupInputGate(gate, inputChannels);

            for (int read = 0; read < totalReads; read++) {
                Assert.assertNotNull(gate.getNext());

                int min = Integer.MAX_VALUE;
                int max = 0;
                for (PipelinedSubpartition source : sources) {
                    final int queued = source.getNumberOfQueuedBuffers();
                    min = Math.min(min, queued);
                    max = Math.max(max, queued);
                }
                Assert.assertTrue(max == min || max == min + 1);

                // Every 74th read (including the very first one) top up all channels.
                if (read % refillInterval == 0) {
                    fillRandom(sources, buffersPerRefill, bufferConsumer);
                }
            }
        }
    }

    /**
     * Hands 27 buffers followed by a serialized {@link EndOfPartitionEvent} to each of 37 remote
     * channels, then drains the gate. After each read the number of queued buffers of any two
     * channels may differ by at most one.
     */
    @Test
    public void testFairConsumptionRemoteChannelsPreFilled() throws Exception {
        final int channelCount = 37;
        final int buffersPerChannel = 27;
        // 27 buffers plus the end-of-partition event for each of the 37 channels = 1036 reads.
        final int expectedReads = channelCount * (buffersPerChannel + 1);

        final Buffer payload = TestBufferFactory.createBuffer(42);
        final SingleInputGate gate = createFairnessVerifyingInputGate(channelCount);
        final ConnectionManager connections = InputChannelTestUtils.createDummyConnectionManager();

        final RemoteInputChannel[] channels = new RemoteInputChannel[channelCount];
        for (int index = 0; index < channels.length; index++) {
            final RemoteInputChannel channel = createRemoteInputChannel(gate, index, connections);
            channels[index] = channel;

            // The second onBuffer argument counts up from 0; the event gets the next value, 27.
            int sequence = 0;
            while (sequence < buffersPerChannel) {
                channel.onBuffer(payload, sequence, -1);
                sequence++;
            }
            channel.onBuffer(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE, false), sequence, -1);
        }

        // Same three steps as before: set channels, set up the gate, request partitions.
        setupInputGate(gate, channels);

        for (int read = 0; read < expectedReads; read++) {
            Assert.assertNotNull(gate.getNext());

            int fewest = Integer.MAX_VALUE;
            int most = 0;
            for (RemoteInputChannel channel : channels) {
                final int queued = channel.getNumberOfQueuedBuffers();
                if (queued < fewest) {
                    fewest = queued;
                }
                if (queued > most) {
                    most = queued;
                }
            }
            final int spread = most - fewest;
            Assert.assertTrue(spread == 0 || spread == 1);
        }

        // Nothing may be left once every buffer and event has been read.
        Assert.assertFalse(gate.getNext().isPresent());
    }

    /**
     * Reads 999 times from a gate over 37 remote channels while the channels are refilled in
     * random order along the way. After each read the number of queued buffers of any two
     * channels may differ by at most one.
     */
    @Test
    public void testFairConsumptionRemoteChannels() throws Exception {
        final int channelCount = 37;
        final int totalReads = 999;
        final int refillInterval = 74;
        final int buffersPerRefill = 3;
        final int seededChannel = 11;

        final Buffer payload = TestBufferFactory.createBuffer(42);
        final SingleInputGate gate = createFairnessVerifyingInputGate(channelCount);
        final ConnectionManager connections = InputChannelTestUtils.createDummyConnectionManager();

        final RemoteInputChannel[] channels = new RemoteInputChannel[channelCount];
        // Per channel: the value to pass as the second onBuffer argument on the next delivery.
        final int[] nextSequenceNumbers = new int[channelCount];
        for (int index = 0; index < channels.length; index++) {
            channels[index] = createRemoteInputChannel(gate, index, connections);
        }

        // Seed a single channel with one buffer before the gate is set up.
        channels[seededChannel].onBuffer(payload, 0, -1);
        nextSequenceNumbers[seededChannel]++;

        setupInputGate(gate, channels);

        for (int read = 0; read < totalReads; read++) {
            Assert.assertNotNull(gate.getNext());

            int fewest = Integer.MAX_VALUE;
            int most = 0;
            for (RemoteInputChannel channel : channels) {
                final int queued = channel.getNumberOfQueuedBuffers();
                if (queued < fewest) {
                    fewest = queued;
                }
                if (queued > most) {
                    most = queued;
                }
            }
            final int spread = most - fewest;
            Assert.assertTrue(spread == 0 || spread == 1);

            // Every 74th read (including the very first one) top up all channels.
            if (read % refillInterval == 0) {
                fillRandom(channels, nextSequenceNumbers, buffersPerRefill, payload);
            }
        }
    }

    /**
     * Builds a {@link FairnessVerifyingInputGate} named "Test Task Name" with a fresh
     * {@link IntermediateDataSetID} and the index range {@code [0, 0]}.
     *
     * @param i forwarded as the last constructor argument (the tests pass the number of channels)
     * @return the new gate
     */
    private SingleInputGate createFairnessVerifyingInputGate(int i) {
        final IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        final IndexRange range = new IndexRange(0, 0);
        return new FairnessVerifyingInputGate("Test Task Name", dataSetId, range, i);
    }

    /**
     * Adds {@code i} copies of {@code bufferConsumer} to every subpartition, interleaving the
     * additions across subpartitions in a random order.
     *
     * @param pipelinedSubpartitionArr the subpartitions to fill
     * @param i how many copies each subpartition receives
     * @param bufferConsumer template consumer; only copies of it are added
     * @throws Exception if adding to a subpartition fails
     */
    private void fillRandom(PipelinedSubpartition[] pipelinedSubpartitionArr, int i, BufferConsumer bufferConsumer) throws Exception {
        // Each subpartition index appears i times; shuffling yields the random fill order.
        final List<Integer> targets = new ArrayList<>(pipelinedSubpartitionArr.length * i);
        for (int index = 0; index < pipelinedSubpartitionArr.length; index++) {
            for (int repeat = 0; repeat < i; repeat++) {
                targets.add(index);
            }
        }
        Collections.shuffle(targets);

        for (int target : targets) {
            pipelinedSubpartitionArr[target].add(bufferConsumer.copy());
        }
    }

    /**
     * Delivers {@code buffer} to every remote channel {@code i} times, interleaving the
     * deliveries across channels in a random order.
     *
     * @param remoteInputChannelArr the channels to fill
     * @param iArr per channel, the value to pass as the second {@code onBuffer} argument next;
     *     the entry is incremented after every delivery to that channel
     * @param i how many times each channel receives the buffer
     * @param buffer the buffer handed to every {@code onBuffer} call
     * @throws Exception if a channel rejects the buffer
     */
    private void fillRandom(RemoteInputChannel[] remoteInputChannelArr, int[] iArr, int i, Buffer buffer) throws Exception {
        // Each channel index appears i times; shuffling yields the random delivery order.
        final List<Integer> targets = new ArrayList<>(remoteInputChannelArr.length * i);
        for (int index = 0; index < remoteInputChannelArr.length; index++) {
            for (int repeat = 0; repeat < i; repeat++) {
                targets.add(index);
            }
        }
        Collections.shuffle(targets);

        for (int target : targets) {
            final int sequenceNumber = iArr[target];
            iArr[target] = sequenceNumber + 1;
            remoteInputChannelArr[target].onBuffer(buffer, sequenceNumber, -1);
        }
    }

    /**
     * Builds a remote input channel for the given gate via {@link InputChannelBuilder}.
     *
     * @param singleInputGate the gate the channel is built for
     * @param i the channel index set on the builder
     * @param connectionManager the connection manager set on the builder
     * @return the newly built channel
     */
    public static RemoteInputChannel createRemoteInputChannel(SingleInputGate singleInputGate, int i, ConnectionManager connectionManager) {
        final InputChannelBuilder builder =
                InputChannelBuilder.newBuilder()
                        .setChannelIndex(i)
                        .setConnectionManager(connectionManager);
        return builder.buildRemoteChannel(singleInputGate);
    }

    /**
     * Prepares a gate for reading: installs the given channels, then calls {@code setup()} and
     * {@code requestPartitions()} on the gate, in that order.
     *
     * @param singleInputGate the gate to prepare
     * @param inputChannelArr the channels handed to {@code setInputChannels}
     * @throws IOException declared for failures of the gate calls below
     */
    public static void setupInputGate(SingleInputGate singleInputGate, InputChannel... inputChannelArr) throws IOException {
        // Channels are installed first; setup() and requestPartitions() run afterwards.
        singleInputGate.setInputChannels(inputChannelArr);
        singleInputGate.setup();
        singleInputGate.requestPartitions();
    }
}
