/*
 * Decompiled with CFR 0.152.
 */
package net.manub.embeddedkafka;

import java.net.InetSocketAddress;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import net.manub.embeddedkafka.EmbeddedKafka;
import net.manub.embeddedkafka.EmbeddedKafkaConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import scala.Function0;
import scala.Predef;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.reflect.io.Directory;
import scala.reflect.io.Directory$;
import scala.runtime.BoxesRunTime;

public abstract class EmbeddedKafka$class {
    public static void withRunningKafka(EmbeddedKafka $this, Function0 body, EmbeddedKafkaConfig config) {
        ServerCnxnFactory factory = EmbeddedKafka$class.startZooKeeper($this, config.zooKeeperPort());
        KafkaServer broker = EmbeddedKafka$class.startKafka($this, config);
        try {
            body.apply$mcV$sp();
            return;
        }
        finally {
            broker.shutdown();
            factory.shutdown();
        }
    }

    public static void publishToKafka(EmbeddedKafka $this, String topic, String message, EmbeddedKafkaConfig config) {
        scala.collection.immutable.Map producerProps = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"bootstrap.servers"), (Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.kafkaPort())}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"key.serializer"), (Object)StringSerializer.class.getName()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"value.serializer"), (Object)StringSerializer.class.getName())}));
        KafkaProducer kafkaProducer = new KafkaProducer(JavaConversions$.MODULE$.mapAsJavaMap((Map)producerProps));
        kafkaProducer.send(new ProducerRecord(topic, (Object)message));
        kafkaProducer.close();
    }

    private static ServerCnxnFactory startZooKeeper(EmbeddedKafka $this, int zooKeeperPort) {
        Directory zkLogsDir = Directory$.MODULE$.makeTemp("zookeeper-logs", Directory$.MODULE$.makeTemp$default$2(), Directory$.MODULE$.makeTemp$default$3());
        int tickTime = 2000;
        ZooKeeperServer zkServer = new ZooKeeperServer(zkLogsDir.toFile().jfile(), zkLogsDir.toFile().jfile(), tickTime);
        ServerCnxnFactory factory = ServerCnxnFactory.createFactory();
        factory.configure(new InetSocketAddress("localhost", zooKeeperPort), 1024);
        factory.startup(zkServer);
        return factory;
    }

    private static KafkaServer startKafka(EmbeddedKafka $this, EmbeddedKafkaConfig config) {
        Directory kafkaLogDir = Directory$.MODULE$.makeTemp("kafka", Directory$.MODULE$.makeTemp$default$2(), Directory$.MODULE$.makeTemp$default$3());
        String zkAddress = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.zooKeeperPort())}));
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", zkAddress);
        properties.setProperty("broker.id", "0");
        properties.setProperty("host.name", "localhost");
        properties.setProperty("auto.create.topics.enable", "true");
        properties.setProperty("port", ((Object)BoxesRunTime.boxToInteger((int)config.kafkaPort())).toString());
        properties.setProperty("log.dir", kafkaLogDir.toAbsolute().path());
        properties.setProperty("log.flush.interval.messages", ((Object)BoxesRunTime.boxToInteger((int)1)).toString());
        KafkaServer broker = new KafkaServer(new KafkaConfig(properties), KafkaServer$.MODULE$.$lessinit$greater$default$2());
        broker.startup();
        return broker;
    }

    public static void $init$(EmbeddedKafka $this) {
    }
}

