package io.trino.plugin.kudu;

import io.trino.Session;
import io.trino.cost.StatsAndCosts;
import io.trino.sql.planner.Plan;
import io.trino.sql.planner.optimizations.PlanNodeSearcher;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.planprinter.PlanPrinter;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.TestTable;
import java.util.function.Consumer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

/* loaded from: input_file:io/trino/plugin/kudu/TestKuduIntegrationGroupedExecution.class */
public class TestKuduIntegrationGroupedExecution extends AbstractTestQueryFramework {
    private static final String SCHEMA_KUDU = "kudu";
    private static final String KUDU_GROUPED_EXECUTION = "grouped_execution";
    private TestingKuduServer kuduServer;

    protected QueryRunner createQueryRunner() throws Exception {
        this.kuduServer = new TestingKuduServer();
        return KuduQueryRunnerFactory.createKuduQueryRunner(this.kuduServer, Session.builder(KuduQueryRunnerFactory.createSession("test_grouped_execution")).setSystemProperty("colocated_join", "true").setSystemProperty(KUDU_GROUPED_EXECUTION, "true").setSystemProperty("concurrent_lifespans_per_task", "1").setSystemProperty("dynamic_schedule_for_grouped_execution", "false").setSystemProperty("enable_dynamic_filtering", "false").setCatalogSessionProperty(SCHEMA_KUDU, KUDU_GROUPED_EXECUTION, "true").build());
    }

    @AfterClass(alwaysRun = true)
    public final void destroy() {
        if (this.kuduServer != null) {
            this.kuduServer.close();
            this.kuduServer = null;
        }
    }

    @Test
    public void testGroupedExecutionJoin() {
        assertUpdate("CREATE TABLE IF NOT EXISTS test_grouped_execution_t1 (key1 INT WITH (primary_key=true), key2 INT WITH (primary_key=true), attr1 INT) WITH ( partition_by_hash_columns = ARRAY['key1'],  partition_by_hash_buckets = 2)");
        assertUpdate("CREATE TABLE IF NOT EXISTS test_grouped_execution_t2 (key1 INT WITH (primary_key=true), key2 INT WITH (primary_key=true), attr2 decimal(10, 6)) WITH ( partition_by_hash_columns = ARRAY['key1'],  partition_by_hash_buckets = 2)");
        assertUpdate("INSERT INTO test_grouped_execution_t1 VALUES (0, 0, 0), (0, 1, 0), (1, 1, 1)", 3L);
        assertUpdate("INSERT INTO test_grouped_execution_t2 VALUES (0, 0, 0), (1, 1, 1), (1, 2, 1)", 3L);
        assertQuery(getSession(), "SELECT t1.* FROM test_grouped_execution_t1 t1 join test_grouped_execution_t2 t2 on t1.key1=t2.key1 WHERE t1.attr1=0", "VALUES (0, 0, 0), (0, 1, 0)", assertRemoteExchangesCount(1));
        assertUpdate("DROP TABLE test_grouped_execution_t1");
        assertUpdate("DROP TABLE test_grouped_execution_t2");
    }

    @Test
    public void testGroupedExecutionJoinRangePartition() {
        String str = "test_grouped_execution_range_t1_" + TestTable.randomTableSuffix();
        assertUpdate("CREATE TABLE IF NOT EXISTS " + str + " (key1 INT WITH (primary_key=true), key2 INT WITH (primary_key=true), attr1 INT) WITH ( partition_by_hash_columns = ARRAY['key1'],  partition_by_hash_buckets = 2,   partition_by_range_columns = ARRAY['key2'],  range_partitions = '[{\"lower\": null, \"upper\": \"4\"}, {\"lower\": \"4\", \"upper\": null}]')");
        String str2 = "test_grouped_execution_range_t2_" + TestTable.randomTableSuffix();
        assertUpdate("CREATE TABLE IF NOT EXISTS " + str2 + " (key1 INT WITH (primary_key=true), key2 INT WITH (primary_key=true), attr2 decimal(10, 6)) WITH ( partition_by_hash_columns = ARRAY['key1'],  partition_by_hash_buckets = 2,  partition_by_range_columns = ARRAY['key2'],  range_partitions = '[{\"lower\": null, \"upper\": \"4\"}, {\"lower\": \"4\", \"upper\": null}]')");
        assertUpdate("INSERT INTO " + str + " VALUES (0, 0, 0), (0, 5, 0), (1, 0, 0), (1, 5, 0)", 4L);
        assertUpdate("INSERT INTO " + str2 + " VALUES (0, 0, 0), (0, 5, 1), (1, 0, 0), (1, 5, 2)", 4L);
        assertQuery(getSession(), "SELECT t1.* FROM " + str + " t1 join " + str2 + " t2 on t1.key1=t2.key1 WHERE t2.attr2=2", "VALUES (1, 0, 0), (1, 5, 0)", assertRemoteExchangesCount(1));
        assertUpdate("DROP TABLE " + str);
        assertUpdate("DROP TABLE " + str2);
    }

    @Test
    public void testGroupedExecutionGroupBy() {
        assertUpdate("CREATE TABLE IF NOT EXISTS test_grouped_execution (key1 INT WITH (primary_key=true), key2 INT WITH (primary_key=true), attr INT) WITH ( partition_by_hash_columns = ARRAY['key1'],  partition_by_hash_buckets = 2)");
        assertUpdate("INSERT INTO test_grouped_execution VALUES (0, 0, 0), (0, 1, 1), (1, 0, 1)", 3L);
        assertQuery(getSession(), "SELECT key1, COUNT(1) FROM test_grouped_execution GROUP BY key1", "VALUES (0, 2), (1, 1)", assertRemoteExchangesCount(1));
        assertUpdate("DROP TABLE test_grouped_execution");
    }

    @Test
    public void testGroupedExecutionMultiLevelPartitioning() {
        assertUpdate("CREATE TABLE IF NOT EXISTS test_grouped_execution_mtlvl (key1 BIGINT WITH (primary_key=true),key2 BIGINT WITH (primary_key=true),key3 BIGINT WITH (primary_key=true),key4 BIGINT WITH (primary_key=true),attr1 BIGINT) WITH ( partition_by_hash_columns = ARRAY['key1', 'key2'], partition_by_hash_buckets = 2, partition_by_second_hash_columns = ARRAY['key3'], partition_by_second_hash_buckets = 2)");
        assertUpdate("INSERT INTO test_grouped_execution_mtlvl VALUES (0, 0, 0, 0, 0), (0, 0, 0, 1, 1), (1, 1, 1, 0, 0), (1, 1, 1, 1, 1)", 4L);
        assertQuery(getSession(), "SELECT key1, key2, key3, COUNT(1) FROM test_grouped_execution_mtlvl GROUP BY key1, key2, key3", "VALUES (0, 0, 0, 2), (1, 1, 1, 2)", assertRemoteExchangesCount(1));
        assertUpdate("DROP TABLE test_grouped_execution_mtlvl");
    }

    @Test
    public void testGroupedExecutionMultiLevelCombinedPartitioning() {
        assertUpdate("CREATE TABLE test_grouped_execution_hash_range (key1 BIGINT WITH (primary_key=true),key2 BIGINT WITH (primary_key=true),key3 BIGINT WITH (primary_key=true),key4 BIGINT WITH (primary_key=true),attr1 BIGINT) WITH (  partition_by_hash_columns = ARRAY['key1'],  partition_by_hash_buckets = 2,  partition_by_second_hash_columns = ARRAY['key2'],  partition_by_second_hash_buckets = 3,  partition_by_range_columns = ARRAY['key3'],  range_partitions = '[{\"lower\": null, \"upper\": \"4\"}, {\"lower\": \"4\", \"upper\": \"9\"}, {\"lower\": \"9\", \"upper\": null}]')");
        assertUpdate("INSERT INTO test_grouped_execution_hash_range VALUES (0, 0, 0, 0, 0), (0, 0, 9, 0, 9), (0, 0, 9, 1, 0), (1, 1, 0, 0, 1), (1, 1, 9, 0, 2)", 5L);
        assertQuery(getSession(), "SELECT key1, key2, key3, COUNT(1) FROM test_grouped_execution_hash_range GROUP BY key1, key2, key3", "VALUES (0, 0, 0, 1), (0, 0, 9, 2), (1, 1, 0, 1), (1, 1, 9, 1)", assertRemoteExchangesCount(1));
        assertUpdate("DROP TABLE test_grouped_execution_hash_range");
    }

    private Consumer<Plan> assertRemoteExchangesCount(int i) {
        return plan -> {
            int size = PlanNodeSearcher.searchFrom(plan.getRoot()).where(planNode -> {
                return (planNode instanceof ExchangeNode) && ((ExchangeNode) planNode).getScope() == ExchangeNode.Scope.REMOTE;
            }).findAll().size();
            if (size != i) {
                throw new AssertionError(String.format("Expected %s remote exchanges but found %s. Actual plan is:\n%s]", Integer.valueOf(i), Integer.valueOf(size), PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes(), getDistributedQueryRunner().getCoordinator().getMetadata(), getDistributedQueryRunner().getCoordinator().getFunctionManager(), StatsAndCosts.empty(), getSession(), 0, false)));
            }
        };
    }
}
