package io.trino.memory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.execution.QueryState;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.operator.BlockedReason;
import io.trino.plugin.blackhole.BlackHolePlugin;
import io.trino.server.BasicQueryInfo;
import io.trino.server.BasicQueryStats;
import io.trino.server.testing.TestingTrinoServer;
import io.trino.spi.StandardErrorCode;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.TestingSession;
import io.trino.tests.tpch.TpchQueryRunnerBuilder;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.intellij.lang.annotations.Language;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/memory/TestMemoryManager.class */
public class TestMemoryManager {
    private static final Session SESSION = TestingSession.testSessionBuilder().setCatalog("tpch").setSchema("sf1000").build();
    private static final Session TINY_SESSION = TestingSession.testSessionBuilder().setCatalog("tpch").setSchema("tiny").build();
    private ExecutorService executor;

    @BeforeClass
    public void setUp() {
        this.executor = Executors.newCachedThreadPool();
    }

    @AfterClass(alwaysRun = true)
    public void shutdown() {
        this.executor.shutdownNow();
        this.executor = null;
    }

    @Test(timeOut = 240000)
    public void testResourceOverCommit() throws Exception {
        DistributedQueryRunner createQueryRunner = createQueryRunner(TINY_SESSION, ImmutableMap.builder().put("query.max-memory-per-node", "1kB").put("query.max-memory", "1kB").buildOrThrow());
        try {
            Assertions.assertThatThrownBy(() -> {
                createQueryRunner.execute(TestMemorySessionProperties.sql);
            }).isInstanceOf(RuntimeException.class).hasMessageStartingWith("Query exceeded per-node memory limit of ");
            createQueryRunner.execute(TestingSession.testSessionBuilder().setCatalog("tpch").setSchema("tiny").setSystemProperty("resource_overcommit", "true").build(), TestMemorySessionProperties.sql);
            if (createQueryRunner != null) {
                createQueryRunner.close();
            }
        } catch (Throwable th) {
            if (createQueryRunner != null) {
                try {
                    createQueryRunner.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 240000)
    public void testOutOfMemoryKiller() throws Exception {
        DistributedQueryRunner createQueryRunner = createQueryRunner(TINY_SESSION, ImmutableMap.builder().put("query.low-memory-killer.delay", "5s").put("query.low-memory-killer.policy", "total-reservation").buildOrThrow());
        try {
            createQueryRunner.installPlugin(new BlackHolePlugin());
            createQueryRunner.createCatalog("blackhole", "blackhole");
            createQueryRunner.execute("CREATE TABLE blackhole.default.take_30s(dummy varchar(10)) WITH (split_count=1, pages_per_split=30, rows_per_page=1, page_processing_delay='1s')");
            TaskId taskId = new TaskId(new StageId("fake", 0), 0, 0);
            Iterator it = createQueryRunner.getServers().iterator();
            while (it.hasNext()) {
                MemoryPool memoryPool = ((TestingTrinoServer) it.next()).getLocalMemoryManager().getMemoryPool();
                Assert.assertTrue(memoryPool.tryReserve(taskId, "test", memoryPool.getMaxBytes()));
            }
            int i = 2;
            ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.executor);
            for (int i2 = 0; i2 < 2; i2++) {
                executorCompletionService.submit(() -> {
                    return createQueryRunner.execute("SELECT COUNT(*), clerk FROM (SELECT clerk FROM orders UNION ALL SELECT dummy FROM blackhole.default.take_30s)GROUP BY clerk");
                });
            }
            io.trino.testing.assertions.Assert.assertEventually(() -> {
                Assertions.assertThat(createQueryRunner.getCoordinator().getQueryManager().getQueries()).hasSize(1 + i);
            });
            waitForQueryToBeKilled(createQueryRunner);
            Iterator it2 = createQueryRunner.getServers().iterator();
            while (it2.hasNext()) {
                MemoryPool memoryPool2 = ((TestingTrinoServer) it2.next()).getLocalMemoryManager().getMemoryPool();
                Assert.assertTrue(memoryPool2.getReservedBytes() > 0);
                memoryPool2.free(taskId, "test", memoryPool2.getMaxBytes());
                Assert.assertTrue(memoryPool2.getFreeBytes() > 0);
            }
            Assertions.assertThatThrownBy(() -> {
                for (int i3 = 0; i3 < i; i3++) {
                    executorCompletionService.take().get();
                }
            }).isInstanceOf(ExecutionException.class).hasMessageMatching(".*Query killed because the cluster is out of memory. Please try again in a few minutes.");
            if (createQueryRunner != null) {
                createQueryRunner.close();
            }
        } catch (Throwable th) {
            if (createQueryRunner != null) {
                try {
                    createQueryRunner.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void waitForQueryToBeKilled(DistributedQueryRunner distributedQueryRunner) throws InterruptedException {
        while (true) {
            boolean z = false;
            for (BasicQueryInfo basicQueryInfo : distributedQueryRunner.getCoordinator().getQueryManager().getQueries()) {
                if (basicQueryInfo.getState() == QueryState.FAILED) {
                    Assert.assertEquals(basicQueryInfo.getErrorCode(), StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode());
                    return;
                } else {
                    Assert.assertNull(basicQueryInfo.getErrorCode(), "errorCode unexpectedly present for " + basicQueryInfo);
                    if (!basicQueryInfo.getState().isDone()) {
                        z = true;
                    }
                }
            }
            Preconditions.checkState(z, "All queries already completed without failure");
            TimeUnit.MILLISECONDS.sleep(10L);
        }
    }

    @Test(timeOut = 240000)
    public void testNoLeak() throws Exception {
        testNoLeak("SELECT clerk FROM orders");
        testNoLeak("SELECT COUNT(*), clerk FROM orders WHERE orderstatus='O' GROUP BY clerk");
    }

    private void testNoLeak(@Language("SQL") String str) throws Exception {
        DistributedQueryRunner createQueryRunner = createQueryRunner(TINY_SESSION, ImmutableMap.builder().put("task.verbose-stats", "true").buildOrThrow());
        try {
            this.executor.submit(() -> {
                return createQueryRunner.execute(str);
            }).get();
            Iterator it = createQueryRunner.getCoordinator().getQueryManager().getQueries().iterator();
            while (it.hasNext()) {
                Assert.assertEquals(((BasicQueryInfo) it.next()).getState(), QueryState.FINISHED);
            }
            Iterator it2 = createQueryRunner.getServers().iterator();
            while (it2.hasNext()) {
                MemoryPool memoryPool = ((TestingTrinoServer) it2.next()).getLocalMemoryManager().getMemoryPool();
                Assert.assertEquals(memoryPool.getMaxBytes(), memoryPool.getFreeBytes());
            }
            if (createQueryRunner != null) {
                createQueryRunner.close();
            }
        } catch (Throwable th) {
            if (createQueryRunner != null) {
                try {
                    createQueryRunner.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 240000)
    public void testClusterPools() throws Exception {
        DistributedQueryRunner createQueryRunner = createQueryRunner(TINY_SESSION, ImmutableMap.builder().put("task.verbose-stats", "true").buildOrThrow());
        try {
            TaskId taskId = new TaskId(new StageId("fake", 0), 0, 0);
            Iterator it = createQueryRunner.getServers().iterator();
            while (it.hasNext()) {
                MemoryPool memoryPool = ((TestingTrinoServer) it.next()).getLocalMemoryManager().getMemoryPool();
                Assert.assertTrue(memoryPool.tryReserve(taskId, "test", memoryPool.getMaxBytes()));
            }
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < 2; i++) {
                arrayList.add(this.executor.submit(() -> {
                    return createQueryRunner.execute("SELECT COUNT(*), cast(orderkey as varchar), partkey FROM lineitem GROUP BY cast(orderkey as varchar), partkey");
                }));
            }
            ClusterMemoryPool pool = createQueryRunner.getCoordinator().getClusterMemoryManager().getPool();
            Assert.assertNotNull(pool);
            while (pool.getBlockedNodes() != 2) {
                TimeUnit.MILLISECONDS.sleep(10L);
            }
            List queries = createQueryRunner.getCoordinator().getQueryManager().getQueries();
            while (queries.size() != 2) {
                TimeUnit.MILLISECONDS.sleep(10L);
                queries = createQueryRunner.getCoordinator().getQueryManager().getQueries();
            }
            Iterator it2 = queries.iterator();
            while (it2.hasNext()) {
                Assert.assertFalse(((BasicQueryInfo) it2.next()).getState().isDone());
            }
            while (!queries.stream().allMatch(TestMemoryManager::isBlockedWaitingForMemory)) {
                TimeUnit.MILLISECONDS.sleep(10L);
                queries = createQueryRunner.getCoordinator().getQueryManager().getQueries();
                Iterator it3 = queries.iterator();
                while (it3.hasNext()) {
                    Assert.assertFalse(((BasicQueryInfo) it3.next()).getState().isDone());
                }
            }
            Iterator it4 = createQueryRunner.getServers().iterator();
            while (it4.hasNext()) {
                MemoryPool memoryPool2 = ((TestingTrinoServer) it4.next()).getLocalMemoryManager().getMemoryPool();
                memoryPool2.free(taskId, "test", memoryPool2.getMaxBytes());
                Assert.assertTrue(memoryPool2.getFreeBytes() > 0);
            }
            Iterator it5 = arrayList.iterator();
            while (it5.hasNext()) {
                ((Future) it5.next()).get();
            }
            Iterator it6 = createQueryRunner.getCoordinator().getQueryManager().getQueries().iterator();
            while (it6.hasNext()) {
                Assert.assertEquals(((BasicQueryInfo) it6.next()).getState(), QueryState.FINISHED);
            }
            Iterator it7 = createQueryRunner.getServers().iterator();
            while (it7.hasNext()) {
                MemoryPool memoryPool3 = ((TestingTrinoServer) it7.next()).getLocalMemoryManager().getMemoryPool();
                Assert.assertEquals(memoryPool3.getMaxBytes(), memoryPool3.getFreeBytes());
            }
            if (createQueryRunner != null) {
                createQueryRunner.close();
            }
        } catch (Throwable th) {
            if (createQueryRunner != null) {
                try {
                    createQueryRunner.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static boolean isBlockedWaitingForMemory(BasicQueryInfo basicQueryInfo) {
        BasicQueryStats queryStats = basicQueryInfo.getQueryStats();
        if (queryStats.getBlockedReasons().contains(BlockedReason.WAITING_FOR_MEMORY)) {
            return queryStats.isFullyBlocked() || queryStats.getRunningDrivers() == 0;
        }
        return false;
    }

    @Test(timeOut = 60000, expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = ".*Query exceeded distributed user memory limit of 1kB.*")
    public void testQueryUserMemoryLimit() throws Exception {
        DistributedQueryRunner createQueryRunner = createQueryRunner(SESSION, ImmutableMap.builder().put("task.max-partial-aggregation-memory", "1B").put("query.max-memory", "1kB").put("query.max-total-memory", "1GB").buildOrThrow());
        try {
            createQueryRunner.execute(SESSION, "SELECT COUNT(*), repeat(orderstatus, 1000) FROM orders GROUP BY 2");
            if (createQueryRunner != null) {
                createQueryRunner.close();
            }
        } catch (Throwable th) {
            if (createQueryRunner != null) {
                try {
                    createQueryRunner.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000, expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = ".*Query exceeded distributed total memory limit of 120MB.*")
    public void testQueryTotalMemoryLimit() throws Exception {
        DistributedQueryRunner createQueryRunner = createQueryRunner(SESSION, ImmutableMap.builder().put("query.max-memory", "120MB").put("query.max-total-memory", "120MB").put("spill-enabled", "true").put("spiller-spill-path", Paths.get(System.getProperty("java.io.tmpdir"), "trino", "spills").toString()).put("spiller-max-used-space-threshold", "1.0").buildOrThrow());
        try {
            createQueryRunner.execute(SESSION, "SELECT * FROM tpch.sf10.orders ORDER BY orderkey");
            if (createQueryRunner != null) {
                createQueryRunner.close();
            }
        } catch (Throwable th) {
            if (createQueryRunner != null) {
                try {
                    createQueryRunner.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000, expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = ".*Query exceeded per-node memory limit of 1kB.*")
    public void testQueryMemoryPerNodeLimit() throws Exception {
        DistributedQueryRunner createQueryRunner = createQueryRunner(SESSION, ImmutableMap.builder().put("task.max-partial-aggregation-memory", "1B").put("query.max-memory-per-node", "1kB").buildOrThrow());
        try {
            createQueryRunner.execute(SESSION, "SELECT COUNT(*), repeat(orderstatus, 1000) FROM orders GROUP BY 2");
            if (createQueryRunner != null) {
                createQueryRunner.close();
            }
        } catch (Throwable th) {
            if (createQueryRunner != null) {
                try {
                    createQueryRunner.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public static DistributedQueryRunner createQueryRunner(Session session, Map<String, String> map) throws Exception {
        return ((TpchQueryRunnerBuilder) ((TpchQueryRunnerBuilder) ((TpchQueryRunnerBuilder) TpchQueryRunnerBuilder.builder().amendSession(sessionBuilder -> {
            return Session.builder(session);
        })).setNodeCount(2)).setExtraProperties(map)).build();
    }
}
