package io.trino.operator;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.SessionTestUtils;
import io.trino.execution.Lifespan;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.execution.buffer.TestingPagesSerdeFactory;
import io.trino.metadata.Split;
import io.trino.operator.ExchangeOperator;
import io.trino.spi.Page;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import io.trino.split.RemoteSplit;
import io.trino.sql.analyzer.FeaturesConfig;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingTaskContext;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for {@link ExchangeOperator}.
 *
 * <p>The operator under test is wired to a {@link TestingHttpClient} whose handler is backed by the
 * per-task {@link TestingTaskBuffer} instances held in {@link #taskBuffers}. Tests add remote splits
 * to the operator, push pages into the corresponding buffers, and then poll the operator for output.
 *
 * <p>The class is single threaded because all tests share the same executors, HTTP client and
 * buffer cache; the cache is cleared before every test method.
 */
@Test(singleThreaded = true)
public class TestExchangeOperator {
    private static final List<Type> TYPES = ImmutableList.of(VarcharType.VARCHAR);
    private static final PagesSerdeFactory SERDE_FACTORY = new TestingPagesSerdeFactory();
    private static final String TASK_1_ID = "task1";
    private static final String TASK_2_ID = "task2";
    private static final String TASK_3_ID = "task3";

    /** Lazily creates one {@link TestingTaskBuffer} per task id; shared with the HTTP handler. */
    private final LoadingCache<String, TestingTaskBuffer> taskBuffers = CacheBuilder.newBuilder().build(CacheLoader.from(TestingTaskBuffer::new));
    private ScheduledExecutorService scheduler;
    private ScheduledExecutorService scheduledExecutor;
    private HttpClient httpClient;
    private ExchangeClientSupplier exchangeClientSupplier;
    private ExecutorService pageBufferClientCallbackExecutor;

    /**
     * Creates the shared executors, the testing HTTP client and the supplier used to build an
     * {@link ExchangeClient} for every operator created by the tests.
     */
    @BeforeClass
    public void setUp() {
        this.scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed(getClass().getSimpleName() + "-%s"));
        this.scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed(getClass().getSimpleName() + "-scheduledExecutor-%s"));
        this.pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
        this.httpClient = new TestingHttpClient(new TestingExchangeHttpClientHandler(this.taskBuffers), this.scheduler);
        this.exchangeClientSupplier = localMemoryContext -> new ExchangeClient(
                "localhost",
                FeaturesConfig.DataIntegrityVerification.ABORT,
                DataSize.of(32L, DataSize.Unit.MEGABYTE),
                DataSize.of(10L, DataSize.Unit.MEGABYTE),
                3,
                new Duration(1.0d, TimeUnit.MINUTES),
                true,
                this.httpClient,
                this.scheduler,
                localMemoryContext,
                this.pageBufferClientCallbackExecutor);
    }

    /**
     * Releases the shared resources.
     *
     * <p>Because this runs with {@code alwaysRun = true} it may execute after a {@link #setUp()}
     * that failed partway through, so every field is null-checked before use. Otherwise a
     * {@link NullPointerException} here would hide the original failure and skip the remaining
     * shutdown calls.
     */
    @AfterClass(alwaysRun = true)
    public void tearDown() {
        if (this.httpClient != null) {
            this.httpClient.close();
            this.httpClient = null;
        }
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
            this.scheduler = null;
        }
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdownNow();
            this.scheduledExecutor = null;
        }
        if (this.pageBufferClientCallbackExecutor != null) {
            this.pageBufferClientCallbackExecutor.shutdownNow();
            this.pageBufferClientCallbackExecutor = null;
        }
    }

    /** Drops all cached task buffers so every test starts with fresh ones. */
    @BeforeMethod
    public void setUpMethod() {
        this.taskBuffers.invalidateAll();
    }

    /** All splits are known up front and every buffer delivers ten pages in a single batch. */
    @Test
    public void testSimple() throws Exception {
        SourceOperator operator = createExchangeOperator();
        addSplits(operator, TASK_1_ID, TASK_2_ID, TASK_3_ID);
        operator.noMoreSplits();

        addPagesToAll(10, true, TASK_1_ID, TASK_2_ID, TASK_3_ID);

        waitForPages(operator, 30);
        waitForFinished(operator);
    }

    /** Builds a task-wide remote split pointing at {@code http://localhost/<taskId>}. */
    private static Split newRemoteSplit(String str) {
        return new Split(ExchangeOperator.REMOTE_CONNECTOR_ID, new RemoteSplit(URI.create("http://localhost/" + str)), Lifespan.taskWide());
    }

    /**
     * The operator must not report finished after the first batch of pages (added with the flag
     * {@code false}); it finishes only after the second batch (added with the flag {@code true}).
     */
    @Test
    public void testWaitForClose() throws Exception {
        SourceOperator operator = createExchangeOperator();
        addSplits(operator, TASK_1_ID, TASK_2_ID, TASK_3_ID);
        operator.noMoreSplits();

        addPagesToAll(1, false, TASK_1_ID, TASK_2_ID, TASK_3_ID);
        waitForPages(operator, 3);
        assertBlockedWithoutOutput(operator);

        addPagesToAll(2, true, TASK_1_ID, TASK_2_ID, TASK_3_ID);
        waitForPages(operator, 6);
        waitForFinished(operator);
    }

    /**
     * The operator must not report finished while more splits may still arrive, even when the only
     * known split has delivered its pages; it finishes after {@code noMoreSplits()} and the pages of
     * the late split have been read.
     */
    @Test
    public void testWaitForNoMoreSplits() throws Exception {
        SourceOperator operator = createExchangeOperator();
        addSplits(operator, TASK_1_ID);

        addPagesToAll(1, true, TASK_1_ID);
        waitForPages(operator, 1);
        assertBlockedWithoutOutput(operator);

        addSplits(operator, TASK_2_ID);
        operator.noMoreSplits();

        addPagesToAll(2, true, TASK_2_ID);
        waitForPages(operator, 2);
        waitForFinished(operator);
    }

    /** An explicit {@code finish()} call must finish the operator even though buffers are still open. */
    @Test
    public void testFinish() throws Exception {
        SourceOperator operator = createExchangeOperator();
        addSplits(operator, TASK_1_ID, TASK_2_ID, TASK_3_ID);
        operator.noMoreSplits();

        addPagesToAll(1, false, TASK_1_ID, TASK_2_ID, TASK_3_ID);
        waitForPages(operator, 3);
        assertBlockedWithoutOutput(operator);

        operator.finish();
        waitForFinished(operator);
    }

    /**
     * Creates a new exchange operator inside a fresh task/pipeline/driver context and verifies that
     * it starts with no system memory reserved.
     */
    private SourceOperator createExchangeOperator() {
        ExchangeOperator.ExchangeOperatorFactory operatorFactory = new ExchangeOperator.ExchangeOperatorFactory(0, new PlanNodeId("test"), this.exchangeClientSupplier, SERDE_FACTORY);
        SourceOperator operator = operatorFactory.createOperator(
                TestingTaskContext.createTaskContext(this.scheduler, this.scheduledExecutor, SessionTestUtils.TEST_SESSION)
                        .addPipelineContext(0, true, true, false)
                        .addDriverContext());
        Assert.assertEquals(operator.getOperatorContext().getOperatorStats().getSystemMemoryReservation().toBytes(), 0L);
        return operator;
    }

    /** Adds one remote split per task id to the operator, in the given order. */
    private static void addSplits(SourceOperator operator, String... taskIds) {
        for (String taskId : taskIds) {
            operator.addSplit(newRemoteSplit(taskId));
        }
    }

    /**
     * Calls {@code addPages(pageCount, flag)} on the buffer of every given task, in the given order.
     *
     * @param flag forwarded unchanged as the second argument of {@code TestingTaskBuffer.addPages};
     *         NOTE(review): presumably marks the buffer as complete - confirm against TestingTaskBuffer
     */
    private void addPagesToAll(int pageCount, boolean flag, String... taskIds) {
        for (String taskId : taskIds) {
            this.taskBuffers.getUnchecked(taskId).addPages(pageCount, flag);
        }
    }

    /** Asserts the operator is not finished, does not accept input and has no output available. */
    private static void assertBlockedWithoutOutput(Operator operator) {
        Assert.assertFalse(operator.isFinished());
        Assert.assertFalse(operator.needsInput());
        Assert.assertNull(operator.getOutput());
    }

    /**
     * Polls the operator until it has produced {@code i} pages or ten seconds have elapsed, then
     * verifies that exactly {@code i} pages equal to {@code TestingTaskBuffer.PAGE} were read.
     *
     * <p>Before reading, the method waits until the pipeline reports a positive system memory
     * reservation and fails if that never happens before the deadline or before the operator
     * finishes. After reading, the operator's own system memory reservation must be zero.
     *
     * @param operator the operator to read from
     * @param i the exact number of pages expected
     * @return the pages that were read
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private static List<Page> waitForPages(Operator operator, int i) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
        List<Page> outputPages = new ArrayList<>();

        // Wait for the pipeline to report reserved system memory.
        boolean memoryReserved = false;
        while (System.nanoTime() - deadline < 0 && !operator.isFinished()) {
            if (operator.getOperatorContext().getDriverContext().getPipelineContext().getPipelineStats().getSystemMemoryReservation().toBytes() > 0) {
                memoryReserved = true;
                break;
            }
            Thread.sleep(10L);
        }
        Assert.assertTrue(memoryReserved);

        // nanoTime values must be compared by subtraction; a direct '<' is not overflow-safe.
        while (outputPages.size() < i && System.nanoTime() - deadline < 0) {
            Assert.assertFalse(operator.needsInput());
            if (operator.isFinished()) {
                break;
            }
            Page output = operator.getOutput();
            if (output != null) {
                outputPages.add(output);
            } else {
                Thread.sleep(10L);
            }
        }

        // Give any unexpected extra page a moment to arrive before checking that there is none.
        Thread.sleep(10L);
        Assert.assertFalse(operator.needsInput());
        Assert.assertNull(operator.getOutput());

        Assert.assertEquals(outputPages.size(), i);
        for (Page page : outputPages) {
            PageAssertions.assertPageEquals(TYPES, page, TestingTaskBuffer.PAGE);
        }
        Assert.assertEquals(operator.getOperatorContext().getOperatorStats().getSystemMemoryReservation().toBytes(), 0L);
        return outputPages;
    }

    /**
     * Polls the operator for up to ten seconds until it reports finished, asserting on every poll
     * that it neither accepts input nor produces output, and finally that it is finished with no
     * system memory reserved.
     *
     * @param operator the operator expected to finish
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private static void waitForFinished(Operator operator) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
        while (System.nanoTime() - deadline < 0) {
            Assert.assertFalse(operator.needsInput());
            Assert.assertNull(operator.getOutput());
            if (operator.isFinished()) {
                break;
            }
            Thread.sleep(10L);
        }
        Assert.assertTrue(operator.isFinished());
        Assert.assertFalse(operator.needsInput());
        Assert.assertNull(operator.getOutput());
        Assert.assertEquals(operator.getOperatorContext().getOperatorStats().getSystemMemoryReservation().toBytes(), 0L);
    }
}
