package io.trino.tests.product.kafka;

import io.trino.tempto.ProductTest;
import io.trino.tempto.Requirement;
import io.trino.tempto.RequirementsProvider;
import io.trino.tempto.Requires;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.configuration.Configuration;
import io.trino.tempto.fulfillment.table.TableRequirements;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessage;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessageContentsBuilder;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableDefinition;
import io.trino.tempto.fulfillment.table.kafka.ListKafkaDataSource;
import io.trino.tempto.query.QueryExecutor;
import io.trino.tests.product.TestGroups;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.testng.annotations.Test;

/* loaded from: input_file:io/trino/tests/product/kafka/TestKafkaPushdownSmokeTest.class */
public class TestKafkaPushdownSmokeTest extends ProductTest {
    private static final String KAFKA_CATALOG = "kafka";
    private static final String SCHEMA_NAME = "product_tests";
    private static final long NUM_MESSAGES = 100;
    private static final long TIMESTAMP_NUM_MESSAGES = 10;
    private static final String PUSHDOWN_PARTITION_TABLE_NAME = "pushdown_partition";
    private static final String PUSHDOWN_PARTITION_TOPIC_NAME = "pushdown_partition";
    private static final String PUSHDOWN_OFFSET_TABLE_NAME = "pushdown_offset";
    private static final String PUSHDOWN_OFFSET_TOPIC_NAME = "pushdown_offset";
    private static final String PUSHDOWN_CREATE_TIME_TABLE_NAME = "pushdown_create_time";

    /* loaded from: input_file:io/trino/tests/product/kafka/TestKafkaPushdownSmokeTest$PushdownOffsetTable.class */
    private static class PushdownOffsetTable implements RequirementsProvider {
        private PushdownOffsetTable() {
        }

        public Requirement getRequirements(Configuration configuration) {
            return TableRequirements.immutableTable(new KafkaTableDefinition("product_tests.pushdown_offset", "pushdown_offset", new ListKafkaDataSource((List) LongStream.rangeClosed(1L, TestKafkaPushdownSmokeTest.NUM_MESSAGES).boxed().map(l -> {
                return new KafkaMessage(KafkaMessageContentsBuilder.contentsBuilder().appendUTF8(String.format("%s", Long.valueOf(l.longValue() % 2))).build(), KafkaMessageContentsBuilder.contentsBuilder().appendUTF8(String.format("%s", l)).build());
            }).collect(Collectors.toList())), 2, 1));
        }
    }

    /* loaded from: input_file:io/trino/tests/product/kafka/TestKafkaPushdownSmokeTest$PushdownPartitionTable.class */
    private static class PushdownPartitionTable implements RequirementsProvider {
        private PushdownPartitionTable() {
        }

        public Requirement getRequirements(Configuration configuration) {
            return TableRequirements.immutableTable(new KafkaTableDefinition("product_tests.pushdown_partition", "pushdown_partition", new ListKafkaDataSource((List) LongStream.rangeClosed(1L, TestKafkaPushdownSmokeTest.NUM_MESSAGES).boxed().map(l -> {
                return new KafkaMessage(KafkaMessageContentsBuilder.contentsBuilder().appendUTF8(String.format("%s", Long.valueOf(l.longValue() % 2))).build(), KafkaMessageContentsBuilder.contentsBuilder().appendUTF8(String.format("%s", l)).build());
            }).collect(Collectors.toList())), 2, 1));
        }
    }

    @Test(groups = {"kafka", TestGroups.PROFILE_SPECIFIC_TESTS})
    @Requires({PushdownPartitionTable.class})
    public void testPartitionPushdown() {
        QueryAssert.assertThat(QueryExecutor.query(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_id = 1", "kafka", SCHEMA_NAME, "pushdown_partition"), new QueryExecutor.QueryParam[0])).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{50L})});
    }

    @Test(groups = {"kafka", TestGroups.PROFILE_SPECIFIC_TESTS})
    @Requires({PushdownOffsetTable.class})
    public void testOffsetPushdown() {
        QueryAssert.assertThat(QueryExecutor.query(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset BETWEEN 6 AND 10", "kafka", SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0])).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{10})});
        QueryAssert.assertThat(QueryExecutor.query(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset > 5 AND _partition_offset < 10", "kafka", SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0])).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{8})});
        QueryAssert.assertThat(QueryExecutor.query(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset >= 5 AND _partition_offset <= 10", "kafka", SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0])).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{12})});
        QueryAssert.assertThat(QueryExecutor.query(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset >= 5 AND _partition_offset < 10", "kafka", SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0])).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{10})});
        QueryAssert.assertThat(QueryExecutor.query(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset > 5 AND _partition_offset <= 10", "kafka", SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0])).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{10})});
        QueryAssert.assertThat(QueryExecutor.query(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset = 5", "kafka", SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0])).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{2})});
    }

    @Test(groups = {"kafka", TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testCreateTimePushdown() throws InterruptedException {
        for (int i = 1; i <= TIMESTAMP_NUM_MESSAGES; i++) {
            QueryExecutor.query(String.format("INSERT INTO %s.%s.%s (bigint_key, bigint_value) VALUES (%s, %s)", "kafka", SCHEMA_NAME, PUSHDOWN_CREATE_TIME_TABLE_NAME, Integer.valueOf(i), Integer.valueOf(i)), new QueryExecutor.QueryParam[0]);
            Thread.sleep(NUM_MESSAGES);
        }
        List rows = QueryExecutor.query(String.format("SELECT CAST(_timestamp AS VARCHAR) FROM %s.%s.%s WHERE bigint_key IN (" + 4 + ", " + 4 + ") ORDER BY bigint_key", "kafka", SCHEMA_NAME, PUSHDOWN_CREATE_TIME_TABLE_NAME), new QueryExecutor.QueryParam[0]).rows();
        QueryAssert.assertThat(QueryExecutor.query(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _timestamp >= TIMESTAMP '%s' AND _timestamp < TIMESTAMP '%s'", "kafka", SCHEMA_NAME, PUSHDOWN_CREATE_TIME_TABLE_NAME, (String) ((List) rows.get(0)).get(0), (String) ((List) rows.get(1)).get(0)), new QueryExecutor.QueryParam[0])).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{Long.valueOf(6 - 4)})});
    }
}
