package io.trino.tests.product.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import io.airlift.units.Duration;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.trino.tempto.ProductTest;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.context.ThreadLocalTestContextHolder;
import io.trino.tempto.fulfillment.table.TableHandle;
import io.trino.tempto.fulfillment.table.TableManager;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessage;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessageContentsBuilder;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableDefinition;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableManager;
import io.trino.tempto.fulfillment.table.kafka.ListKafkaDataSource;
import io.trino.tempto.query.QueryExecutor;
import io.trino.tests.product.TestGroups;
import io.trino.tests.product.utils.QueryAssertions;
import io.trino.tests.product.utils.SchemaRegistryClientUtils;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/tests/product/kafka/TestKafkaAvroReadsSmokeTest.class */
public class TestKafkaAvroReadsSmokeTest extends ProductTest {
    // Schema name used when registering the Kafka tables and when building the test queries.
    private static final String KAFKA_SCHEMA = "product_tests";
    // Base name for the primitive-datatype scenario; tests append the catalog's topic suffix to it.
    private static final String ALL_DATATYPES_AVRO_TOPIC_NAME = "read_all_datatypes_avro";
    // Avro schema file (.avsc) read from disk for the primitive-datatype and all-null scenarios.
    private static final String ALL_DATATYPE_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/all_datatypes_avro_schema.avsc";
    // Base name for the all-null scenario.
    private static final String ALL_NULL_AVRO_TOPIC_NAME = "read_all_null_avro";
    // Base name for the array/map (structural) scenario.
    private static final String STRUCTURAL_AVRO_TOPIC_NAME = "read_structural_datatype_avro";
    // Avro schema file (.avsc) read from disk for the structural scenario.
    private static final String STRUCTURAL_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/structural_datatype_avro_schema.avsc";
    // Topic queried by the schema-references scenario.
    private static final String AVRO_SCHEMA_WITH_REFERENCES_TOPIC_NAME = "schema_with_references_avro";
    // Avro schema file (.avsc) whose definition refers to the all-datatypes schema.
    private static final String AVRO_SCHEMA_WITH_REFERENCES_SCHEMA_PATH = "/docker/presto-product-tests/conf/presto/etc/catalog/kafka/schema_with_references.avsc";

    /**
     * Serializes a record with Avro's {@link DataFileWriter}, so the produced bytes are a
     * self-contained Avro data file and no schema registry is contacted.
     */
    private static final class AvroMessageSerializer implements MessageSerializer {
        private AvroMessageSerializer() {
        }

        /**
         * Builds a {@link GenericData.Record} from {@code values} and writes it through a
         * {@link DataFileWriter} into an in-memory buffer.
         *
         * @param topic not used by this implementation
         * @param parsedSchema schema whose {@code rawSchema()} is cast to an Avro {@link Schema}
         * @param values field name to field value; every entry is put into the record
         * @return the bytes written by the data file writer
         * @throws IOException if creating, appending to, or closing the writer fails
         */
        @Override // io.trino.tests.product.kafka.TestKafkaAvroReadsSmokeTest.MessageSerializer
        public byte[] serialize(String topic, ParsedSchema parsedSchema, Map<String, Object> values) throws IOException {
            Schema schema = (Schema) parsedSchema.rawSchema();
            GenericData.Record record = new GenericData.Record(schema);
            values.forEach(record::put);

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // try-with-resources closes the writer exactly once, including when create/append
            // fails. The buffer is read only after the writer has been closed.
            try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
                writer.create(schema, out);
                writer.append(record);
            }
            return out.toByteArray();
        }
    }

    /**
     * Immutable description of one Kafka catalog the parameterized tests run against.
     *
     * <p>Holds the catalog name used in queries, the suffix the tests append to topic names,
     * and a flag the structural-datatype test uses to choose between the {@code c_*} and
     * {@code a_*} column names.
     */
    public static final class KafkaCatalog {
        private final String catalogName;
        private final String topicNameSuffix;
        private final boolean columnMappingSupported;

        /**
         * @param catalogName catalog name; must not be null
         * @param topicNameSuffix suffix appended to topic names; must not be null
         * @param columnMappingSupported value returned by {@link #isColumnMappingSupported()}
         * @throws NullPointerException if {@code catalogName} or {@code topicNameSuffix} is null
         */
        private KafkaCatalog(String catalogName, String topicNameSuffix, boolean columnMappingSupported) {
            this.catalogName = Objects.requireNonNull(catalogName, "catalogName is null");
            this.topicNameSuffix = Objects.requireNonNull(topicNameSuffix, "topicNameSuffix is null");
            this.columnMappingSupported = columnMappingSupported;
        }

        /** Returns the catalog name. */
        public String getCatalogName() {
            return catalogName;
        }

        /** Returns the suffix appended to topic names for this catalog. */
        public String getTopicNameSuffix() {
            return topicNameSuffix;
        }

        /** Returns the column-mapping flag given at construction. */
        public boolean isColumnMappingSupported() {
            return columnMappingSupported;
        }

        /** Returns the catalog name, so a data-provider row is displayed by its catalog. */
        @Override
        public String toString() {
            return catalogName;
        }
    }

    /**
     * Strategy for turning a map of field values into the bytes of a single Kafka message.
     */
    @FunctionalInterface
    public interface MessageSerializer {
        /**
         * Serializes one record.
         *
         * @param topic name supplied by {@code createAvroTable}; the schema-registry
         *        implementation registers the schema under {@code topic + "-value"}
         * @param parsedSchema schema describing the record
         * @param values field name to field value
         * @return the serialized message bytes
         * @throws IOException if serialization fails
         */
        byte[] serialize(String topic, ParsedSchema parsedSchema, Map<String, Object> values) throws IOException;
    }

    /**
     * Serializes a record for the schema-registry-backed catalog: the schema is registered under
     * the subject {@code <topic>-value} and the returned id is written in front of the
     * binary-encoded record.
     */
    private static final class SchemaRegistryAvroMessageSerializer implements MessageSerializer {
        private SchemaRegistryAvroMessageSerializer() {
        }

        /**
         * Produces {@code [0x00][4-byte schema id][Avro binary record]}.
         *
         * @param topic used to derive the registry subject {@code topic + "-value"}
         * @param parsedSchema schema to register; its {@code rawSchema()} is cast to an Avro {@link Schema}
         * @param values field name to field value; every entry is put into the record
         * @return the framed message bytes
         * @throws IOException if writing to the buffer or encoding the record fails
         * @throws RuntimeException wrapping a {@link RestClientException} raised while registering the schema
         */
        @Override // io.trino.tests.product.kafka.TestKafkaAvroReadsSmokeTest.MessageSerializer
        public byte[] serialize(String topic, ParsedSchema parsedSchema, Map<String, Object> values) throws IOException {
            try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                Schema schema = (Schema) parsedSchema.rawSchema();

                // Header: a zero byte followed by the big-endian schema id.
                // NOTE(review): this looks like the Confluent wire format (magic byte + id) - confirm.
                out.write(0);
                int schemaId = SchemaRegistryClientUtils.getSchemaRegistryClient().register(topic + "-value", parsedSchema);
                out.write(Ints.toByteArray(schemaId));

                GenericData.Record record = new GenericData.Record(schema);
                values.forEach(record::put);

                BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
                new GenericDatumWriter<GenericData.Record>(schema).write(record, encoder);
                // Flush only after the record has been written so nothing can be left behind in the encoder.
                encoder.flush();
                return out.toByteArray();
            } catch (RestClientException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Minimal {@link ParsedSchema} implementation backed by an Avro {@link Schema}, with
     * explicitly supplied schema references.
     */
    public static class TestingAvroSchema implements ParsedSchema {
        private final String avroSchemaString;
        private final Schema avroSchema;
        private final List<SchemaReference> schemaReferences;

        /**
         * Parses {@code avroSchemaString} after first parsing every entry of
         * {@code resolvedReferences} with the same {@link Schema.Parser}, so types defined by the
         * references are known when the main schema is parsed.
         *
         * @param avroSchemaString Avro schema definition; returned verbatim by {@link #canonicalString()}
         * @param schemaReferences references reported by {@link #references()}; copied defensively
         * @param resolvedReferences schema definitions of the referenced schemas, parsed in list order
         * @throws NullPointerException if any argument is null
         */
        public TestingAvroSchema(String avroSchemaString, List<SchemaReference> schemaReferences, List<String> resolvedReferences) {
            this.avroSchemaString = Objects.requireNonNull(avroSchemaString, "avroSchemaString is null");
            this.schemaReferences = ImmutableList.copyOf(Objects.requireNonNull(schemaReferences, "schemaReferences is null"));
            Objects.requireNonNull(resolvedReferences, "resolvedReferences is null");
            Schema.Parser parser = new Schema.Parser();
            resolvedReferences.forEach(parser::parse);
            this.avroSchema = parser.parse(avroSchemaString);
        }

        /** Returns the constant schema type {@code "AVRO"}. */
        public String schemaType() {
            return "AVRO";
        }

        /** Returns the name of the parsed Avro schema. */
        public String name() {
            return avroSchema.getName();
        }

        /** Returns the schema string exactly as passed to the constructor. */
        public String canonicalString() {
            return avroSchemaString;
        }

        /** Returns the immutable list of schema references given at construction. */
        public List<SchemaReference> references() {
            return schemaReferences;
        }

        /** Always returns {@code false}; no compatibility check is performed. */
        public boolean isBackwardCompatible(ParsedSchema parsedSchema) {
            return false;
        }

        /** Returns the parsed Avro {@link Schema}. */
        public Object rawSchema() {
            return avroSchema;
        }
    }

    /**
     * Publishes one record with varchar, bigint, double and boolean fields and verifies that
     * {@code select *} on the resulting table eventually returns exactly that row.
     *
     * @param kafkaCatalog catalog to query; also supplies the topic name suffix
     * @param messageSerializer serializer used to encode the record
     */
    @Test(groups = {TestGroups.KAFKA, TestGroups.PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
    public void testSelectPrimitiveDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer) throws Exception {
        Map<String, Object> record = ImmutableMap.of(
                "a_varchar", "foobar",
                "a_bigint", 127L,
                "a_double", 234.567,
                "a_boolean", true);
        String topicName = ALL_DATATYPES_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_DATATYPES_AVRO_TOPIC_NAME, topicName, record, messageSerializer);
        // Retried for up to 30 seconds until the assertion passes.
        QueryAssertions.assertEventually(new Duration(30, TimeUnit.SECONDS), () -> {
            String sql = String.format("select * from %s.%s", kafkaCatalog.getCatalogName(), KAFKA_SCHEMA + "." + topicName);
            QueryAssert.assertThat(QueryExecutor.query(sql))
                    .containsOnly(QueryAssert.Row.row("foobar", 127, 234.567, true));
        });
    }

    /**
     * Publishes a record built from an empty value map and verifies that {@code select *}
     * eventually returns a single row of four NULLs.
     *
     * @param kafkaCatalog catalog to query; also supplies the topic name suffix
     * @param messageSerializer serializer used to encode the record
     */
    @Test(groups = {TestGroups.KAFKA, TestGroups.PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
    public void testNullType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer) throws Exception {
        String topicName = ALL_NULL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
        // Passed directly (no cast) so the Map<String, Object> parameter drives type inference.
        createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_NULL_AVRO_TOPIC_NAME, topicName, ImmutableMap.of(), messageSerializer);
        // Retried for up to 30 seconds until the assertion passes.
        QueryAssertions.assertEventually(new Duration(30, TimeUnit.SECONDS), () -> {
            String sql = String.format("select * from %s.%s", kafkaCatalog.getCatalogName(), KAFKA_SCHEMA + "." + topicName);
            QueryAssert.assertThat(QueryExecutor.query(sql))
                    .containsOnly(QueryAssert.Row.row(null, null, null, null));
        });
    }

    /**
     * Publishes one record with an array field and a map field and verifies that the array
     * elements and the map entry can eventually be read back individually.
     *
     * @param kafkaCatalog catalog to query; its column-mapping flag selects the column names
     * @param messageSerializer serializer used to encode the record
     */
    @Test(groups = {TestGroups.KAFKA, TestGroups.PROFILE_SPECIFIC_TESTS}, dataProvider = "catalogs")
    public void testSelectStructuralDataType(KafkaCatalog kafkaCatalog, MessageSerializer messageSerializer) throws Exception {
        Map<String, Object> record = ImmutableMap.of(
                "a_array", ImmutableList.of(100L, 102L),
                "a_map", ImmutableMap.of("key1", "value1"));
        String topicName = STRUCTURAL_AVRO_TOPIC_NAME + kafkaCatalog.getTopicNameSuffix();
        createAvroTable(STRUCTURAL_SCHEMA_PATH, STRUCTURAL_AVRO_TOPIC_NAME, topicName, record, messageSerializer);
        // Retried for up to 30 seconds until the assertion passes.
        QueryAssertions.assertEventually(new Duration(30, TimeUnit.SECONDS), () -> {
            // With column mapping the columns are named c_*; otherwise the Avro field names a_* are used.
            String arrayColumn = kafkaCatalog.isColumnMappingSupported() ? "c_array" : "a_array";
            String mapColumn = kafkaCatalog.isColumnMappingSupported() ? "c_map" : "a_map";
            String sql = String.format(
                    "SELECT a[1], a[2], m['key1'] FROM (SELECT %s as a, %s as m FROM %s.%s) t",
                    arrayColumn,
                    mapColumn,
                    kafkaCatalog.getCatalogName(),
                    KAFKA_SCHEMA + "." + topicName);
            QueryAssert.assertThat(QueryExecutor.query(sql))
                    .containsOnly(QueryAssert.Row.row(100, 102, "value1"));
        });
    }

    /**
     * Supplies the (catalog, serializer) pairs for the parameterized tests: a catalog with no
     * topic suffix paired with {@link AvroMessageSerializer}, and {@code kafka_schema_registry}
     * with the {@code _schema_registry} suffix paired with {@link SchemaRegistryAvroMessageSerializer}.
     *
     * @return one row per catalog, each holding a {@link KafkaCatalog} and a {@link MessageSerializer}
     */
    @DataProvider
    private static Object[][] catalogs() {
        // Must be Object[][]: a plain Object[] literal is not assignable to the declared return type.
        return new Object[][] {
                // NOTE(review): TestGroups.KAFKA is used as the catalog name here - confirm this is intended.
                {new KafkaCatalog(TestGroups.KAFKA, "", true), new AvroMessageSerializer()},
                {new KafkaCatalog("kafka_schema_registry", "_schema_registry", false), new SchemaRegistryAvroMessageSerializer()},
        };
    }

    /**
     * Registers the all-datatypes schema in the schema registry, publishes a record whose
     * {@code reference} field is a nested record of that schema, and verifies that nested fields
     * can eventually be selected through the {@code kafka_schema_registry} catalog.
     */
    @Test(groups = {TestGroups.KAFKA, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testAvroWithSchemaReferences() throws Exception {
        String referencedSubject = ALL_DATATYPES_AVRO_TOPIC_NAME + "-value";
        TestingAvroSchema referencedSchema = new TestingAvroSchema(
                Files.readString(new File(ALL_DATATYPE_SCHEMA_PATH).toPath()),
                ImmutableList.of(),
                ImmutableList.of());
        SchemaRegistryClientUtils.getSchemaRegistryClient().register(referencedSubject, referencedSchema);

        // The map must be typed: with a raw map the overloaded GenericRecordBuilder::set cannot be resolved.
        Map<String, Object> referencedValues = ImmutableMap.of(
                "a_varchar", "foobar",
                "a_bigint", 127L,
                "a_double", 234.567,
                "a_boolean", true);
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder((Schema) referencedSchema.rawSchema());
        referencedValues.forEach(recordBuilder::set);

        // The outer schema refers to the registered schema by name, subject and version 1.
        TestingAvroSchema schemaWithReferences = new TestingAvroSchema(
                Files.readString(new File(AVRO_SCHEMA_WITH_REFERENCES_SCHEMA_PATH).toPath()),
                ImmutableList.of(new SchemaReference(referencedSchema.name(), referencedSubject, 1)),
                ImmutableList.of(referencedSchema.canonicalString()));
        createAvroTable(
                schemaWithReferences,
                ALL_DATATYPES_AVRO_TOPIC_NAME,
                AVRO_SCHEMA_WITH_REFERENCES_TOPIC_NAME,
                ImmutableMap.of("reference", recordBuilder.build()),
                new SchemaRegistryAvroMessageSerializer());

        // Retried for up to 30 seconds until the assertion passes.
        QueryAssertions.assertEventually(new Duration(30, TimeUnit.SECONDS), () -> {
            String sql = String.format(
                    "select reference.a_varchar, reference.a_double from kafka_schema_registry.%s.%s",
                    KAFKA_SCHEMA,
                    AVRO_SCHEMA_WITH_REFERENCES_TOPIC_NAME);
            QueryAssert.assertThat(QueryExecutor.query(sql))
                    .containsOnly(QueryAssert.Row.row("foobar", 234.567));
        });
    }

    /**
     * Reads the Avro schema stored at {@code schemaPath} and delegates to the overload that
     * takes an already parsed schema; the schema is created without any references.
     *
     * @param schemaPath file system path of the Avro schema definition
     * @param tableName forwarded unchanged as the second argument of the overload
     * @param topicName forwarded unchanged as the third argument of the overload
     * @param record field name to field value for the single message
     * @param messageSerializer serializer used to encode the message
     */
    private static void createAvroTable(String schemaPath, String tableName, String topicName, Map<String, Object> record, MessageSerializer messageSerializer) throws Exception {
        String schemaDefinition = Files.readString(new File(schemaPath).toPath());
        TestingAvroSchema schema = new TestingAvroSchema(schemaDefinition, ImmutableList.of(), ImmutableList.of());
        createAvroTable(schema, tableName, topicName, record, messageSerializer);
    }

    /**
     * Serializes {@code record} into one Kafka message and creates an immutable table for it
     * through the {@link KafkaTableManager} obtained from the current test context.
     *
     * @param testingAvroSchema schema handed to the serializer
     * @param tableName name used for the table definition (prefixed with the schema) and the table handle
     * @param topicName second argument of the table definition; also passed to the serializer
     * @param record field name to field value for the single message
     * @param messageSerializer serializer used to encode the message
     */
    private static void createAvroTable(TestingAvroSchema testingAvroSchema, String tableName, String topicName, Map<String, Object> record, MessageSerializer messageSerializer) throws Exception {
        // Resolve the manager first, as in the original single-expression form.
        KafkaTableManager tableManager = (KafkaTableManager) ThreadLocalTestContextHolder.testContext().getDependency(TableManager.class, TestGroups.KAFKA);

        KafkaMessage message = new KafkaMessage(
                KafkaMessageContentsBuilder.contentsBuilder()
                        .appendBytes(messageSerializer.serialize(topicName, testingAvroSchema, record))
                        .build());
        KafkaTableDefinition tableDefinition = new KafkaTableDefinition(
                KAFKA_SCHEMA + "." + tableName,
                topicName,
                new ListKafkaDataSource(ImmutableList.of(message)),
                1,
                1);
        TableHandle tableHandle = TableHandle.tableHandle(tableName).inSchema(KAFKA_SCHEMA);

        tableManager.createImmutable(tableDefinition, tableHandle);
    }
}
