package io.trino.plugin.kafka;

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.execution.QueryInfo;
import io.trino.plugin.kafka.util.TestUtils;
import io.trino.spi.connector.SchemaTableName;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResultWithQueryId;
import io.trino.testing.QueryRunner;
import io.trino.testing.assertions.Assert;
import io.trino.testing.kafka.TestingKafka;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@Execution(ExecutionMode.SAME_THREAD)
/* loaded from: input_file:io/trino/plugin/kafka/TestKafkaIntegrationPushDown.class */
public class TestKafkaIntegrationPushDown extends AbstractTestQueryFramework {
    private static final int MESSAGE_NUM = 1000;
    private static final int TIMESTAMP_TEST_COUNT = 6;
    private static final int TIMESTAMP_TEST_START_INDEX = 2;
    private static final int TIMESTAMP_TEST_END_INDEX = 4;
    private TestingKafka testingKafka;
    private String topicNamePartition;
    private String topicNameOffset;
    private String topicNameCreateTime;
    private String topicNameLogAppend;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/kafka/TestKafkaIntegrationPushDown$RecordMessage.class */
    public static class RecordMessage {
        private final String startTime;
        private final String endTime;

        public RecordMessage(String str, String str2) {
            this.startTime = (String) Objects.requireNonNull(str, "startTime result is none");
            this.endTime = (String) Objects.requireNonNull(str2, "endTime result is none");
        }

        public String getStartTime() {
            return this.startTime;
        }

        public String getEndTime() {
            return this.endTime;
        }
    }

    protected QueryRunner createQueryRunner() throws Exception {
        this.testingKafka = closeAfterClass(TestingKafka.create());
        this.topicNamePartition = "test_push_down_partition_" + UUID.randomUUID().toString().replaceAll("-", "_");
        this.topicNameOffset = "test_push_down_offset_" + UUID.randomUUID().toString().replaceAll("-", "_");
        this.topicNameCreateTime = "test_push_down_create_time_" + UUID.randomUUID().toString().replaceAll("-", "_");
        this.topicNameLogAppend = "test_push_down_log_append_" + UUID.randomUUID().toString().replaceAll("-", "_");
        DistributedQueryRunner build = KafkaQueryRunner.builder(this.testingKafka).setExtraTopicDescription(ImmutableMap.builder().put(TestUtils.createEmptyTopicDescription(this.topicNamePartition, new SchemaTableName("default", this.topicNamePartition))).put(TestUtils.createEmptyTopicDescription(this.topicNameOffset, new SchemaTableName("default", this.topicNameOffset))).put(TestUtils.createEmptyTopicDescription(this.topicNameCreateTime, new SchemaTableName("default", this.topicNameCreateTime))).put(TestUtils.createEmptyTopicDescription(this.topicNameLogAppend, new SchemaTableName("default", this.topicNameLogAppend))).buildOrThrow()).setExtraKafkaProperties(ImmutableMap.of("kafka.messages-per-split", "100")).build();
        this.testingKafka.createTopicWithConfig(2, 1, this.topicNamePartition, false);
        this.testingKafka.createTopicWithConfig(2, 1, this.topicNameOffset, false);
        this.testingKafka.createTopicWithConfig(1, 1, this.topicNameCreateTime, false);
        this.testingKafka.createTopicWithConfig(1, 1, this.topicNameLogAppend, true);
        return build;
    }

    @Test
    public void testPartitionPushDown() {
        createMessages(this.topicNamePartition);
        String format = String.format("SELECT count(*) FROM default.%s WHERE _partition_id=1", this.topicNamePartition);
        Assert.assertEventually(() -> {
            Assertions.assertThat(getQueryInfo(getDistributedQueryRunner(), getDistributedQueryRunner().executeWithQueryId(getSession(), format)).getQueryStats().getProcessedInputPositions()).isEqualTo(500L);
        });
    }

    @Test
    public void testOffsetPushDown() {
        createMessages(this.topicNameOffset);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _partition_offset between 2 and 10", this.topicNameOffset), 18L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _partition_offset > 2 and _partition_offset < 10", this.topicNameOffset), 14L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _partition_offset = 3", this.topicNameOffset), 2L);
    }

    @Test
    public void testTimestampCreateTimeModePushDown() throws Exception {
        RecordMessage createTimestampTestMessages = createTimestampTestMessages(this.topicNameCreateTime);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp < timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getEndTime()), 1000L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp <= timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getEndTime()), 1000L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp > timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime()), 997L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime()), 998L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp between timestamp '%s' and timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime(), createTimestampTestMessages.getEndTime()), 998L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime(), createTimestampTestMessages.getEndTime()), 998L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp = timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime()), 998L);
        Session build = Session.builder(getSession()).setSystemProperty("kafka.timestamp_upper_bound_force_push_down_enabled", "true").build();
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp < timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getEndTime()), 4L, build);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp <= timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getEndTime()), 4L, build);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp > timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime()), 997L, build);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime()), 998L, build);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp between timestamp '%s' and timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime(), createTimestampTestMessages.getEndTime()), 2L, build);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime(), createTimestampTestMessages.getEndTime()), 2L, build);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp = timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime()), 0L, build);
    }

    @Test
    public void testTimestampLogAppendModePushDown() throws Exception {
        RecordMessage createTimestampTestMessages = createTimestampTestMessages(this.topicNameLogAppend);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp < timestamp '%s'", this.topicNameLogAppend, createTimestampTestMessages.getEndTime()), 4L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp <= timestamp '%s'", this.topicNameLogAppend, createTimestampTestMessages.getEndTime()), 4L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp > timestamp '%s'", this.topicNameLogAppend, createTimestampTestMessages.getStartTime()), 997L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s'", this.topicNameLogAppend, createTimestampTestMessages.getStartTime()), 998L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp between timestamp '%s' and timestamp '%s'", this.topicNameLogAppend, createTimestampTestMessages.getStartTime(), createTimestampTestMessages.getEndTime()), 2L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", this.topicNameLogAppend, createTimestampTestMessages.getStartTime(), createTimestampTestMessages.getEndTime()), 2L);
        assertProcessedInputPositions(String.format("SELECT count(*) FROM default.%s WHERE _timestamp = timestamp '%s'", this.topicNameLogAppend, createTimestampTestMessages.getStartTime()), 0L);
    }

    private void assertProcessedInputPositions(String str, long j) {
        assertProcessedInputPositions(str, j, getSession());
    }

    private void assertProcessedInputPositions(String str, long j, Session session) {
        DistributedQueryRunner distributedQueryRunner = getDistributedQueryRunner();
        Assert.assertEventually(() -> {
            Assertions.assertThat(getQueryInfo(distributedQueryRunner, distributedQueryRunner.executeWithQueryId(session, str)).getQueryStats().getProcessedInputPositions()).isEqualTo(j);
        });
    }

    private static QueryInfo getQueryInfo(DistributedQueryRunner distributedQueryRunner, MaterializedResultWithQueryId materializedResultWithQueryId) {
        return distributedQueryRunner.getCoordinator().getQueryManager().getFullQueryInfo(materializedResultWithQueryId.getQueryId());
    }

    private RecordMessage createTimestampTestMessages(String str) throws Exception {
        String str2 = null;
        String str3 = null;
        for (int i = 0; i < TIMESTAMP_TEST_COUNT; i++) {
            RecordMetadata sendMessages = this.testingKafka.sendMessages(Stream.of(new ProducerRecord(str, Long.valueOf(i), Long.valueOf(i))));
            if (i == 2) {
                str2 = getTimestamp(sendMessages);
            } else if (i == TIMESTAMP_TEST_END_INDEX) {
                str3 = getTimestamp(sendMessages);
            }
            Thread.sleep(100L);
        }
        this.testingKafka.sendMessages(LongStream.range(6L, 1000L).mapToObj(j -> {
            return new ProducerRecord(str, Long.valueOf(j), Long.valueOf(j));
        }));
        return new RecordMessage(str2, str3);
    }

    private static String getTimestamp(RecordMetadata recordMetadata) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.ofInstant(Instant.ofEpochMilli(recordMetadata.timestamp()), ZoneId.of("UTC")));
    }

    private void createMessages(String str) {
        this.testingKafka.sendMessages(IntStream.range(0, MESSAGE_NUM).mapToObj(i -> {
            return new ProducerRecord(str, Long.valueOf(i), Long.valueOf(i));
        }));
    }
}
