/*
 * Decompiled with CFR 0.152.
 */
package net.manub.embeddedkafka;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.admin.AdminUtils$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import kafka.utils.ZkUtils;
import kafka.utils.ZkUtils$;
import net.manub.embeddedkafka.EmbeddedKafkaConfig;
import net.manub.embeddedkafka.EmbeddedKafkaSupport;
import net.manub.embeddedkafka.KafkaUnavailableException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.concurrent.ExecutionContext$;
import scala.reflect.io.Directory;
import scala.reflect.io.Directory$;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

public abstract class EmbeddedKafkaSupport$class {
    public static Object withRunningKafka(EmbeddedKafkaSupport $this, Function0 body, EmbeddedKafkaConfig config) {
        ServerCnxnFactory factory = $this.startZooKeeper(config.zooKeeperPort(), $this.startZooKeeper$default$2());
        KafkaServer broker = $this.startKafka(config, $this.startKafka$default$2());
        try {
            return body.apply();
        }
        finally {
            broker.shutdown();
            factory.shutdown();
        }
    }

    public static void publishStringMessageToKafka(EmbeddedKafkaSupport $this, String topic, String message, EmbeddedKafkaConfig config) {
        $this.publishToKafka(topic, message, config, new StringSerializer());
    }

    public static void publishToKafka(EmbeddedKafkaSupport $this, String topic, Object message, EmbeddedKafkaConfig config, Serializer serializer) throws KafkaUnavailableException {
        KafkaProducer kafkaProducer = new KafkaProducer(JavaConversions$.MODULE$.mapAsJavaMap((Map)EmbeddedKafkaSupport$class.net$manub$embeddedkafka$EmbeddedKafkaSupport$$baseProducerConfig($this, config)), (Serializer)new StringSerializer(), serializer);
        Future sendFuture = kafkaProducer.send(new ProducerRecord(topic, message));
        Try sendResult = Try$.MODULE$.apply((Function0)new Serializable($this, sendFuture){
            public static final long serialVersionUID = 0L;
            private final Future sendFuture$1;

            public final RecordMetadata apply() {
                return (RecordMetadata)this.sendFuture$1.get(5L, TimeUnit.SECONDS);
            }
            {
                this.sendFuture$1 = sendFuture$1;
            }
        });
        kafkaProducer.close();
        if (sendResult.isFailure()) {
            throw new KafkaUnavailableException((Throwable)sendResult.failed().get());
        }
    }

    public static scala.collection.immutable.Map net$manub$embeddedkafka$EmbeddedKafkaSupport$$baseProducerConfig(EmbeddedKafkaSupport $this, EmbeddedKafkaConfig config) {
        return (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"bootstrap.servers"), (Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.kafkaPort())}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"max.block.ms"), (Object)((Object)BoxesRunTime.boxToInteger((int)5000)).toString()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"retry.backoff.ms"), (Object)((Object)BoxesRunTime.boxToInteger((int)1000)).toString())}));
    }

    public static String consumeFirstStringMessageFrom(EmbeddedKafkaSupport $this, String topic, EmbeddedKafkaConfig config) {
        return (String)$this.consumeFirstMessageFrom(topic, config, new StringDeserializer());
    }

    public static Object consumeFirstMessageFrom(EmbeddedKafkaSupport $this, String topic, EmbeddedKafkaConfig config, Deserializer deserializer) throws TimeoutException, KafkaUnavailableException {
        Properties props = new Properties();
        props.put("group.id", new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"embedded-kafka-spec"})).s((Seq)Nil$.MODULE$));
        props.put("bootstrap.servers", new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.kafkaPort())})));
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer consumer = new KafkaConsumer(props, (Deserializer)new StringDeserializer(), deserializer);
        Try message = Try$.MODULE$.apply((Function0)new Serializable($this, consumer, topic){
            public static final long serialVersionUID = 0L;
            private final KafkaConsumer consumer$1;
            private final String topic$1;

            public final T apply() {
                this.consumer$1.subscribe((Collection)JavaConversions$.MODULE$.seqAsJavaList((Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{this.topic$1}))));
                this.consumer$1.partitionsFor(this.topic$1);
                ConsumerRecords records = this.consumer$1.poll(5000L);
                if (records.isEmpty()) {
                    throw new TimeoutException("Unable to retrieve a message from Kafka in 5000ms");
                }
                return (T)((ConsumerRecord)records.iterator().next()).value();
            }
            {
                this.consumer$1 = consumer$1;
                this.topic$1 = topic$1;
            }
        });
        consumer.close();
        return message.recover((PartialFunction)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final <A1 extends Throwable, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                A1 A1 = x1;
                if (A1 instanceof KafkaException) {
                    KafkaException kafkaException = (KafkaException)A1;
                    throw new KafkaUnavailableException((Throwable)kafkaException);
                }
                Object object = function1.apply(x1);
                return (B1)object;
            }

            public final boolean isDefinedAt(Throwable x1) {
                Throwable throwable = x1;
                boolean bl = throwable instanceof KafkaException;
                return bl;
            }
        }).get();
    }

    public static ServerCnxnFactory startZooKeeper(EmbeddedKafkaSupport $this, int zooKeeperPort, Directory zkLogsDir) {
        int tickTime = 2000;
        ZooKeeperServer zkServer = new ZooKeeperServer(zkLogsDir.toFile().jfile(), zkLogsDir.toFile().jfile(), tickTime);
        ServerCnxnFactory factory = ServerCnxnFactory.createFactory();
        factory.configure(new InetSocketAddress("0.0.0.0", zooKeeperPort), 1024);
        factory.startup(zkServer);
        return factory;
    }

    public static Directory startZooKeeper$default$2(EmbeddedKafkaSupport $this) {
        return Directory$.MODULE$.makeTemp("zookeeper-logs", Directory$.MODULE$.makeTemp$default$2(), Directory$.MODULE$.makeTemp$default$3());
    }

    public static KafkaServer startKafka(EmbeddedKafkaSupport $this, EmbeddedKafkaConfig config, Directory kafkaLogDir) {
        String zkAddress = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.zooKeeperPort())}));
        Properties properties = new Properties();
        config.customBrokerProperties().foreach((Function1)new Serializable($this, properties){
            public static final long serialVersionUID = 0L;
            private final Properties properties$1;

            public final Object apply(Tuple2<String, String> x0$1) {
                Tuple2<String, String> tuple2 = x0$1;
                if (tuple2 != null) {
                    String key = (String)tuple2._1();
                    String value = (String)tuple2._2();
                    Object object = this.properties$1.setProperty(key, value);
                    return object;
                }
                throw new MatchError(tuple2);
            }
            {
                this.properties$1 = properties$1;
            }
        });
        properties.setProperty("zookeeper.connect", zkAddress);
        properties.setProperty("broker.id", "0");
        properties.setProperty("host.name", "localhost");
        properties.setProperty("advertised.host.name", "localhost");
        properties.setProperty("auto.create.topics.enable", "true");
        properties.setProperty("port", ((Object)BoxesRunTime.boxToInteger((int)config.kafkaPort())).toString());
        properties.setProperty("log.dir", kafkaLogDir.toAbsolute().path());
        properties.setProperty("log.flush.interval.messages", ((Object)BoxesRunTime.boxToInteger((int)1)).toString());
        KafkaServer broker = new KafkaServer(new KafkaConfig((java.util.Map)properties), KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3());
        broker.startup();
        return broker;
    }

    public static Directory startKafka$default$2(EmbeddedKafkaSupport $this) {
        return Directory$.MODULE$.makeTemp("kafka", Directory$.MODULE$.makeTemp$default$2(), Directory$.MODULE$.makeTemp$default$3());
    }

    public static void createCustomTopic(EmbeddedKafkaSupport $this, String topic, scala.collection.immutable.Map topicConfig, int partitions, int replicationFactor, EmbeddedKafkaConfig config) {
        ZkUtils zkUtils = ZkUtils$.MODULE$.apply(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"localhost:", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.zooKeeperPort())})), $this.zkSessionTimeoutMs(), $this.zkConnectionTimeoutMs(), $this.zkSecurityEnabled());
        Properties topicProperties = (Properties)topicConfig.foldLeft((Object)new Properties(), (Function2)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final Properties apply(Properties x0$2, Tuple2<String, String> x1$1) {
                Tuple2 tuple2 = new Tuple2((Object)x0$2, x1$1);
                if (tuple2 != null) {
                    Properties props = (Properties)tuple2._1();
                    Tuple2 tuple22 = (Tuple2)tuple2._2();
                    if (tuple22 != null) {
                        String k = (String)tuple22._1();
                        String v = (String)tuple22._2();
                        props.put(k, v);
                        Properties properties = props;
                        return properties;
                    }
                }
                throw new MatchError((Object)tuple2);
            }
        });
        try {
            AdminUtils$.MODULE$.createTopic(zkUtils, topic, partitions, replicationFactor, topicProperties, AdminUtils$.MODULE$.createTopic$default$6());
            return;
        }
        finally {
            zkUtils.close();
        }
    }

    public static scala.collection.immutable.Map createCustomTopic$default$2(EmbeddedKafkaSupport $this) {
        return Predef$.MODULE$.Map().empty();
    }

    public static int createCustomTopic$default$3(EmbeddedKafkaSupport $this) {
        return 1;
    }

    public static int createCustomTopic$default$4(EmbeddedKafkaSupport $this) {
        return 1;
    }

    public static void $init$(EmbeddedKafkaSupport $this) {
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$executorService_$eq(Executors.newFixedThreadPool(2));
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$executionContext_$eq(ExecutionContext$.MODULE$.fromExecutorService($this.executorService()));
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$zkSessionTimeoutMs_$eq(10000);
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$zkConnectionTimeoutMs_$eq(10000);
        $this.net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$zkSecurityEnabled_$eq(false);
    }
}

