package io.trino.tests.product.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import com.google.common.primitives.Ints;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Timestamp;
import io.airlift.units.Duration;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.trino.tempto.ProductTest;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.context.ThreadLocalTestContextHolder;
import io.trino.tempto.fulfillment.table.TableHandle;
import io.trino.tempto.fulfillment.table.TableManager;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessage;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessageContentsBuilder;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableDefinition;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableManager;
import io.trino.tempto.fulfillment.table.kafka.ListKafkaDataSource;
import io.trino.tempto.query.QueryExecutor;
import io.trino.tests.product.TestGroups;
import io.trino.tests.product.utils.QueryAssertions;
import io.trino.tests.product.utils.QueryExecutors;
import io.trino.tests.product.utils.SchemaRegistryClientUtils;
import java.io.ByteArrayOutputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest.class */
public class TestKafkaProtobufReadsSmokeTest extends ProductTest {
    private static final String KAFKA_SCHEMA = "product_tests";
    private static final String BASIC_DATATYPES_PROTOBUF_TOPIC_NAME = "read_basic_datatypes_protobuf";
    private static final String BASIC_DATATYPES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/basic_datatypes.proto";
    private static final String BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME = "read_basic_structural_datatypes_protobuf";
    private static final String BASIC_STRUCTURAL_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/basic_structural_datatypes.proto";
    private static final String ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY = "all_datatypes_protobuf_schema_registry";
    private static final String ALL_DATATYPES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/all_datatypes.proto";
    private static final KafkaCatalog KAFKA_CATALOG = new KafkaCatalog(TestGroups.KAFKA, "", true, new ProtobufMessageSerializer());
    private static final KafkaCatalog KAFKA_SCHEMA_REGISTRY_CATALOG = new KafkaCatalog("kafka_schema_registry", "_schema_registry", false, new SchemaRegistryProtobufMessageSerializer());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$KafkaCatalog.class */
    public static final class KafkaCatalog extends Record {
        private final String catalogName;
        private final String topicNameSuffix;
        private final boolean columnMappingSupported;
        private final MessageSerializer messageSerializer;

        private KafkaCatalog(String str, String str2, boolean z, MessageSerializer messageSerializer) {
            this.catalogName = (String) Objects.requireNonNull(str, "catalogName is null");
            this.topicNameSuffix = (String) Objects.requireNonNull(str2, "topicNameSuffix is null");
            this.columnMappingSupported = z;
            this.messageSerializer = (MessageSerializer) Objects.requireNonNull(messageSerializer, "messageSerializer is null");
        }

        @Override // java.lang.Record
        public String toString() {
            return this.catalogName;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, KafkaCatalog.class), KafkaCatalog.class, "catalogName;topicNameSuffix;columnMappingSupported;messageSerializer", "FIELD:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$KafkaCatalog;->catalogName:Ljava/lang/String;", "FIELD:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$KafkaCatalog;->topicNameSuffix:Ljava/lang/String;", "FIELD:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$KafkaCatalog;->columnMappingSupported:Z", "FIELD:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$KafkaCatalog;->messageSerializer:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$MessageSerializer;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, KafkaCatalog.class, Object.class), KafkaCatalog.class, "catalogName;topicNameSuffix;columnMappingSupported;messageSerializer", "FIELD:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$KafkaCatalog;->catalogName:Ljava/lang/String;", "FIELD:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$KafkaCatalog;->topicNameSuffix:Ljava/lang/String;", "FIELD:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$KafkaCatalog;->columnMappingSupported:Z", "FIELD:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$KafkaCatalog;->messageSerializer:Lio/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$MessageSerializer;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public String catalogName() {
            return this.catalogName;
        }

        public String topicNameSuffix() {
            return this.topicNameSuffix;
        }

        public boolean columnMappingSupported() {
            return this.columnMappingSupported;
        }

        public MessageSerializer messageSerializer() {
            return this.messageSerializer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$MessageSerializer.class */
    public interface MessageSerializer {
        byte[] serialize(String str, ProtobufSchema protobufSchema, Map<String, Object> map) throws Exception;
    }

    /* loaded from: input_file:io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$ProtobufMessageSerializer.class */
    private static final class ProtobufMessageSerializer implements MessageSerializer {
        private ProtobufMessageSerializer() {
        }

        @Override // io.trino.tests.product.kafka.TestKafkaProtobufReadsSmokeTest.MessageSerializer
        public byte[] serialize(String str, ProtobufSchema protobufSchema, Map<String, Object> map) {
            return TestKafkaProtobufReadsSmokeTest.buildDynamicMessage(protobufSchema.toDescriptor(), map).toByteArray();
        }
    }

    /* loaded from: input_file:io/trino/tests/product/kafka/TestKafkaProtobufReadsSmokeTest$SchemaRegistryProtobufMessageSerializer.class */
    private static final class SchemaRegistryProtobufMessageSerializer implements MessageSerializer {
        private SchemaRegistryProtobufMessageSerializer() {
        }

        @Override // io.trino.tests.product.kafka.TestKafkaProtobufReadsSmokeTest.MessageSerializer
        public byte[] serialize(String str, ProtobufSchema protobufSchema, Map<String, Object> map) throws Exception {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                byteArrayOutputStream.write(0);
                byteArrayOutputStream.write(Ints.toByteArray(SchemaRegistryClientUtils.getSchemaRegistryClient().register(str + "-value", protobufSchema)));
                byteArrayOutputStream.write(0);
                byteArrayOutputStream.write(TestKafkaProtobufReadsSmokeTest.buildDynamicMessage(protobufSchema.toDescriptor(), map).toByteArray());
                byte[] byteArray = byteArrayOutputStream.toByteArray();
                byteArrayOutputStream.close();
                return byteArray;
            } catch (Throwable th) {
                try {
                    byteArrayOutputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
    }

    @Test(groups = {TestGroups.KAFKA, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testSelectPrimitiveDataType() throws Exception {
        selectPrimitiveDataType(KAFKA_CATALOG);
    }

    @Test(groups = {TestGroups.KAFKA_CONFLUENT_LICENSE, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testSelectPrimitiveDataTypeWithSchemaRegistry() throws Exception {
        selectPrimitiveDataType(KAFKA_SCHEMA_REGISTRY_CATALOG);
    }

    private void selectPrimitiveDataType(KafkaCatalog kafkaCatalog) throws Exception {
        ImmutableMap buildOrThrow = ImmutableMap.builder().put("a_varchar", "foobar").put("b_integer", 314).put("c_bigint", Long.MAX_VALUE).put("d_double", Double.valueOf(1.2345678901234567E9d)).put("e_float", Float.valueOf(3.14f)).put("f_boolean", true).buildOrThrow();
        String str = "read_basic_datatypes_protobuf" + kafkaCatalog.topicNameSuffix();
        createProtobufTable(BASIC_DATATYPES_SCHEMA_PATH, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, str, (Map<String, Object>) buildOrThrow, kafkaCatalog.messageSerializer());
        QueryAssertions.assertEventually(new Duration(30.0d, TimeUnit.SECONDS), () -> {
            ((QueryAssert) Assertions.assertThat(QueryExecutors.onTrino().executeQuery(String.format("select * from %s.%s", kafkaCatalog.catalogName(), "product_tests." + str), new QueryExecutor.QueryParam[0]))).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{"foobar", 314, Long.MAX_VALUE, Double.valueOf(1.2345678901234567E9d), Float.valueOf(3.14f), true})});
        });
    }

    @Test(groups = {TestGroups.KAFKA, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testSelectStructuralDataType() throws Exception {
        selectStructuralDataType(KAFKA_CATALOG);
    }

    @Test(groups = {TestGroups.KAFKA_CONFLUENT_LICENSE, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testSelectStructuralDataTypeWithSchemaRegistry() throws Exception {
        selectStructuralDataType(KAFKA_SCHEMA_REGISTRY_CATALOG);
    }

    private void selectStructuralDataType(KafkaCatalog kafkaCatalog) throws Exception {
        ImmutableMap of = ImmutableMap.of("a_array", ImmutableList.of(100L, 101L), "a_map", ImmutableMap.of("key", "key1", "value", Double.valueOf(1.2345678901234567E9d)));
        String str = "read_basic_structural_datatypes_protobuf" + kafkaCatalog.topicNameSuffix();
        createProtobufTable(BASIC_STRUCTURAL_SCHEMA_PATH, BASIC_STRUCTURAL_PROTOBUF_TOPIC_NAME, str, (Map<String, Object>) of, kafkaCatalog.messageSerializer());
        QueryAssertions.assertEventually(new Duration(30.0d, TimeUnit.SECONDS), () -> {
            QueryExecutor onTrino = QueryExecutors.onTrino();
            Object[] objArr = new Object[4];
            objArr[0] = kafkaCatalog.columnMappingSupported() ? "c_array" : "a_array";
            objArr[1] = kafkaCatalog.columnMappingSupported() ? "c_map" : "a_map";
            objArr[2] = kafkaCatalog.catalogName();
            objArr[3] = "product_tests." + str;
            ((QueryAssert) Assertions.assertThat(onTrino.executeQuery(String.format("SELECT a[1], a[2], m['key1'] FROM (SELECT %s as a, %s as m FROM %s.%s) t", objArr), new QueryExecutor.QueryParam[0]))).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{100L, 101L, Double.valueOf(1.2345678901234567E9d)})});
        });
    }

    @Test(groups = {TestGroups.KAFKA_CONFLUENT_LICENSE, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testProtobufWithSchemaReferences() throws Exception {
        ProtobufSchema protobufSchema = new ProtobufSchema(Resources.toString(Resources.getResource(TestKafkaProtobufReadsSmokeTest.class, "/" + "google/protobuf/timestamp.proto"), StandardCharsets.UTF_8), ImmutableList.of(), ImmutableMap.of(), (Integer) null, "google/protobuf/timestamp.proto");
        SchemaRegistryClientUtils.getSchemaRegistryClient().register("timestamp", protobufSchema);
        ProtobufSchema protobufSchema2 = new ProtobufSchema(Files.readString(Path.of(ALL_DATATYPES_SCHEMA_PATH, new String[0])), ImmutableList.of(new SchemaReference(protobufSchema.name(), "timestamp", 1)), ImmutableMap.of("google/protobuf/timestamp.proto", protobufSchema.canonicalString()), (Integer) null, (String) null);
        LocalDateTime parse = LocalDateTime.parse("2020-12-12T15:35:45.923");
        createProtobufTable(protobufSchema2, BASIC_DATATYPES_PROTOBUF_TOPIC_NAME, ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY, (Map<String, Object>) ImmutableMap.builder().put("a_varchar", "foobar").put("b_integer", 2).put("c_bigint", Long.MAX_VALUE).put("d_double", Double.valueOf(1.2345678901234567E9d)).put("e_float", Float.valueOf(3.14f)).put("f_boolean", true).put("h_timestamp", Timestamp.newBuilder().setSeconds(parse.toEpochSecond(ZoneOffset.UTC)).setNanos(parse.getNano()).build()).buildOrThrow(), new SchemaRegistryProtobufMessageSerializer());
        QueryAssertions.assertEventually(new Duration(30.0d, TimeUnit.SECONDS), () -> {
            ((QueryAssert) Assertions.assertThat(QueryExecutors.onTrino().executeQuery(String.format("select * from %s.%s.%s", KAFKA_SCHEMA_REGISTRY_CATALOG.catalogName(), KAFKA_SCHEMA, ALL_DATATYPES_PROTOBUF_TOPIC_SCHEMA_REGISTRY), new QueryExecutor.QueryParam[0]))).containsOnly(new QueryAssert.Row[]{QueryAssert.Row.row(new Object[]{"foobar", 2, Long.MAX_VALUE, Double.valueOf(1.2345678901234567E9d), Float.valueOf(3.14f), true, "ZERO", java.sql.Timestamp.valueOf(parse)})});
        });
    }

    private static void createProtobufTable(String str, String str2, String str3, Map<String, Object> map, MessageSerializer messageSerializer) throws Exception {
        createProtobufTable(new ProtobufSchema(Files.readString(Path.of(str, new String[0]))), str2, str3, map, messageSerializer);
    }

    private static void createProtobufTable(ProtobufSchema protobufSchema, String str, String str2, Map<String, Object> map, MessageSerializer messageSerializer) throws Exception {
        ((KafkaTableManager) ThreadLocalTestContextHolder.testContext().getDependency(TableManager.class, TestGroups.KAFKA)).createImmutable(new KafkaTableDefinition("product_tests." + str, str2, new ListKafkaDataSource(ImmutableList.of(new KafkaMessage(KafkaMessageContentsBuilder.contentsBuilder().appendBytes(messageSerializer.serialize(str2, protobufSchema, map)).build()))), 1, 1), TableHandle.tableHandle(str).inSchema(KAFKA_SCHEMA));
    }

    private static DynamicMessage buildDynamicMessage(Descriptors.Descriptor descriptor, Map<String, Object> map) {
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(descriptor);
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            Descriptors.FieldDescriptor findFieldByName = descriptor.findFieldByName(entry.getKey());
            if (entry.getValue() instanceof Map) {
                newBuilder.setField(findFieldByName, ImmutableList.of(buildDynamicMessage(findFieldByName.getMessageType(), (Map) entry.getValue())));
            } else {
                newBuilder.setField(findFieldByName, entry.getValue());
            }
        }
        return newBuilder.build();
    }
}
