package io.trino.plugin.kafka.protobuf;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import com.google.protobuf.Any;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Timestamp;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import io.trino.decoder.protobuf.ProtobufUtils;
import io.trino.plugin.kafka.KafkaQueryRunner;
import io.trino.plugin.kafka.protobuf.UnsupportedRecursiveTypes;
import io.trino.spi.type.SqlTimestamp;
import io.trino.sql.query.QueryAssertions;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DateTimeTestingUtils;
import io.trino.testing.QueryRunner;
import io.trino.testing.kafka.TestingKafka;
import java.io.File;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@Execution(ExecutionMode.SAME_THREAD)
/* loaded from: input_file:io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.class */
public class TestKafkaProtobufWithSchemaRegistryMinimalFunctionality extends AbstractTestQueryFramework {
    private static final String RECORD_NAME = "schema";
    private static final int MESSAGE_COUNT = 100;
    private TestingKafka testingKafka;

    protected QueryRunner createQueryRunner() throws Exception {
        this.testingKafka = closeAfterClass(TestingKafka.createWithSchemaRegistry());
        this.testingKafka.start();
        return KafkaQueryRunner.builderForConfluentSchemaRegistry(this.testingKafka).build();
    }

    @Test
    public void testBasicTopic() throws Exception {
        assertTopic("topic-basic-MixedCase", String.format("SELECT col_1, col_2 FROM %s", toDoubleQuoted("topic-basic-MixedCase")), String.format("SELECT col_1, col_2, col_3 FROM %s", toDoubleQuoted("topic-basic-MixedCase")), false, producerProperties());
    }

    @Test
    public void testTopicWithKeySubject() throws Exception {
        assertTopic("topic-Key-Subject", String.format("SELECT key, col_1, col_2 FROM %s", toDoubleQuoted("topic-Key-Subject")), String.format("SELECT key, col_1, col_2, col_3 FROM %s", toDoubleQuoted("topic-Key-Subject")), true, producerProperties());
    }

    @Test
    public void testTopicWithRecordNameStrategy() throws Exception {
        assertTopic("topic-Record-Name-Strategy", String.format("SELECT key, col_1, col_2 FROM \"%1$s&value-subject=%2$s\"", "topic-Record-Name-Strategy", RECORD_NAME), String.format("SELECT key, col_1, col_2, col_3 FROM \"%1$s&value-subject=%2$s\"", "topic-Record-Name-Strategy", RECORD_NAME), true, ImmutableMap.builder().putAll(producerProperties()).put("value.subject.name.strategy", RecordNameStrategy.class.getName()).buildOrThrow());
    }

    @Test
    public void testTopicWithTopicRecordNameStrategy() throws Exception {
        assertTopic("topic-Topic-Record-Name-Strategy", String.format("SELECT key, col_1, col_2 FROM \"%1$s&value-subject=%1$s-%2$s\"", "topic-Topic-Record-Name-Strategy", RECORD_NAME), String.format("SELECT key, col_1, col_2, col_3 FROM \"%1$s&value-subject=%1$s-%2$s\"", "topic-Topic-Record-Name-Strategy", RECORD_NAME), true, ImmutableMap.builder().putAll(producerProperties()).put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName()).buildOrThrow());
    }

    @Test
    public void testBasicTopicForInsert() throws Exception {
        assertTopic("topic-basic-inserts", String.format("SELECT col_1, col_2 FROM %s", toDoubleQuoted("topic-basic-inserts")), String.format("SELECT col_1, col_2, col_3 FROM %s", toDoubleQuoted("topic-basic-inserts")), false, producerProperties());
        assertQueryFails(String.format("INSERT INTO %s (col_1, col_2, col_3) VALUES ('Trino', 14, 1.4)", toDoubleQuoted("topic-basic-inserts")), "Insert is not supported for schema registry based tables");
    }

    @Test
    public void testUnsupportedRecursiveDataTypes() throws Exception {
        assertNotExists("topic-unsupported-recursive");
        UnsupportedRecursiveTypes.schema m191build = UnsupportedRecursiveTypes.schema.newBuilder().setRecursiveValueOne(UnsupportedRecursiveTypes.RecursiveValue.newBuilder().setStringValue("Value1").m144build()).m191build();
        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(new ProducerRecord("topic-unsupported-recursive", createKeySchema(0L, getKeySchema()), m191build));
        this.testingKafka.sendMessages(builder.build().stream(), producerProperties());
        waitUntilTableExists("topic-unsupported-recursive");
        assertQueryFails("SELECT * FROM " + toDoubleQuoted("topic-unsupported-recursive"), "Protobuf schema containing fields with self-reference are not supported because they cannot be mapped to a Trino type: io.trino.protobuf.schema.recursive_value_one: io.trino.protobuf.RecursiveValue > io.trino.protobuf.RecursiveValue.struct_value: io.trino.protobuf.RecursiveStruct > io.trino.protobuf.RecursiveStruct.fields: io.trino.protobuf.RecursiveStruct.FieldsEntry > io.trino.protobuf.RecursiveStruct.FieldsEntry.value: io.trino.protobuf.RecursiveValue");
    }

    @Test
    public void testSchemaWithImportDataTypes() throws Exception {
        assertNotExists("topic-schema-with-import");
        Descriptors.Descriptor descriptor = getDescriptor("structural_datatypes.proto");
        DynamicMessage buildDynamicMessage = buildDynamicMessage(descriptor, ImmutableMap.builder().put("list", ImmutableList.of("Search")).put("map", ImmutableList.of(buildDynamicMessage(descriptor.findFieldByName("map").getMessageType(), ImmutableMap.of("key", "Key1", "value", "Value1")))).put("row", ImmutableMap.builder().put("string_column", "Trino").put("integer_column", 1).put("long_column", 493857959588286460L).put("double_column", Double.valueOf(3.141592653589793d)).put("float_column", Float.valueOf(3.14f)).put("boolean_column", true).put("number_column", descriptor.findEnumTypeByName("Number").findValueByName("ONE")).put("timestamp_column", getTimestamp(DateTimeTestingUtils.sqlTimestampOf(3, LocalDateTime.parse("2020-12-12T15:35:45.923")))).put("bytes_column", "Trino".getBytes(StandardCharsets.UTF_8)).buildOrThrow()).buildOrThrow());
        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(new ProducerRecord("topic-schema-with-import", createKeySchema(0L, getKeySchema()), buildDynamicMessage));
        this.testingKafka.sendMessages(builder.build().stream(), producerProperties());
        waitUntilTableExists("topic-schema-with-import");
        ((QueryAssertions.QueryAssert) Assertions.assertThat(query(String.format("SELECT list, map, row FROM %s", toDoubleQuoted("topic-schema-with-import"))))).matches("VALUES (\n    ARRAY[CAST('Search' AS VARCHAR)],\n    MAP(CAST(ARRAY['Key1'] AS ARRAY(VARCHAR)), CAST(ARRAY['Value1'] AS ARRAY(VARCHAR))),\n    CAST(ROW('Trino', 1, 493857959588286460, 3.14159265358979323846, 3.14, True, 'ONE', TIMESTAMP '2020-12-12 15:35:45.923', to_utf8('Trino'))\n        AS ROW(\n            string_column VARCHAR,\n            integer_column INTEGER,\n            long_column BIGINT,\n            double_column DOUBLE,\n            float_column REAL,\n            boolean_column BOOLEAN,\n            number_column VARCHAR,\n            timestamp_column TIMESTAMP(6),\n            bytes_column VARBINARY)))");
    }

    @Test
    public void testOneof() throws Exception {
        assertNotExists("topic-schema-with-oneof");
        Descriptors.Descriptor descriptor = ((ProtobufSchema) new ProtobufSchemaProvider().parseSchema(Resources.toString(Resources.getResource("protobuf/test_oneof.proto"), StandardCharsets.UTF_8), List.of(), true).get()).toDescriptor();
        DynamicMessage build = DynamicMessage.newBuilder(descriptor).setField(descriptor.findFieldByName("stringColumn"), "stringColumnValue1").build();
        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(new ProducerRecord("topic-schema-with-oneof", createKeySchema(0L, getKeySchema()), build));
        this.testingKafka.sendMessages(builder.build().stream(), producerProperties());
        waitUntilTableExists("topic-schema-with-oneof");
        ((QueryAssertions.QueryAssert) Assertions.assertThat(query(String.format("SELECT testOneOfColumn FROM %s", toDoubleQuoted("topic-schema-with-oneof"))))).matches("VALUES (JSON '{\"stringColumn\":\"%s\"}')\n".formatted("stringColumnValue1"));
    }

    @Test
    public void testAny() throws Exception {
        assertNotExists("topic-schema-with-any");
        Descriptors.Descriptor descriptor = getDescriptor("structural_datatypes.proto");
        DynamicMessage buildDynamicMessage = buildDynamicMessage(descriptor, ImmutableMap.builder().put("list", ImmutableList.of("Search")).put("map", ImmutableList.of(buildDynamicMessage(descriptor.findFieldByName("map").getMessageType(), ImmutableMap.of("key", "Key1", "value", "Value1")))).put("row", ImmutableMap.builder().put("string_column", "Trino").put("integer_column", 1).put("long_column", 493857959588286460L).put("double_column", Double.valueOf(3.141592653589793d)).put("float_column", Float.valueOf(3.14f)).put("boolean_column", true).put("number_column", descriptor.findEnumTypeByName("Number").findValueByName("ONE")).put("timestamp_column", getTimestamp(DateTimeTestingUtils.sqlTimestampOf(3, LocalDateTime.parse("2020-12-12T15:35:45.923")))).put("bytes_column", "Trino".getBytes(StandardCharsets.UTF_8)).buildOrThrow()).buildOrThrow());
        ProtobufSchema protobufSchema = (ProtobufSchema) new ProtobufSchemaProvider().parseSchema(Resources.toString(Resources.getResource("protobuf/test_any.proto"), StandardCharsets.UTF_8), List.of(), true).get();
        URI uri = new File(Resources.getResource("protobuf/any/structural_datatypes/schema").getFile()).getParentFile().toURI();
        Descriptors.Descriptor descriptor2 = protobufSchema.toDescriptor();
        DynamicMessage build = DynamicMessage.newBuilder(descriptor2).setField(descriptor2.findFieldByName("id"), 1).setField(descriptor2.findFieldByName("anyMessage"), Any.pack(buildDynamicMessage, uri.toString())).build();
        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(new ProducerRecord("topic-schema-with-any", createKeySchema(0L, getKeySchema()), build));
        this.testingKafka.sendMessages(builder.build().stream(), producerProperties());
        waitUntilTableExists("topic-schema-with-any");
        ((QueryAssertions.QueryAssert) Assertions.assertThat(query(String.format("SELECT id, anyMessage FROM %s", toDoubleQuoted("topic-schema-with-any"))))).matches("VALUES (1, JSON '{\"@type\":\"%s\",\"list\":[\"Search\"],\"map\":{\"Key1\":\"Value1\"},\"row\":{\"booleanColumn\":true,\"bytesColumn\":\"VHJpbm8=\",\"doubleColumn\":3.141592653589793,\"floatColumn\":3.14,\"integerColumn\":1,\"longColumn\":\"493857959588286460\",\"numberColumn\":\"ONE\",\"stringColumn\":\"Trino\",\"timestampColumn\":\"2020-12-12T15:35:45.923Z\"}}')\n".formatted(new File(Resources.getResource("protobuf/any/structural_datatypes/schema").getFile()).toURI()));
    }

    private DynamicMessage buildDynamicMessage(Descriptors.Descriptor descriptor, Map<String, Object> map) {
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(descriptor);
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            Descriptors.FieldDescriptor findFieldByName = descriptor.findFieldByName(entry.getKey());
            if (entry.getValue() instanceof Map) {
                newBuilder.setField(findFieldByName, buildDynamicMessage(findFieldByName.getMessageType(), (Map) entry.getValue()));
            } else {
                newBuilder.setField(findFieldByName, entry.getValue());
            }
        }
        return newBuilder.build();
    }

    protected static Timestamp getTimestamp(SqlTimestamp sqlTimestamp) {
        return Timestamp.newBuilder().setSeconds(Math.floorDiv(sqlTimestamp.getEpochMicros(), 1000000)).setNanos(StrictMath.floorMod(sqlTimestamp.getEpochMicros(), 1000000) * 1000).build();
    }

    private Map<String, String> producerProperties() {
        return ImmutableMap.of("schema.registry.url", this.testingKafka.getSchemaRegistryConnectString(), "key.serializer", KafkaProtobufSerializer.class.getName(), "value.serializer", KafkaProtobufSerializer.class.getName());
    }

    private void assertTopic(String str, String str2, String str3, boolean z, Map<String, String> map) throws Exception {
        this.testingKafka.createTopic(str);
        assertNotExists(str);
        List<ProducerRecord<DynamicMessage, DynamicMessage>> createMessages = createMessages(str, MESSAGE_COUNT, true, getInitialSchema(), getKeySchema());
        this.testingKafka.sendMessages(createMessages.stream(), map);
        waitUntilTableExists(str);
        assertCount(str, MESSAGE_COUNT);
        assertQuery(str2, getExpectedValues(createMessages, getInitialSchema(), z));
        List<ProducerRecord<DynamicMessage, DynamicMessage>> createMessages2 = createMessages(str, MESSAGE_COUNT, false, getEvolvedSchema(), getKeySchema());
        this.testingKafka.sendMessages(createMessages2.stream(), map);
        ImmutableList build = ImmutableList.builder().addAll(createMessages).addAll(createMessages2).build();
        assertCount(str, build.size());
        assertQuery(str3, getExpectedValues(build, getEvolvedSchema(), z));
    }

    private static String getExpectedValues(List<ProducerRecord<DynamicMessage, DynamicMessage>> list, Descriptors.Descriptor descriptor, boolean z) {
        StringBuilder sb = new StringBuilder("VALUES ");
        ImmutableList.Builder builder = ImmutableList.builder();
        for (ProducerRecord<DynamicMessage, DynamicMessage> producerRecord : list) {
            ImmutableList.Builder builder2 = ImmutableList.builder();
            if (z) {
                addExpectedColumns(((DynamicMessage) producerRecord.key()).getDescriptorForType(), (DynamicMessage) producerRecord.key(), builder2);
            }
            addExpectedColumns(descriptor, (DynamicMessage) producerRecord.value(), builder2);
            builder.add(String.format("(%s)", String.join(", ", (Iterable<? extends CharSequence>) builder2.build())));
        }
        sb.append(String.join(", ", (Iterable<? extends CharSequence>) builder.build()));
        return sb.toString();
    }

    private static void addExpectedColumns(Descriptors.Descriptor descriptor, DynamicMessage dynamicMessage, ImmutableList.Builder<String> builder) {
        for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) {
            if (dynamicMessage.getDescriptorForType().findFieldByName(fieldDescriptor.getName()) == null) {
                builder.add("null");
            } else {
                Object field = dynamicMessage.getField(dynamicMessage.getDescriptorForType().findFieldByName(fieldDescriptor.getName()));
                if (fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.STRING || fieldDescriptor.getJavaType() == Descriptors.FieldDescriptor.JavaType.ENUM) {
                    builder.add(toSingleQuoted(field));
                } else {
                    builder.add(String.valueOf(field));
                }
            }
        }
    }

    private void assertNotExists(String str) {
        if (schemaExists()) {
            assertQueryReturnsEmptyResult(String.format("SHOW TABLES LIKE '%s'", str));
        }
    }

    private void waitUntilTableExists(String str) {
        Failsafe.with(RetryPolicy.builder().withMaxAttempts(10).withDelay(Duration.ofMillis(100L)).build(), new RetryPolicy[0]).run(() -> {
            Assertions.assertThat(schemaExists()).isTrue();
        });
        Failsafe.with(RetryPolicy.builder().withMaxAttempts(10).withDelay(Duration.ofMillis(100L)).build(), new RetryPolicy[0]).run(() -> {
            Assertions.assertThat(tableExists(str)).isTrue();
        });
    }

    private boolean schemaExists() {
        return computeActual(String.format("SHOW SCHEMAS FROM %s LIKE '%s'", getSession().getCatalog().get(), getSession().getSchema().get())).getRowCount() == 1;
    }

    private boolean tableExists(String str) {
        return computeActual(String.format("SHOW TABLES LIKE '%s'", str.toLowerCase(Locale.ENGLISH))).getRowCount() == 1;
    }

    private void assertCount(String str, int i) {
        assertQuery(String.format("SELECT count(*) FROM %s", toDoubleQuoted(str)), String.format("VALUES (%s)", Integer.valueOf(i)));
    }

    private static Descriptors.Descriptor getInitialSchema() throws Exception {
        return getDescriptor("initial_schema.proto");
    }

    private static Descriptors.Descriptor getEvolvedSchema() throws Exception {
        return getDescriptor("evolved_schema.proto");
    }

    private static Descriptors.Descriptor getKeySchema() throws Exception {
        return getDescriptor("key_schema.proto");
    }

    public static Descriptors.Descriptor getDescriptor(String str) throws Exception {
        return ProtobufUtils.getFileDescriptor(ProtobufUtils.getProtoFile("protobuf/" + str)).findMessageTypeByName(RECORD_NAME);
    }

    private static String toDoubleQuoted(String str) {
        return String.format("\"%s\"", str);
    }

    private static String toSingleQuoted(Object obj) {
        Objects.requireNonNull(obj, "value is null");
        return String.format("'%s'", obj);
    }

    private static List<ProducerRecord<DynamicMessage, DynamicMessage>> createMessages(String str, int i, boolean z, Descriptors.Descriptor descriptor, Descriptors.Descriptor descriptor2) {
        ImmutableList.Builder builder = ImmutableList.builder();
        if (!z) {
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 >= i) {
                    break;
                }
                builder.add(new ProducerRecord(str, createKeySchema(j2, descriptor2), createRecordWithEvolvedSchema(j2, descriptor)));
                j = j2 + 1;
            }
        } else {
            long j3 = 0;
            while (true) {
                long j4 = j3;
                if (j4 >= i) {
                    break;
                }
                builder.add(new ProducerRecord(str, createKeySchema(j4, descriptor2), createRecordWithInitialSchema(j4, descriptor)));
                j3 = j4 + 1;
            }
        }
        return builder.build();
    }

    private static DynamicMessage createKeySchema(long j, Descriptors.Descriptor descriptor) {
        return DynamicMessage.newBuilder(descriptor).setField(descriptor.findFieldByName("key"), Long.valueOf(j)).build();
    }

    private static DynamicMessage createRecordWithInitialSchema(long j, Descriptors.Descriptor descriptor) {
        return DynamicMessage.newBuilder(descriptor).setField(descriptor.findFieldByName("col_1"), String.format("string-%s", Long.valueOf(j))).setField(descriptor.findFieldByName("col_2"), Long.valueOf(Math.multiplyExact(j, MESSAGE_COUNT))).build();
    }

    private static DynamicMessage createRecordWithEvolvedSchema(long j, Descriptors.Descriptor descriptor) {
        return DynamicMessage.newBuilder(descriptor).setField(descriptor.findFieldByName("col_1"), String.format("string-%s", Long.valueOf(j))).setField(descriptor.findFieldByName("col_2"), Long.valueOf(Math.multiplyExact(j, MESSAGE_COUNT))).setField(descriptor.findFieldByName("col_3"), Double.valueOf((j + 10.1d) / 10.0d)).build();
    }
}
