/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.kafka;

import io.debezium.annotation.ThreadSafe;
import io.debezium.util.IoUtil;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.log.Log;
import kafka.server.KafkaConfig;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Iterable;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;

@ThreadSafe
public class KafkaServer {
    public static final int DEFAULT_BROKER_ID = 1;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServer.class);
    private final Supplier<String> zkConnection;
    private final int brokerId;
    private volatile File logsDir;
    private final Properties config;
    private volatile int desiredPort = -1;
    private volatile int port = -1;
    private volatile kafka.server.KafkaServer server;

    public KafkaServer(Supplier<String> zookeeperConnection) {
        this(zookeeperConnection, 1);
    }

    public KafkaServer(Supplier<String> zookeeperConnection, int brokerId) {
        this(zookeeperConnection, brokerId, -1);
    }

    public KafkaServer(Supplier<String> zookeeperConnection, int brokerId, int port) {
        if (zookeeperConnection == null) {
            throw new IllegalArgumentException("The Zookeeper connection string supplier may not be null");
        }
        this.zkConnection = zookeeperConnection;
        this.brokerId = brokerId;
        this.config = new Properties();
        this.setPort(port);
        this.populateDefaultConfiguration(this.config);
    }

    protected int brokerId() {
        return this.brokerId;
    }

    protected String zookeeperConnection() {
        return this.zkConnection.get();
    }

    protected void populateDefaultConfiguration(Properties props) {
        this.config.setProperty(KafkaConfig.NumPartitionsProp(), String.valueOf(1));
        this.config.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(Long.MAX_VALUE));
    }

    public KafkaServer setProperty(String name, String value) {
        if (this.server != null) {
            throw new IllegalStateException("Unable to change the properties when already running");
        }
        if (!(KafkaConfig.ZkConnectProp().equalsIgnoreCase(name) || KafkaConfig.BrokerIdProp().equalsIgnoreCase(name) || KafkaConfig.HostNameProp().equalsIgnoreCase(name))) {
            this.config.setProperty(name, value);
        }
        return this;
    }

    public KafkaServer setProperties(Properties properties) {
        if (this.server != null) {
            throw new IllegalStateException("Unable to change the properties when already running");
        }
        properties.stringPropertyNames().forEach(propName -> this.setProperty((String)propName, properties.getProperty((String)propName)));
        return this;
    }

    public KafkaServer setPort(int port) {
        this.port = this.desiredPort = port > 0 ? port : -1;
        return this;
    }

    public Properties config() {
        Properties runningConfig = new Properties();
        runningConfig.putAll((Map<?, ?>)this.config);
        runningConfig.setProperty(KafkaConfig.ZkConnectProp(), this.zookeeperConnection());
        runningConfig.setProperty(KafkaConfig.BrokerIdProp(), Integer.toString(this.brokerId));
        runningConfig.setProperty(KafkaConfig.HostNameProp(), "localhost");
        runningConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), String.valueOf(this.config.getOrDefault((Object)KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.TRUE)));
        runningConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp(), Integer.toString(1));
        runningConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp(), Integer.toString(0));
        return runningConfig;
    }

    public String getConnection() {
        return "localhost:" + this.port;
    }

    public synchronized KafkaServer startup() {
        if (this.server != null) {
            throw new IllegalStateException("" + this + " is already running");
        }
        Properties config = this.config();
        if (this.logsDir == null) {
            try {
                File temp = File.createTempFile("kafka", "suffix");
                this.logsDir = temp.getParentFile();
                temp.delete();
            }
            catch (IOException e) {
                throw new RuntimeException("Unable to create temporary directory", e);
            }
        }
        config.setProperty(KafkaConfig.LogDirProp(), this.logsDir.getAbsolutePath());
        config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
        this.port = this.desiredPort > 0 ? this.desiredPort : IoUtil.getAvailablePort();
        config.setProperty(KafkaConfig.PortProp(), Integer.toString(this.port));
        try {
            LOGGER.debug("Starting Kafka broker {} at {} with storage in {}", new Object[]{this.brokerId, this.getConnection(), this.logsDir.getAbsolutePath()});
            this.server = new kafka.server.KafkaServer(new KafkaConfig((Map)config), (Time)new SystemTime(), Option.apply(null), (Seq)new ArraySeq(0));
            this.server.startup();
            LOGGER.info("Started Kafka server {} at {} with storage in {}", new Object[]{this.brokerId, this.getConnection(), this.logsDir.getAbsolutePath()});
            return this;
        }
        catch (RuntimeException e) {
            this.server = null;
            throw e;
        }
    }

    public synchronized void shutdown(boolean deleteLogs) {
        if (this.server != null) {
            try {
                this.server.shutdown();
                if (deleteLogs) {
                    ((java.lang.Iterable)JavaConverters.asJavaIterableConverter((Iterable)this.server.logManager().allLogs()).asJava()).forEach(Log::delete);
                }
                LOGGER.info("Stopped Kafka server {} at {}", (Object)this.brokerId, (Object)this.getConnection());
            }
            finally {
                this.server = null;
                this.port = this.desiredPort;
            }
        }
    }

    public synchronized void deleteData() {
        if (this.server == null) {
            try {
                IoUtil.delete((File)this.logsDir);
            }
            catch (IOException e) {
                LOGGER.error("Unable to delete directory '{}'", (Object)this.logsDir, (Object)e);
            }
        }
    }

    public ZkUtils getZkUtils() {
        return this.server != null ? this.server.zkUtils() : null;
    }

    public void createTopics(String ... topics) {
        this.createTopics(1, 1, topics);
    }

    public void createTopics(int numPartitions, int replicationFactor, String ... topics) {
        for (String topic : topics) {
            if (topic == null) continue;
            this.createTopic(topic, numPartitions, replicationFactor);
        }
    }

    public void createTopic(String topic, int numPartitions, int replicationFactor) {
        RackAwareMode rackAwareMode = null;
        AdminUtils.createTopic((ZkUtils)this.getZkUtils(), (String)topic, (int)numPartitions, (int)replicationFactor, (Properties)new Properties(), rackAwareMode);
    }

    void onEachDirectory(Consumer<File> consumer) {
        consumer.accept(this.getStateDirectory());
    }

    public File getStateDirectory() {
        return this.logsDir;
    }

    public void setStateDirectory(File stateDirectory) {
        if (stateDirectory != null && stateDirectory.exists() && !stateDirectory.isDirectory() && !stateDirectory.canWrite() && !stateDirectory.canRead()) {
            throw new IllegalArgumentException("The directory must be readable and writable");
        }
        this.logsDir = stateDirectory;
    }

    public String toString() {
        return "KafkaServer{" + this.getConnection() + "}";
    }

    protected static class SystemTime
    implements Time {
        protected SystemTime() {
        }

        public long milliseconds() {
            return System.currentTimeMillis();
        }

        public long nanoseconds() {
            return System.nanoTime();
        }

        public long hiResClockMs() {
            return this.nanoseconds();
        }

        public void sleep(long ms) {
            try {
                Thread.sleep(ms);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

