package io.trino.tests.product.launcher.env.common;

import com.google.inject.Inject;
import io.trino.tests.product.launcher.docker.ContainerUtil;
import io.trino.tests.product.launcher.docker.DockerFiles;
import io.trino.tests.product.launcher.env.DockerContainer;
import io.trino.tests.product.launcher.env.Environment;
import io.trino.tests.product.launcher.env.EnvironmentContainers;
import io.trino.tests.product.launcher.testcontainers.PortBinder;
import java.time.Duration;
import java.util.Objects;
import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.MountableFile;

/* loaded from: input_file:io/trino/tests/product/launcher/env/common/Kafka.class */
/**
 * Environment extender that adds three Confluent Platform containers
 * (Zookeeper, a Kafka broker and the Schema Registry) to a product-test
 * environment and copies logging configuration from the {@code common/kafka}
 * docker-files directory into the relevant containers.
 *
 * <p>All three images share the tag given by {@link #CONFLUENT_VERSION}.
 */
public class Kafka implements EnvironmentExtender {
    /** Image tag used for every Confluent container created by this class. */
    private static final String CONFLUENT_VERSION = "7.3.1";

    private static final int ZOOKEEPER_PORT = 2181;
    private static final int KAFKA_PORT = 9092;
    private static final int SCHEMA_REGISTRY_PORT = 8081;

    /** Startup timeout applied to each container. */
    private static final Duration STARTUP_TIMEOUT = Duration.ofMinutes(5);

    static final String KAFKA = "kafka";
    static final String SCHEMA_REGISTRY = "schema-registry";
    static final String ZOOKEEPER = "zookeeper";

    /** Broker endpoint, used both as the advertised listener and as the registry's bootstrap server. */
    private static final String KAFKA_ENDPOINT = "PLAINTEXT://" + KAFKA + ":" + KAFKA_PORT;

    /** In-container path of the log4j template; the same path is used for the broker and the registry. */
    private static final String CONFLUENT_LOG4J_TEMPLATE = "/etc/confluent/docker/log4j.properties.template";

    private final DockerFiles.ResourceProvider configDir;
    private final PortBinder portBinder;

    /**
     * @param dockerFiles source of the {@code common/kafka} configuration directory; must not be null
     * @param portBinder used to expose each container's service port; must not be null
     * @throws NullPointerException if either argument is null
     */
    @Inject
    public Kafka(DockerFiles dockerFiles, PortBinder portBinder) {
        this.configDir = Objects.requireNonNull(dockerFiles, "dockerFiles is null").getDockerFilesHostDirectory("common/kafka");
        this.portBinder = Objects.requireNonNull(portBinder, "portBinder is null");
    }

    /**
     * Registers the Zookeeper, Kafka and Schema Registry containers, declares
     * that Kafka depends on Zookeeper and the Schema Registry depends on Kafka,
     * and schedules the copying of logging configuration files.
     *
     * @param builder environment builder to extend
     */
    @Override // io.trino.tests.product.launcher.env.common.EnvironmentExtender
    public void extendEnvironment(Environment.Builder builder) {
        builder.addContainers(createZookeeper(), createKafka(), createSchemaRegistry())
                .containerDependsOn(KAFKA, ZOOKEEPER)
                .containerDependsOn(SCHEMA_REGISTRY, KAFKA);

        // Only containers recognised as Trino containers receive log.properties.
        builder.configureContainers(container -> {
            if (EnvironmentContainers.isTrinoContainer(container.getLogicalName())) {
                container.withCopyFileToContainer(
                        configFile("log.properties"),
                        "/docker/presto-product-tests/conf/presto/etc/log.properties");
            }
        });
        builder.configureContainer(KAFKA, container -> {
            container.withCopyFileToContainer(configFile("log4j-kafka.properties.template"), CONFLUENT_LOG4J_TEMPLATE);
        });
        builder.configureContainer(SCHEMA_REGISTRY, container -> {
            container.withCopyFileToContainer(configFile("log4j-schema-registry.properties.template"), CONFLUENT_LOG4J_TEMPLATE);
        });
    }

    /** Resolves {@code fileName} inside the configuration directory as a mountable host file. */
    private MountableFile configFile(String fileName) {
        return MountableFile.forHostPath(this.configDir.getPath(fileName));
    }

    /** Builds the Zookeeper container and exposes {@link #ZOOKEEPER_PORT} through the port binder. */
    private DockerContainer createZookeeper() {
        DockerContainer container = new DockerContainer("confluentinc/cp-zookeeper:" + CONFLUENT_VERSION, ZOOKEEPER)
                .withEnv("ZOOKEEPER_CLIENT_PORT", Integer.toString(ZOOKEEPER_PORT))
                .withEnv("ZOOKEEPER_TICK_TIME", "2000")
                .withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
                .waitingFor(ContainerUtil.forSelectedPorts(ZOOKEEPER_PORT))
                .withStartupTimeout(STARTUP_TIMEOUT);
        this.portBinder.exposePort(container, ZOOKEEPER_PORT);
        return container;
    }

    /**
     * Builds the Kafka broker container and exposes {@link #KAFKA_PORT} through
     * the port binder. The container waits for both the port and the
     * {@code KafkaServer} "started" log line.
     */
    private DockerContainer createKafka() {
        DockerContainer container = new DockerContainer("confluentinc/cp-kafka:" + CONFLUENT_VERSION, KAFKA)
                .withEnv("KAFKA_BROKER_ID", "1")
                .withEnv("KAFKA_ZOOKEEPER_CONNECT", ZOOKEEPER + ":" + ZOOKEEPER_PORT)
                .withEnv("KAFKA_ADVERTISED_LISTENERS", KAFKA_ENDPOINT)
                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
                .withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
                .waitingForAll(
                        ContainerUtil.forSelectedPorts(KAFKA_PORT),
                        Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
                .withStartupTimeout(STARTUP_TIMEOUT);
        this.portBinder.exposePort(container, KAFKA_PORT);
        return container;
    }

    /** Builds the Schema Registry container and exposes {@link #SCHEMA_REGISTRY_PORT} through the port binder. */
    private DockerContainer createSchemaRegistry() {
        DockerContainer container = new DockerContainer("confluentinc/cp-schema-registry:" + CONFLUENT_VERSION, SCHEMA_REGISTRY)
                .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", KAFKA_ENDPOINT)
                .withEnv("SCHEMA_REGISTRY_HOST_NAME", "0.0.0.0")
                .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + SCHEMA_REGISTRY_PORT)
                .withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
                .waitingFor(ContainerUtil.forSelectedPorts(SCHEMA_REGISTRY_PORT))
                .withStartupTimeout(STARTUP_TIMEOUT);
        this.portBinder.exposePort(container, SCHEMA_REGISTRY_PORT);
        return container;
    }
}
