package io.axual.platform.test.core;

import io.axual.common.annotation.InterfaceStability;
import io.axual.common.config.PasswordConfig;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.io.FileUtils;
import org.apache.http.cookie.ClientCookie;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.CustomBooleanEditor;
import scala.Option;

/**
 * Runs a single Kafka broker together with its own {@link ZookeeperUnit} inside the current JVM.
 *
 * <p>Typical usage: construct, optionally add broker settings with
 * {@link #setKafkaBrokerConfig(String, Object)}, call {@link #startup()}, create topics with
 * {@link #createTopic(String, Integer)} and finally call {@link #shutdown()}.
 *
 * <p>Every instance receives a broker id from a process-wide counter and stores its broker data in
 * a temporary directory that is removed again on {@link #shutdown()}.
 */
@InterfaceStability.Evolving
public class KafkaUnit {
    private static final String LOCALHOST = "localhost";
    private static final String BIND_ALL = "0.0.0.0";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUnit.class);
    /** Source of broker ids; shared by all instances created in this JVM. */
    private static final AtomicInteger brokerIdCounter = new AtomicInteger(0);

    private KafkaServer broker;
    private ZookeeperUnit zookeeper;
    private final String zookeeperString;
    private final String brokerString;
    private final String bindAddress;
    private final String advertisedAddress;
    private final String zkBindAddress;
    private final int zkPort;
    private final int brokerPort;
    private File logDir;
    private final Map<String, Object> kafkaBrokerConfig = new HashMap<>();
    private final int brokerId;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);

    /**
     * Creates a unit with the default bind address ({@code 0.0.0.0}), the default advertised
     * address ({@code localhost}) and two freshly probed free ports for Zookeeper and the broker.
     */
    public KafkaUnit() {
        this(null, null, getEphemeralPort(), getEphemeralPort());
    }

    /**
     * Creates a unit with explicit addresses and ports. Nothing is started until
     * {@link #startup()} is called.
     *
     * @param bindAddress address stored as the broker's {@code host.name}; {@code null} selects
     *     {@code 0.0.0.0}
     * @param advertisedAddress host part of {@link #getBootstrapServer()}; {@code null} selects
     *     {@code localhost}
     * @param zkPort port handed to the {@link ZookeeperUnit}
     * @param brokerPort port handed to the broker
     * @throws IllegalStateException if the bind address cannot be resolved
     */
    public KafkaUnit(String bindAddress, String advertisedAddress, int zkPort, int brokerPort) {
        this.bindAddress = bindAddress != null ? bindAddress : BIND_ALL;
        this.advertisedAddress = advertisedAddress != null ? advertisedAddress : LOCALHOST;
        this.zkBindAddress = getHostIPv4Address(this.bindAddress);
        this.zkPort = zkPort;
        this.brokerPort = brokerPort;
        this.zookeeperString = this.zkBindAddress + ":" + zkPort;
        this.brokerString = String.format("%s:%d", this.advertisedAddress, brokerPort);
        this.brokerId = brokerIdCounter.incrementAndGet();
    }

    /**
     * Resolves a host name to the textual address reported by {@link InetAddress#getHostAddress()}.
     * The wildcard address {@code 0.0.0.0} is resolved as {@code localhost} instead.
     *
     * @param host host name or address to resolve
     * @return the resolved address in textual form
     * @throws IllegalStateException if the host cannot be resolved
     */
    private static String getHostIPv4Address(String host) {
        String lookupName = BIND_ALL.equals(host) ? LOCALHOST : host;
        try {
            return InetAddress.getByName(lookupName).getHostAddress();
        } catch (UnknownHostException e) {
            throw new IllegalStateException("Cannot find '" + lookupName + "'", e);
        }
    }

    /**
     * Asks the operating system for a free port by briefly opening a server socket on port 0.
     *
     * @return the port number the probe socket was bound to
     * @throws RuntimeException if the probe socket cannot be opened or closed
     */
    private static synchronized int getEphemeralPort() {
        // The probe socket is closed before returning, so the port is only known to have been
        // free at probe time; it is not reserved for this instance.
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException("Could not allocate free port for KafkaUnit", e);
        }
    }

    /**
     * Starts Zookeeper, creates a temporary log directory and starts the broker.
     *
     * <p>The settings written below are put into the broker configuration at this point, so they
     * replace any value registered earlier under the same key through
     * {@link #setKafkaBrokerConfig(String, Object)}.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while starting
     */
    public void startup() {
        zookeeper = new ZookeeperUnit(zkPort);
        zookeeper.startup();
        LOG.info("Starting Kafka on port {}", brokerPort);
        try {
            logDir = Files.createTempDirectory("kafka").toFile();
            kafkaBrokerConfig.put("zookeeper.connect", zookeeperString);
            // NOTE(review): the constants taken from TopicBasedRemoteLogMetadataManagerConfig,
            // ClientCookie and CustomBooleanEditor look like decompiler substitutions for plain
            // string literals (presumably broker id, port, log dir and "1") - confirm and inline.
            kafkaBrokerConfig.put(TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID, Integer.toString(brokerId));
            kafkaBrokerConfig.put("host.name", bindAddress);
            kafkaBrokerConfig.put(ClientCookie.PORT_ATTR, Integer.toString(brokerPort));
            kafkaBrokerConfig.put(TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR, logDir.getAbsolutePath());
            kafkaBrokerConfig.put("log.flush.interval.messages", "1");
            kafkaBrokerConfig.put("offsets.topic.replication.factor", CustomBooleanEditor.VALUE_1);
            kafkaBrokerConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, CustomBooleanEditor.VALUE_1);
            kafkaBrokerConfig.put("transaction.state.log.min.isr", CustomBooleanEditor.VALUE_1);
            kafkaBrokerConfig.put("transaction.state.log.replication.factor", CustomBooleanEditor.VALUE_1);
            kafkaBrokerConfig.put("auto.create.topics.enable", "false");
            broker = new KafkaServer(new KafkaConfig(kafkaBrokerConfig), Time.SYSTEM, Option.apply("axual-"), false);
            broker.startup();
            isRunning.set(true);
        } catch (IOException e) {
            throw new RuntimeException("Unable to start Kafka", e);
        }
    }

    /**
     * @return the {@code advertisedAddress:brokerPort} string computed at construction time
     */
    public String getBootstrapServer() {
        return brokerString;
    }

    /**
     * @return the Zookeeper port this unit was constructed with
     */
    public int getZkPort() {
        return zkPort;
    }

    /**
     * @return the broker port this unit was constructed with
     */
    public int getBrokerPort() {
        return brokerPort;
    }

    /**
     * Creates a topic with a single partition.
     *
     * @param str name of the topic
     * @see #createTopic(String, Integer)
     */
    public void createTopic(String str) {
        createTopic(str, 1);
    }

    /**
     * Copies one setting from {@code source} into {@code target}, using {@code defaultValue} when
     * {@code source} has no mapping for {@code key}.
     */
    private static void copy(Map<String, Object> source, Map<String, Object> target, String key, Object defaultValue) {
        target.put(key, source.getOrDefault(key, defaultValue));
    }

    /**
     * Creates a topic with replication factor 1 through an {@link AdminClient} and waits for the
     * request to complete.
     *
     * <p>A failed or interrupted request is logged at WARN level and not rethrown; on interruption
     * the thread's interrupt flag is set again.
     *
     * @param str name of the topic
     * @param num number of partitions; must not be {@code null}
     */
    public void createTopic(String str, Integer num) {
        Map<String, Object> adminConfig = new HashMap<>();
        // NOTE(review): the admin client connects to the broker's "advertised.listeners" value,
        // which is only present when the caller registered it via setKafkaBrokerConfig - confirm
        // that every caller does so, otherwise bootstrap.servers is null here.
        adminConfig.put("bootstrap.servers", kafkaBrokerConfig.get("advertised.listeners"));
        copy(kafkaBrokerConfig, adminConfig, "security.protocol", "SSL");
        copy(kafkaBrokerConfig, adminConfig, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        copy(kafkaBrokerConfig, adminConfig, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, null);
        copy(kafkaBrokerConfig, adminConfig, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, null);
        copy(kafkaBrokerConfig, adminConfig, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, null);
        copy(kafkaBrokerConfig, adminConfig, SslConfigs.SSL_KEY_PASSWORD_CONFIG, null);
        copy(kafkaBrokerConfig, adminConfig, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, null);
        copy(kafkaBrokerConfig, adminConfig, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, null);
        try (AdminClient adminClient = AdminClient.create(adminConfig)) {
            NewTopic newTopic = new NewTopic(str, num.intValue(), (short) 1);
            LOG.info("Executing: CreateTopic {} partitions {}", str, num);
            try {
                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
                LOG.info("CreateTopic {} partitions {} done", str, num);
            } catch (InterruptedException e) {
                LOG.warn("CreateTopic {} partitions {} failed", str, num, e);
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                LOG.warn("CreateTopic {} partitions {} failed", str, num, e);
            }
        }
    }

    /**
     * Stops the broker (if one was created), removes the temporary log directory and stops
     * Zookeeper (if one was created). Afterwards {@link #isRunning()} reports {@code false}.
     *
     * <p>Zookeeper is stopped and the running flag cleared even when stopping the broker throws.
     * Failures to remove the log directory are logged and not rethrown.
     */
    public void shutdown() {
        try {
            if (broker != null) {
                LOG.info("Shutting down Kafka on port {}", brokerPort);
                broker.shutdown();
                broker.awaitShutdown();
                // Forget the stopped broker so a repeated shutdown() does not touch it again.
                broker = null;
            }
            deleteLogDir();
        } finally {
            if (zookeeper != null) {
                zookeeper.shutdown();
            }
            isRunning.set(false);
        }
    }

    /** Empties and deletes the temporary log directory, if one exists; failures are only logged. */
    private void deleteLogDir() {
        if (logDir == null) {
            return;
        }
        File dir = logDir;
        // Cleared up front so the directory is never cleaned twice.
        logDir = null;
        try {
            FileUtils.cleanDirectory(dir);
        } catch (IOException e) {
            LOG.warn("Could not clean the directory {}", dir.getAbsolutePath(), e);
        }
        try {
            Files.delete(dir.toPath());
        } catch (IOException e) {
            LOG.error("Error deleting the directory {}", dir.getAbsolutePath(), e);
        }
    }

    /**
     * Registers a broker setting that is passed to the broker on {@link #startup()}.
     * A {@link PasswordConfig} value is converted to a Kafka {@link Password} first.
     *
     * @param str configuration key
     * @param obj configuration value
     */
    public final void setKafkaBrokerConfig(String str, Object obj) {
        Object value = obj;
        if (value instanceof PasswordConfig) {
            value = new Password(((PasswordConfig) value).getValue());
        }
        kafkaBrokerConfig.put(str, value);
    }

    /**
     * @return {@code true} when Zookeeper reports running and the broker was started and not yet
     *     shut down; {@code false} when {@link #startup()} has not been called
     */
    public boolean isRunning() {
        return zookeeper != null && zookeeper.isRunning() && isRunning.get();
    }
}
