/*
 * Decompiled with CFR 0.152.
 */
package net.leadware.kafka.embedded;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import kafka.common.KafkaException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import net.leadware.kafka.embedded.model.ConsumerGroup;
import net.leadware.kafka.embedded.model.ConsumerGroupOffset;
import net.leadware.kafka.embedded.model.Topic;
import net.leadware.kafka.embedded.properties.BrokerProperties;
import net.leadware.kafka.embedded.properties.ListenerProperties;
import net.leadware.kafka.embedded.properties.ListenerProtocolProperties;
import net.leadware.kafka.embedded.properties.SimulatorProperties;
import net.leadware.kafka.embedded.tools.SimulatorUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.Assert;

public class KafkaSimulator {
    private String DEFAULT_HOSTS = "127.0.0.1";
    private static final int ZOOKEEPER_CONNEXION_TIMEOUT = 6000;
    private static final int ZOOKEEPER_SESSION_TIMEOUT = 6000;
    private static final int DEFAULT_ADMIN_TIMEOUT = 30;
    private final SimulatorProperties simulatorProperties;
    private Map<String, String> brokerProperties = null;
    private final List<KafkaServer> kafkaServers = new ArrayList<KafkaServer>();
    private EmbeddedZookeeper zookeeper;
    private String zookeeperConnexionUrl;
    private ZkClient zookeeperClient;
    private KafkaAdmin kafkaAdmin;
    private AdminClient adminClient;
    private KafkaTemplate<String, String> kafkaStringProducerTemplate;
    private File temporairyDir;
    private File truststoreLocation;
    private File keystoreLocation;
    private Set<String> createdTopics = new HashSet<String>();

    @PostConstruct
    public void initialize() {
        this.simulatorProperties.validate();
        this.initializeZookeeper();
        this.initializeBrokers();
        this.initializeTopics();
        this.initializeProducers();
    }

    private void initializeZookeeper() {
        System.setProperty("java.io.tmpdir", this.simulatorProperties.getJavaTemporaryDirectory());
        this.zookeeper = new EmbeddedZookeeper();
        this.zookeeperConnexionUrl = "127.0.0.1:".concat(String.valueOf(this.zookeeper.port()));
        this.zookeeperClient = new ZkClient(this.zookeeperConnexionUrl, 6000, 6000, (ZkSerializer)ZKStringSerializer$.MODULE$);
    }

    private void initializeBrokers() {
        this.kafkaServers.clear();
        if (this.simulatorProperties.getBrokerConfigs() == null || this.simulatorProperties.getBrokerConfigs().isEmpty()) {
            throw new RuntimeException("Veuillez renseigner la configuration d'au moins un Broker");
        }
        for (int index = 0; index < this.simulatorProperties.getBrokerConfigs().size(); ++index) {
            BrokerProperties brokerProperties = this.simulatorProperties.getBrokerConfigs().get(index);
            int publicPort = brokerProperties.getListener().getPort();
            int adminPort = this.findAdminPort(brokerProperties, publicPort);
            int internalProducerPort = this.findInternalProducerPort(brokerProperties, publicPort, adminPort);
            brokerProperties.getListener().setAdminPort(adminPort);
            brokerProperties.getListener().setInternalProducerPort(internalProducerPort);
            Properties properties = this.createBrokerProperties(index, brokerProperties);
            KafkaServer server = TestUtils.createServer((KafkaConfig)new KafkaConfig((Map)properties), (Time)Time.SYSTEM);
            this.kafkaServers.add(server);
        }
        HashMap<String, String> adminConfigs = new HashMap<String, String>();
        adminConfigs.put("bootstrap.servers", this.getAdminBrokersUrls());
        adminConfigs.put("client.id", "simulator-admin-client");
        this.adminClient = AdminClient.create(adminConfigs);
    }

    private int findInternalProducerPort(BrokerProperties brokerProperties, int ... excluded) {
        int port = brokerProperties.getListener().getInternalProducerPort();
        if (port <= 0) {
            port = SimulatorUtils.findAvailablePortExcept(excluded);
        }
        return port;
    }

    private int findAdminPort(BrokerProperties brokerProperties, int ... excluded) {
        int port = brokerProperties.getListener().getAdminPort();
        if (port <= 0) {
            port = SimulatorUtils.findAvailablePortExcept(excluded);
        }
        return port;
    }

    private Properties createBrokerProperties(int brokerId, BrokerProperties brokerConfig) {
        Properties properties = new Properties();
        properties.setProperty(KafkaConfig.ZkConnectProp(), this.zookeeperConnexionUrl);
        properties.setProperty(KafkaConfig.BrokerIdGenerationEnableProp(), String.valueOf(true));
        properties.setProperty(KafkaConfig.BrokerIdProp(), String.valueOf(brokerId));
        properties.setProperty(KafkaConfig.ControlledShutdownEnableProp(), String.valueOf(this.simulatorProperties.getControlledShutdown()));
        properties.setProperty(KafkaConfig.NumNetworkThreadsProp(), String.valueOf(this.simulatorProperties.getNetworkThreadCount()));
        properties.setProperty(KafkaConfig.NumIoThreadsProp(), String.valueOf(this.simulatorProperties.getIoThreadCount()));
        properties.setProperty(KafkaConfig.PortProp(), String.valueOf(brokerConfig.getListener().getPort()));
        properties.setProperty(KafkaConfig.ListenersProp(), this.getListenerAllUrls(brokerConfig.getListener()));
        properties.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp(), brokerConfig.getListener().getProtocol().getScheme().getValue());
        properties.setProperty(KafkaConfig.SocketSendBufferBytesProp(), String.valueOf(this.simulatorProperties.getSendBufferSize()));
        properties.setProperty(KafkaConfig.SocketReceiveBufferBytesProp(), String.valueOf(this.simulatorProperties.getReceiveBufferSize()));
        properties.setProperty(KafkaConfig.SocketRequestMaxBytesProp(), String.valueOf(this.simulatorProperties.getMaxRequestSize()));
        properties.setProperty(KafkaConfig.NumPartitionsProp(), String.valueOf(this.simulatorProperties.getPartitionCount()));
        properties.setProperty(KafkaConfig.LogDirsProp(), brokerConfig.getLogsDirectories().stream().map(logDirectory -> SimulatorUtils.getResolvedPath(logDirectory)).collect(Collectors.joining(",")));
        properties.setProperty(KafkaConfig.LogDirProp(), SimulatorUtils.getResolvedPath(brokerConfig.getLogsDirectory()));
        properties.setProperty(KafkaConfig.LogFlushIntervalMsProp(), String.valueOf(1000));
        properties.setProperty(KafkaConfig.LogFlushSchedulerIntervalMsProp(), String.valueOf(1000));
        properties.setProperty(KafkaConfig.LogRetentionTimeMinutesProp(), String.valueOf(30));
        properties.setProperty("log.file.size", String.valueOf(0x20000000));
        properties.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), String.valueOf(60000));
        properties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
        properties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
        properties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
        properties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
        properties.setProperty(KafkaConfig.SslEnabledProtocolsProp(), "TLSv1.2,TLSv1.1,TLSv1");
        if (this.simulatorProperties.getTruststoreConfig() != null) {
            properties.setProperty(KafkaConfig.SslTruststoreLocationProp(), SimulatorUtils.getResolvedPath(this.simulatorProperties.getTruststoreConfig().getLocation()));
            properties.setProperty(KafkaConfig.SslTruststorePasswordProp(), this.simulatorProperties.getTruststoreConfig().getPassword());
            properties.setProperty(KafkaConfig.SslTruststoreTypeProp(), this.simulatorProperties.getTruststoreConfig().getType().getValue());
            properties.setProperty(KafkaConfig.SslKeyManagerAlgorithmProp(), this.simulatorProperties.getTruststoreConfig().getKeymanagerAlgorithm().getValue());
        }
        if (this.simulatorProperties.getKeystoreConfig() != null) {
            properties.setProperty(KafkaConfig.SslKeystoreLocationProp(), SimulatorUtils.getResolvedPath(this.simulatorProperties.getKeystoreConfig().getLocation()));
            properties.setProperty(KafkaConfig.SslKeystorePasswordProp(), this.simulatorProperties.getKeystoreConfig().getPassword());
            properties.setProperty(KafkaConfig.SslKeyPasswordProp(), this.simulatorProperties.getKeystoreConfig().getKeyPassword());
            properties.setProperty(KafkaConfig.SslKeystoreTypeProp(), this.simulatorProperties.getKeystoreConfig().getType().getValue());
            properties.setProperty(KafkaConfig.SslKeyManagerAlgorithmProp(), this.simulatorProperties.getKeystoreConfig().getKeymanagerAlgorithm().getValue());
        }
        properties.setProperty(KafkaConfig.SslClientAuthProp(), this.simulatorProperties.getSslClientAuthentication().getValue().toLowerCase());
        properties.setProperty(KafkaConfig.SslProtocolProp(), this.simulatorProperties.getSslProtocol().getValue());
        properties.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), this.getListenerProtocolMap(this.simulatorProperties));
        return properties;
    }

    private void initializeTopics() {
        this.createdTopics.clear();
        this.internalCreateTopics(this.simulatorProperties.getInitialTopics());
    }

    private void initializeProducers() {
        HashMap<String, Object> producerProperties = new HashMap<String, Object>();
        producerProperties.put("bootstrap.servers", this.getInternalProducerBrokersUrls());
        producerProperties.put("client.id", "simulator-string-producer");
        producerProperties.put("key.serializer", StringSerializer.class);
        producerProperties.put("value.serializer", StringSerializer.class);
        DefaultKafkaProducerFactory kafkaProducerFactory = new DefaultKafkaProducerFactory(producerProperties);
        this.kafkaStringProducerTemplate = new KafkaTemplate((ProducerFactory)kafkaProducerFactory);
    }

    private void doWithAdmin(Consumer<AdminClient> callback) {
        try {
            callback.accept(this.adminClient);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private String getListenerProtocolMap(SimulatorProperties simulatorProperties) {
        return simulatorProperties.getBrokerConfigs().stream().map(brokerConfig -> {
            ListenerProtocolProperties protocole = brokerConfig.getListener().getProtocol();
            ListenerProtocolProperties adminProtocole = brokerConfig.getListener().getAdminProtocol();
            ListenerProtocolProperties producerProtocole = brokerConfig.getListener().getInternalProducerProtocol();
            StringBuilder mapBuilder = new StringBuilder();
            mapBuilder.append(protocole.getName() + ":" + protocole.getScheme().getValue());
            mapBuilder.append("," + adminProtocole.getName() + ":" + adminProtocole.getScheme().getValue());
            mapBuilder.append("," + producerProtocole.getName() + ":" + producerProtocole.getScheme().getValue());
            return mapBuilder.toString();
        }).collect(Collectors.joining(","));
    }

    private String getAdminBrokersUrls() {
        return this.simulatorProperties.getBrokerConfigs().parallelStream().map(brokerProperty -> this.getListenerAdminUrl(brokerProperty.getListener())).collect(Collectors.joining(",", "", ""));
    }

    public String getInternalProducerBrokersUrls() {
        return this.simulatorProperties.getBrokerConfigs().parallelStream().map(brokerProperty -> this.getListenerInternalProducerUrl(brokerProperty.getListener())).collect(Collectors.joining(",", "", ""));
    }

    private String getListenerAllUrls(ListenerProperties listener) {
        StringBuilder builder = new StringBuilder();
        builder.append(this.getListenerPublicUrl(listener));
        builder.append(",").append(this.getListenerAdminUrl(listener));
        builder.append(",").append(this.getListenerInternalProducerUrl(listener));
        return builder.toString();
    }

    private String getListenerPublicUrl(ListenerProperties listener) {
        StringBuilder builder = new StringBuilder();
        builder.append(listener.getProtocol().getName()).append("://").append(this.DEFAULT_HOSTS).append(":").append(listener.getPort());
        return builder.toString();
    }

    private String getListenerAdminUrl(ListenerProperties listener) {
        StringBuilder builder = new StringBuilder();
        builder.append(listener.getAdminProtocol().getName()).append("://").append(this.DEFAULT_HOSTS).append(":").append(listener.getAdminPort());
        return builder.toString();
    }

    private String getListenerInternalProducerUrl(ListenerProperties listener) {
        StringBuilder builder = new StringBuilder();
        builder.append(listener.getInternalProducerProtocol().getName()).append("://").append(this.DEFAULT_HOSTS).append(":").append(listener.getInternalProducerPort());
        return builder.toString();
    }

    private void internalCreateTopics(List<String> topicsNames) {
        this.doWithAdmin(adminClient -> this.createTopics((AdminClient)adminClient, topicsNames.stream().map(topicName -> new NewTopic(topicName, this.simulatorProperties.getPartitionCount().intValue(), (short)this.simulatorProperties.getBrokerConfigs().size())).collect(Collectors.toList())));
    }

    private void createTopics(AdminClient admin, List<NewTopic> newTopics) {
        Assert.notNull((Object)this.zookeeper, (String)"Assurez-vous que le cluster ZooKeeper est actif avant toute op\u00e9ration.");
        for (NewTopic topic : newTopics) {
            Assert.isTrue((boolean)this.createdTopics.add(topic.name()), () -> "Ce topic existe d\u00e9j\u00e0 : " + topic.name());
        }
        CreateTopicsResult createTopics = admin.createTopics(newTopics);
        try {
            createTopics.all().get(30L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new KafkaException((Throwable)e);
        }
    }

    public void createTopics(String ... topicsNames) {
        this.internalCreateTopics(Arrays.stream(topicsNames).collect(Collectors.toList()));
    }

    public List<Topic> listTopics() {
        return this.listTopics(false);
    }

    public List<Topic> listTopics(boolean internal) {
        Assert.notNull((Object)this.zookeeper, (String)"Assurez-vous que le cluster ZooKeeper est actif avant toute op\u00e9ration.");
        ListTopicsOptions options = new ListTopicsOptions().listInternal(internal);
        KafkaFuture topicListingFuture = this.adminClient.listTopics(options).listings();
        try {
            return ((Collection)topicListingFuture.get(30L, TimeUnit.SECONDS)).stream().map(topicListing -> new Topic(topicListing.name(), topicListing.isInternal())).collect(Collectors.toList());
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new KafkaException((Throwable)e);
        }
    }

    public void deleteTopics(List<String> topicsNames) {
        Assert.notNull((Object)this.zookeeper, (String)"Assurez-vous que le cluster ZooKeeper est actif avant toute op\u00e9ration.");
        for (String topicName : topicsNames) {
            Assert.isTrue((boolean)this.createdTopics.remove(topicName), () -> "Ce topic n'existe pas : " + topicName);
        }
        DeleteTopicsResult result = this.adminClient.deleteTopics(topicsNames);
        try {
            result.all().get(30L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new KafkaException((Throwable)e);
        }
    }

    public void deleteTopics(String ... topicsNames) {
        this.deleteTopics(Arrays.stream(topicsNames).collect(Collectors.toList()));
    }

    public List<ConsumerGroup> listConsumerGroup() {
        Assert.notNull((Object)this.zookeeper, (String)"Assurez-vous que le cluster ZooKeeper est actif avant toute op\u00e9ration.");
        KafkaFuture consumerGroupsListingFuture = this.adminClient.listConsumerGroups().all();
        try {
            return ((Collection)consumerGroupsListingFuture.get(30L, TimeUnit.SECONDS)).stream().map(consumerGroupsListing -> new ConsumerGroup(consumerGroupsListing.groupId(), consumerGroupsListing.isSimpleConsumerGroup())).collect(Collectors.toList());
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new KafkaException((Throwable)e);
        }
    }

    public List<ConsumerGroupOffset> listConsumerGroupOffsets(String groupId) {
        Assert.notNull((Object)this.zookeeper, (String)"Assurez-vous que le cluster ZooKeeper est actif avant toute op\u00e9ration.");
        KafkaFuture consumerGroupsOffsetsMapFuture = this.adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata();
        try {
            return ((Map)consumerGroupsOffsetsMapFuture.get(30L, TimeUnit.SECONDS)).entrySet().stream().map(consumerGroupsOffsetEntry -> {
                TopicPartition topic = (TopicPartition)consumerGroupsOffsetEntry.getKey();
                OffsetAndMetadata offset = (OffsetAndMetadata)consumerGroupsOffsetEntry.getValue();
                return new ConsumerGroupOffset(topic.topic(), topic.partition(), offset.offset(), offset.metadata());
            }).collect(Collectors.toList());
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new KafkaException((Throwable)e);
        }
    }

    public void sendMessage(String topic, String key, String message) {
        this.kafkaStringProducerTemplate.send(topic, (Object)key, (Object)message);
    }

    public void sendMessage(String topic, String message) {
        this.kafkaStringProducerTemplate.send(topic, (Object)message);
    }

    @PreDestroy
    public void destroy() {
        try {
            this.adminClient.close(30L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        try {
            this.zookeeperClient.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            this.zookeeper.shutdown();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    public KafkaSimulator(SimulatorProperties simulatorProperties) {
        this.simulatorProperties = simulatorProperties;
    }

    public String getDEFAULT_HOSTS() {
        return this.DEFAULT_HOSTS;
    }

    public SimulatorProperties getSimulatorProperties() {
        return this.simulatorProperties;
    }

    public Map<String, String> getBrokerProperties() {
        return this.brokerProperties;
    }

    public List<KafkaServer> getKafkaServers() {
        return this.kafkaServers;
    }

    public EmbeddedZookeeper getZookeeper() {
        return this.zookeeper;
    }

    public String getZookeeperConnexionUrl() {
        return this.zookeeperConnexionUrl;
    }

    public ZkClient getZookeeperClient() {
        return this.zookeeperClient;
    }

    public KafkaAdmin getKafkaAdmin() {
        return this.kafkaAdmin;
    }

    public AdminClient getAdminClient() {
        return this.adminClient;
    }

    public KafkaTemplate<String, String> getKafkaStringProducerTemplate() {
        return this.kafkaStringProducerTemplate;
    }

    public File getTemporairyDir() {
        return this.temporairyDir;
    }

    public File getTruststoreLocation() {
        return this.truststoreLocation;
    }

    public File getKeystoreLocation() {
        return this.keystoreLocation;
    }

    public Set<String> getCreatedTopics() {
        return this.createdTopics;
    }
}

