package io.trino.plugin.kafka;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.inject.Scopes;
import io.airlift.configuration.ConditionalModule;
import io.airlift.json.JsonCodec;
import io.airlift.log.Level;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.airlift.testing.Closeables;
import io.airlift.units.Duration;
import io.trino.decoder.DecoderModule;
import io.trino.plugin.kafka.encoder.EncoderModule;
import io.trino.plugin.kafka.schema.ContentSchemaProvider;
import io.trino.plugin.kafka.schema.MapBasedTableDescriptionSupplier;
import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
import io.trino.plugin.kafka.schema.file.FileReadContentSchemaProvider;
import io.trino.plugin.kafka.util.CodecSupplier;
import io.trino.plugin.kafka.util.TestUtils;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TypeManager;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingSession;
import io.trino.testing.kafka.TestingKafka;
import io.trino.tpch.TpchTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/trino/plugin/kafka/KafkaQueryRunner.class */
public final class KafkaQueryRunner {
    private static final Logger log;
    private static final String TPCH_SCHEMA = "tpch";
    private static final String TEST = "test";

    /* loaded from: input_file:io/trino/plugin/kafka/KafkaQueryRunner$Builder.class */
    public static class Builder extends DistributedQueryRunner.Builder<Builder> {
        private final Map<String, String> connectorProperties;
        private List<TpchTable<?>> tables;
        private Map<SchemaTableName, KafkaTopicDescription> extraTopicDescription;
        private final boolean schemaRegistryEnabled;

        private Builder(String str, boolean z) {
            super(TestingSession.testSessionBuilder().setCatalog("kafka").setSchema(str).build());
            this.connectorProperties = new HashMap();
            this.tables = ImmutableList.of();
            this.extraTopicDescription = ImmutableMap.of();
            this.schemaRegistryEnabled = z;
        }

        @CanIgnoreReturnValue
        public Builder addConnectorProperties(Map<String, String> map) {
            this.connectorProperties.putAll(map);
            return this;
        }

        @CanIgnoreReturnValue
        public Builder setTables(Iterable<TpchTable<?>> iterable) {
            this.tables = ImmutableList.copyOf((Iterable) Objects.requireNonNull(iterable, "tables is null"));
            return this;
        }

        @CanIgnoreReturnValue
        public Builder setExtraTopicDescription(Map<SchemaTableName, KafkaTopicDescription> map) {
            this.extraTopicDescription = ImmutableMap.copyOf((Map) Objects.requireNonNull(map, "extraTopicDescription is null"));
            return this;
        }

        public DistributedQueryRunner build() throws Exception {
            AutoCloseable build = super.build();
            try {
                build.installPlugin(new TpchPlugin());
                build.createCatalog(KafkaQueryRunner.TPCH_SCHEMA, KafkaQueryRunner.TPCH_SCHEMA);
                ImmutableList.Builder builder = ImmutableList.builder();
                if (this.schemaRegistryEnabled) {
                    Preconditions.checkState(this.extraTopicDescription.isEmpty(), "unsupported extraTopicDescription with schema registry enabled");
                } else {
                    ImmutableMap.Builder putAll = ImmutableMap.builder().putAll(this.extraTopicDescription).putAll(KafkaQueryRunner.createTpchTopicDescriptions(build.getPlannerContext().getTypeManager(), this.tables));
                    ArrayList<SchemaTableName> arrayList = new ArrayList();
                    arrayList.add(new SchemaTableName("read_test", "all_datatypes_json"));
                    arrayList.add(new SchemaTableName("write_test", "all_datatypes_avro"));
                    arrayList.add(new SchemaTableName("write_test", "all_datatypes_csv"));
                    arrayList.add(new SchemaTableName("write_test", "all_datatypes_raw"));
                    arrayList.add(new SchemaTableName("write_test", "all_datatypes_json"));
                    JsonCodec jsonCodec = new CodecSupplier(KafkaTopicDescription.class, build.getPlannerContext().getTypeManager()).get();
                    for (SchemaTableName schemaTableName : arrayList) {
                        putAll.put(schemaTableName, KafkaQueryRunner.createTable(schemaTableName, jsonCodec));
                    }
                    builder.add(ConditionalModule.conditionalModule(KafkaConfig.class, kafkaConfig -> {
                        return kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(KafkaQueryRunner.TEST);
                    }, binder -> {
                        binder.bind(TableDescriptionSupplier.class).toInstance(new MapBasedTableDescriptionSupplier(putAll.buildOrThrow()));
                    })).add(binder2 -> {
                        binder2.bind(ContentSchemaProvider.class).to(FileReadContentSchemaProvider.class).in(Scopes.SINGLETON);
                    }).add(new DecoderModule()).add(new EncoderModule());
                }
                build.installPlugin(new KafkaPlugin(builder.build()));
                build.createCatalog("kafka", "kafka", this.connectorProperties);
                if (this.schemaRegistryEnabled) {
                    Preconditions.checkState(this.tables.isEmpty(), "unsupported tables with schema registry enabled");
                } else {
                    KafkaQueryRunner.populateTables(build, this.tables);
                }
                return build;
            } catch (Throwable th) {
                Closeables.closeAllSuppress(th, new AutoCloseable[]{build});
                throw th;
            }
        }
    }

    /* loaded from: input_file:io/trino/plugin/kafka/KafkaQueryRunner$ConfluentSchemaRegistryQueryRunnerMain.class */
    public static final class ConfluentSchemaRegistryQueryRunnerMain {
        private ConfluentSchemaRegistryQueryRunnerMain() {
        }

        public static void main(String[] strArr) throws Exception {
            Logging.initialize();
            TestingKafka createWithSchemaRegistry = TestingKafka.createWithSchemaRegistry();
            createWithSchemaRegistry.start();
            DistributedQueryRunner build = ((Builder) KafkaQueryRunner.builderForConfluentSchemaRegistry(createWithSchemaRegistry).setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080"))).build();
            Logger logger = Logger.get(KafkaQueryRunner.class);
            logger.info("======== SERVER STARTED ========");
            logger.info("\n====\n%s\n====", new Object[]{build.getCoordinator().getBaseUrl()});
        }
    }

    /* loaded from: input_file:io/trino/plugin/kafka/KafkaQueryRunner$DefaultKafkaQueryRunnerMain.class */
    public static final class DefaultKafkaQueryRunnerMain {
        private DefaultKafkaQueryRunnerMain() {
        }

        public static void main(String[] strArr) throws Exception {
            Logging.initialize();
            TestingKafka create = TestingKafka.create();
            create.start();
            DistributedQueryRunner build = ((Builder) KafkaQueryRunner.builder(create).setTables(TpchTable.getTables()).setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080"))).build();
            Logger logger = Logger.get(KafkaQueryRunner.class);
            logger.info("======== SERVER STARTED ========");
            logger.info("\n====\n%s\n====", new Object[]{build.getCoordinator().getBaseUrl()});
        }
    }

    private KafkaQueryRunner() {
    }

    public static Builder builder(TestingKafka testingKafka) {
        return new Builder(TPCH_SCHEMA, false).addConnectorProperties(Map.of("kafka.nodes", testingKafka.getConnectString(), "kafka.messages-per-split", "1000", "kafka.table-description-supplier", TEST));
    }

    public static Builder builderForConfluentSchemaRegistry(TestingKafka testingKafka) {
        return new Builder("default", true).addConnectorProperties(Map.of("kafka.nodes", testingKafka.getConnectString(), "kafka.messages-per-split", "1000", "kafka.table-description-supplier", "confluent", "kafka.confluent-schema-registry-url", testingKafka.getSchemaRegistryConnectString(), "kafka.protobuf-any-support-enabled", "true"));
    }

    private static void populateTables(QueryRunner queryRunner, List<TpchTable<?>> list) {
        log.info("Loading data...");
        long nanoTime = System.nanoTime();
        for (TpchTable<?> tpchTable : list) {
            long nanoTime2 = System.nanoTime();
            log.info("Running import for %s", new Object[]{tpchTable.getTableName()});
            queryRunner.execute(String.format("INSERT INTO %1$s SELECT * FROM tpch.tiny.%1$s", tpchTable.getTableName()));
            log.info("Imported %s in %s", new Object[]{tpchTable.getTableName(), Duration.nanosSince(nanoTime2).convertToMostSuccinctTimeUnit()});
        }
        log.info("Loading complete in %s", new Object[]{Duration.nanosSince(nanoTime).toString(TimeUnit.SECONDS)});
    }

    private static KafkaTopicDescription createTable(SchemaTableName schemaTableName, JsonCodec<KafkaTopicDescription> jsonCodec) throws IOException {
        KafkaTopicDescription kafkaTopicDescription = (KafkaTopicDescription) jsonCodec.fromJson(ByteStreams.toByteArray(KafkaQueryRunner.class.getResourceAsStream(String.format("/%s/%s.json", schemaTableName.getSchemaName(), schemaTableName.getTableName()))));
        return new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), schemaTableName.toString(), kafkaTopicDescription.key().map(kafkaTopicFieldGroup -> {
            return new KafkaTopicFieldGroup(kafkaTopicFieldGroup.dataFormat(), kafkaTopicFieldGroup.dataSchema().map(str -> {
                return KafkaQueryRunner.class.getResource(str).getPath();
            }), Optional.empty(), kafkaTopicFieldGroup.fields());
        }), kafkaTopicDescription.message().map(kafkaTopicFieldGroup2 -> {
            return new KafkaTopicFieldGroup(kafkaTopicFieldGroup2.dataFormat(), kafkaTopicFieldGroup2.dataSchema().map(str -> {
                return KafkaQueryRunner.class.getResource(str).getPath();
            }), Optional.empty(), kafkaTopicFieldGroup2.fields());
        }));
    }

    private static Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescriptions(TypeManager typeManager, Iterable<TpchTable<?>> iterable) throws Exception {
        JsonCodec jsonCodec = new CodecSupplier(KafkaTopicDescription.class, typeManager).get();
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Iterator<TpchTable<?>> it = iterable.iterator();
        while (it.hasNext()) {
            SchemaTableName schemaTableName = new SchemaTableName(TPCH_SCHEMA, it.next().getTableName());
            builder.put(TestUtils.loadTpchTopicDescription(jsonCodec, schemaTableName.toString(), schemaTableName));
        }
        return builder.buildOrThrow();
    }

    static {
        Logging.initialize().setLevel("org.apache.kafka", Level.OFF);
        log = Logger.get(KafkaQueryRunner.class);
    }
}
