package io.trino.operator;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.node.NodeInfo;
import io.trino.FeaturesConfig;
import io.trino.RowPagesBuilder;
import io.trino.SessionTestUtils;
import io.trino.cache.SafeCaches;
import io.trino.exchange.DirectExchangeInput;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.execution.buffer.TestingPagesSerdeFactory;
import io.trino.metadata.Split;
import io.trino.operator.MergeOperator;
import io.trino.spi.Page;
import io.trino.spi.connector.SortOrder;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.split.RemoteSplit;
import io.trino.sql.gen.OrderingCompiler;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingTaskContext;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testng.Assert;

/**
 * Tests for {@link MergeOperator}.
 *
 * <p>Each test creates a merge operator, registers one remote split per upstream task, and then
 * feeds pages into the per-task {@link TestingTaskBuffer}s that back the {@link TestingHttpClient}
 * wired up in {@link #setUp()}. The merged output is drained with
 * {@link #pullAvailablePages(Operator)} and compared against a single expected page.
 */
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
public class TestMergeOperator {
    private static final TaskId TASK_1_ID = new TaskId(new StageId("query", 0), 0, 0);
    private static final TaskId TASK_2_ID = new TaskId(new StageId("query", 0), 1, 0);
    private static final TaskId TASK_3_ID = new TaskId(new StageId("query", 0), 2, 0);

    /** Upper bound on how long {@link #pullAvailablePages(Operator)} polls for the operator to finish. */
    private static final long PULL_TIMEOUT_SECONDS = 10L;

    /** Pause between polls when the operator has no output available yet. */
    private static final long PULL_POLL_INTERVAL_MILLIS = 10L;

    /** Source of operator ids; also used to derive a distinct plan node id per created operator. */
    private final AtomicInteger operatorId = new AtomicInteger();

    private ScheduledExecutorService executor;
    private PagesSerdeFactory serdeFactory;
    private HttpClient httpClient;
    private DirectExchangeClientFactory exchangeClientFactory;
    private OrderingCompiler orderingCompiler;

    /** One lazily created buffer per upstream task; the testing HTTP handler is constructed over this cache. */
    private LoadingCache<TaskId, TestingTaskBuffer> taskBuffers;

    /** Builds a fresh fixture (executor, task buffers, testing HTTP client, exchange client factory) per test. */
    @BeforeEach
    public void setUp() {
        executor = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed("test-merge-operator-%s"));
        serdeFactory = new TestingPagesSerdeFactory();
        taskBuffers = SafeCaches.buildNonEvictableCache(CacheBuilder.newBuilder(), CacheLoader.from(TestingTaskBuffer::new));
        httpClient = new TestingHttpClient(new TestingExchangeHttpClientHandler(taskBuffers, serdeFactory), executor);
        exchangeClientFactory = new DirectExchangeClientFactory(
                new NodeInfo("test"),
                new FeaturesConfig(),
                new DirectExchangeClientConfig(),
                httpClient,
                executor,
                new ExchangeManagerRegistry());
        orderingCompiler = new OrderingCompiler(new TypeOperators());
    }

    /**
     * Releases the fixture. Resources are released in the same order as before (HTTP client,
     * executor, exchange client factory), but each step now runs even if an earlier one throws,
     * and null fields are skipped so a partially initialized fixture does not raise an NPE here.
     */
    @AfterEach
    public void tearDown() {
        serdeFactory = null;
        orderingCompiler = null;
        taskBuffers = null;
        try {
            if (httpClient != null) {
                httpClient.close();
            }
        } finally {
            httpClient = null;
            try {
                if (executor != null) {
                    executor.shutdownNow();
                }
            } finally {
                executor = null;
                if (exchangeClientFactory != null) {
                    exchangeClientFactory.stop();
                    exchangeClientFactory = null;
                }
            }
        }
    }

    /**
     * Single upstream task, two input columns, only channel 1 projected to the output.
     * The operator must stay blocked until the stream is complete and then emit all rows in one page.
     */
    @Test
    public void testSingleStream() throws Exception {
        List<Type> types = ImmutableList.of(BigintType.BIGINT, BigintType.BIGINT);
        // try-with-resources so the operator is closed even when an assertion below fails
        try (MergeOperator operator = createMergeOperator(
                types,
                ImmutableList.of(1),
                ImmutableList.of(0, 1),
                ImmutableList.of(SortOrder.ASC_NULLS_FIRST, SortOrder.ASC_NULLS_FIRST))) {
            Assert.assertFalse(operator.isFinished());
            Assert.assertFalse(operator.isBlocked().isDone());

            operator.addSplit(createRemoteSplit(TASK_1_ID));
            Assert.assertFalse(operator.isFinished());
            Assert.assertFalse(operator.isBlocked().isDone());

            operator.noMoreSplits();

            List<Page> input = RowPagesBuilder.rowPagesBuilder(types)
                    .row(1, 1)
                    .row(2, 2)
                    .pageBreak()
                    .row(3, 3)
                    .row(4, 4)
                    .build();

            // nothing has been buffered yet
            Assert.assertNull(operator.getOutput());
            Assert.assertFalse(operator.isFinished());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            // first page arrives, stream still open: operator unblocks but yields no output yet
            taskBuffer(TASK_1_ID).addPage(input.get(0), false);
            OperatorAssertion.assertOperatorIsUnblocked(operator);
            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            // last page arrives and the buffer is marked complete
            taskBuffer(TASK_1_ID).addPage(input.get(1), true);
            OperatorAssertion.assertOperatorIsUnblocked(operator);

            Page actual = Iterables.getOnlyElement(pullAvailablePages(operator));
            Page expected = RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT)
                    .row(1)
                    .row(2)
                    .row(3)
                    .row(4)
                    .build()
                    .get(0);
            PageAssertions.assertPageEquals(ImmutableList.of(BigintType.BIGINT), actual, expected);
        }
    }

    /**
     * Two upstream tasks with (BIGINT, INTEGER) rows; output columns are swapped to (INTEGER, BIGINT)
     * and rows are ordered by channel 1 descending, then channel 0 ascending, nulls first in both.
     */
    @Test
    public void testMergeDifferentTypes() throws Exception {
        List<Type> types = ImmutableList.of(BigintType.BIGINT, IntegerType.INTEGER);
        // try-with-resources so the operator is closed even when an assertion below fails
        try (MergeOperator operator = createMergeOperator(
                types,
                ImmutableList.of(1, 0),
                ImmutableList.of(1, 0),
                ImmutableList.of(SortOrder.DESC_NULLS_FIRST, SortOrder.ASC_NULLS_FIRST))) {
            operator.addSplit(createRemoteSplit(TASK_1_ID));
            operator.addSplit(createRemoteSplit(TASK_2_ID));
            operator.noMoreSplits();

            List<Page> task1Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(0, null)
                    .row(1, 4)
                    .row(2, 3)
                    .build();
            List<Page> task2Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(null, 5)
                    .row(2, 5)
                    .row(4, 3)
                    .build();

            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            // only one of the two streams has data: no output can be produced yet
            taskBuffer(TASK_1_ID).addPages(task1Pages, true);
            OperatorAssertion.assertOperatorIsUnblocked(operator);
            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            taskBuffer(TASK_2_ID).addPages(task2Pages, true);
            OperatorAssertion.assertOperatorIsUnblocked(operator);

            List<Type> expectedTypes = ImmutableList.of(IntegerType.INTEGER, BigintType.BIGINT);
            Page actual = Iterables.getOnlyElement(pullAvailablePages(operator));
            Page expected = RowPagesBuilder.rowPagesBuilder(expectedTypes)
                    .row(null, 0)
                    .row(5, null)
                    .row(5, 2)
                    .row(4, 1)
                    .row(3, 2)
                    .row(3, 4)
                    .build()
                    .get(0);
            PageAssertions.assertPageEquals(expectedTypes, actual, expected);
        }
    }

    /**
     * Three upstream tasks delivering several pages each, interleaved; all three columns are
     * projected and rows are merged on channel 0 ascending.
     */
    @Test
    public void testMultipleStreamsSameOutputColumns() throws Exception {
        List<Type> types = ImmutableList.of(BigintType.BIGINT, BigintType.BIGINT, BigintType.BIGINT);
        // try-with-resources so the operator is closed even when an assertion below fails
        try (MergeOperator operator = createMergeOperator(
                types,
                ImmutableList.of(0, 1, 2),
                ImmutableList.of(0),
                ImmutableList.of(SortOrder.ASC_NULLS_FIRST))) {
            operator.addSplit(createRemoteSplit(TASK_1_ID));
            operator.addSplit(createRemoteSplit(TASK_2_ID));
            operator.addSplit(createRemoteSplit(TASK_3_ID));
            operator.noMoreSplits();

            List<Page> task1Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(1, 1, 2)
                    .row(8, 1, 1)
                    .row(19, 1, 3)
                    .row(27, 1, 4)
                    .row(41, 2, 5)
                    .pageBreak()
                    .row(55, 1, 2)
                    .row(89, 1, 3)
                    .row(101, 1, 4)
                    .row(202, 1, 3)
                    .row(399, 2, 2)
                    .pageBreak()
                    .row(400, 1, 1)
                    .row(401, 1, 7)
                    .row(402, 1, 6)
                    .build();
            List<Page> task2Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(2, 1, 2)
                    .row(8, 1, 1)
                    .row(19, 1, 3)
                    .row(25, 1, 4)
                    .row(26, 2, 5)
                    .pageBreak()
                    .row(56, 1, 2)
                    .row(66, 1, 3)
                    .row(77, 1, 4)
                    .row(88, 1, 3)
                    .row(99, 2, 2)
                    .pageBreak()
                    .row(99, 1, 1)
                    .row(100, 1, 7)
                    .row(100, 1, 6)
                    .build();
            List<Page> task3Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(88, 1, 3)
                    .row(89, 1, 3)
                    .row(90, 1, 3)
                    .row(91, 1, 4)
                    .row(92, 2, 5)
                    .pageBreak()
                    .row(93, 1, 2)
                    .row(94, 1, 3)
                    .row(95, 1, 4)
                    .row(97, 1, 3)
                    .row(98, 2, 2)
                    .build();

            Assert.assertNull(operator.getOutput());
            Assert.assertFalse(operator.isFinished());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            // deliver the first page of each stream one at a time; no output until all three have data
            taskBuffer(TASK_1_ID).addPage(task1Pages.get(0), false);
            OperatorAssertion.assertOperatorIsUnblocked(operator);
            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            taskBuffer(TASK_2_ID).addPage(task2Pages.get(0), false);
            OperatorAssertion.assertOperatorIsUnblocked(operator);
            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            taskBuffer(TASK_3_ID).addPage(task3Pages.get(0), false);
            OperatorAssertion.assertOperatorIsUnblocked(operator);

            // deliver the remaining pages and complete the streams in the order 3, 2, 1
            taskBuffer(TASK_1_ID).addPage(task1Pages.get(1), false);
            taskBuffer(TASK_2_ID).addPage(task2Pages.get(1), false);
            taskBuffer(TASK_3_ID).addPage(task3Pages.get(1), true);
            taskBuffer(TASK_2_ID).addPage(task2Pages.get(2), true);
            taskBuffer(TASK_1_ID).addPage(task1Pages.get(2), true);

            Page actual = Iterables.getOnlyElement(pullAvailablePages(operator));
            Page expected = RowPagesBuilder.rowPagesBuilder(types)
                    .row(1, 1, 2)
                    .row(2, 1, 2)
                    .row(8, 1, 1)
                    .row(8, 1, 1)
                    .row(19, 1, 3)
                    .row(19, 1, 3)
                    .row(25, 1, 4)
                    .row(26, 2, 5)
                    .row(27, 1, 4)
                    .row(41, 2, 5)
                    .row(55, 1, 2)
                    .row(56, 1, 2)
                    .row(66, 1, 3)
                    .row(77, 1, 4)
                    .row(88, 1, 3)
                    .row(88, 1, 3)
                    .row(89, 1, 3)
                    .row(89, 1, 3)
                    .row(90, 1, 3)
                    .row(91, 1, 4)
                    .row(92, 2, 5)
                    .row(93, 1, 2)
                    .row(94, 1, 3)
                    .row(95, 1, 4)
                    .row(97, 1, 3)
                    .row(98, 2, 2)
                    .row(99, 2, 2)
                    .row(99, 1, 1)
                    .row(100, 1, 7)
                    .row(100, 1, 6)
                    .row(101, 1, 4)
                    .row(202, 1, 3)
                    .row(399, 2, 2)
                    .row(400, 1, 1)
                    .row(401, 1, 7)
                    .row(402, 1, 6)
                    .build()
                    .get(0);
            PageAssertions.assertPageEquals(types, actual, expected);
        }
    }

    /**
     * Creates a {@link MergeOperator} on a fresh driver context, using the next operator id.
     *
     * @param sourceTypes types of the columns in the incoming pages
     * @param outputChannels input channels projected to the output, in output order
     * @param sortChannels input channels the streams are ordered by
     * @param sortOrder ordering applied to each entry of {@code sortChannels}
     * @return the newly created operator; the caller is responsible for closing it
     */
    private MergeOperator createMergeOperator(
            List<Type> sourceTypes,
            List<Integer> outputChannels,
            List<Integer> sortChannels,
            List<SortOrder> sortOrder) {
        int id = operatorId.getAndIncrement();
        MergeOperator.MergeOperatorFactory factory = new MergeOperator.MergeOperatorFactory(
                id,
                new PlanNodeId("plan_node_id" + id),
                exchangeClientFactory,
                serdeFactory,
                orderingCompiler,
                sourceTypes,
                outputChannels,
                sortChannels,
                sortOrder);
        DriverContext driverContext = TestingTaskContext.createTaskContext(executor, executor, SessionTestUtils.TEST_SESSION)
                .addPipelineContext(0, true, true, false)
                .addDriverContext();
        return factory.createOperator(driverContext);
    }

    /** Returns the buffer for the given upstream task, creating it on first access. */
    private TestingTaskBuffer taskBuffer(TaskId taskId) {
        return taskBuffers.getUnchecked(taskId);
    }

    /** Builds a remote split whose location is {@code http://localhost/<taskId>}. */
    private static Split createRemoteSplit(TaskId taskId) {
        return new Split(ExchangeOperator.REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(taskId, "http://localhost/" + taskId)));
    }

    /**
     * Polls the operator until it reports finished (or the timeout elapses), collecting every
     * non-null output page. Afterwards asserts that the operator is finished and does not want
     * input, closes it, destroys its operator context, and verifies that the single nested
     * operator stats entry reports zero user memory reserved.
     *
     * @param operator operator to drain; must currently be unblocked
     * @return the pages produced, in the order they were emitted
     * @throws Exception if closing the operator fails or the polling sleep is interrupted
     */
    private static List<Page> pullAvailablePages(Operator operator) throws Exception {
        long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(PULL_TIMEOUT_SECONDS);
        List<Page> pages = new ArrayList<>();

        OperatorAssertion.assertOperatorIsUnblocked(operator);
        // subtract-and-compare keeps the deadline check correct across System.nanoTime() overflow
        while (!operator.isFinished() && System.nanoTime() - deadlineNanos < 0) {
            Assert.assertFalse(operator.needsInput());
            Page page = operator.getOutput();
            if (page != null) {
                pages.add(page);
            } else {
                Thread.sleep(PULL_POLL_INTERVAL_MILLIS);
            }
        }

        Assert.assertFalse(operator.needsInput(), "Operator still wants input");
        Assert.assertTrue(operator.isFinished(), "Expected operator to be finished");

        operator.close();
        operator.getOperatorContext().destroy();
        OperatorStats stats = (OperatorStats) Iterables.getOnlyElement(operator.getOperatorContext().getNestedOperatorStats());
        Assert.assertEquals(stats.getUserMemoryReservation().toBytes(), 0L);
        return pages;
    }
}
