/*
 * Decompiled with CFR 0.152.
 */
package io.virtualan.message.core;

import io.virtualan.core.model.VirtualServiceRequest;
import io.virtualan.core.util.ReturnMockResponse;
import io.virtualan.message.core.MessageObject;
import io.virtualan.message.core.MessageUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnResource;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.transformer.GenericTransformer;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

/**
 * Kafka side of the virtual (mock) messaging service.
 *
 * <p>On start-up the first entry of the {@code "Kafka"} array in
 * {@code conf/kafka.json} is read: its {@code broker} address, its
 * {@code topics} list and the optional {@code consumer} / {@code producer}
 * property-file names. Configured topics that the broker does not list yet
 * are created. The integration flow declared here listens on the configured
 * topics, converts each record into a {@link MessageObject}, asks
 * {@link MessageUtil} for a matching mock response and, when one is
 * configured, publishes that response to the outbound topic.
 */
@ConditionalOnClass(value={IntegrationFlows.class})
@EnableIntegration
@EnableKafka
@ConditionalOnResource(resources={"classpath:conf/kafka.json"})
@Service(value="messagingApplication")
public class MessagingApplication {
    private static final Logger log = LoggerFactory.getLogger(MessagingApplication.class);

    /** Header stamped on every mock response; inbound messages carrying it are not answered again. */
    private static final String VIRTUALAN_HEADER = "X-Virtualan-Header";

    /** Topics that are configured but were not listed by the broker when {@link #addTopics()} ran. */
    private static final List<NewTopic> topicList = new ArrayList<NewTopic>();

    private String bootstrapServers;
    private final List<String> topics = new ArrayList<String>();
    private final Map<String, Object> producerConfig = new HashMap<String, Object>();
    private final Map<String, Object> consumerConfigs = new HashMap<String, Object>();
    @Autowired
    private MessageUtil messageUtil;

    // The @Bean methods below are invoked directly through "this" by other
    // methods of this class, so the producer-side objects are cached here to
    // avoid building a new factory/template on every such call.
    private ProducerFactory<String, Object> sharedProducerFactory;
    private KafkaTemplate<String, Object> sharedKafkaTemplate;

    /**
     * Reads {@code conf/kafka.json} from the classpath.
     *
     * @return the {@code "Kafka"} array of that document, or {@code null} when the
     *         resource is missing or has no such array
     * @throws IOException if the resource cannot be read
     */
    private static JSONArray getJsonObject() throws IOException {
        InputStream stream = MessagingApplication.class.getClassLoader().getResourceAsStream("conf/kafka.json");
        if (stream == null) {
            return null;
        }
        JSONObject root = new JSONObject(MessagingApplication.readString(stream));
        return root.optJSONArray("Kafka");
    }

    /**
     * Reads the whole stream as UTF-8 text and closes it.
     *
     * @param inputStream stream to drain; closed before this method returns
     * @return the stream content, its lines joined with the platform line separator
     * @throws IOException if closing the underlying reader fails
     */
    public static String readString(InputStream inputStream) throws IOException {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            return br.lines().collect(Collectors.joining(System.lineSeparator()));
        }
    }

    /**
     * Describes a topic to be created with 5 partitions, a replication factor
     * of 1 and no additional topic-level configuration.
     *
     * @param topic name of the topic
     * @return the topic description
     */
    private static NewTopic addNewTopic(String topic) {
        int partitions = 5;
        short replication = 1;
        return new NewTopic(topic, partitions, replication).configs(new HashMap<String, String>());
    }

    /**
     * Loads the Kafka settings from {@code conf/kafka.json}, creates the
     * configured topics that are missing and loads the optional consumer and
     * producer property files. Failures are logged and never propagated.
     */
    @PostConstruct
    public void init() {
        try {
            JSONArray jsonArray = MessagingApplication.getJsonObject();
            JSONObject obj = jsonArray != null ? jsonArray.optJSONObject(0) : null;
            if (obj == null) {
                return;
            }
            this.bootstrapServers = obj.getString("broker");
            JSONArray array = obj.getJSONArray("topics");
            for (int i = 0; i < array.length(); ++i) {
                this.topics.add(array.get(i).toString());
            }
            // Talking to the broker must not prevent the property files from
            // being loaded, so topic creation handles its own failures.
            this.createMissingTopics();
            this.getConfigMap(obj, "consumer", this.consumerConfigs);
            this.getConfigMap(obj, "producer", this.producerConfig);
        }
        catch (Exception e) {
            log.error("Unable to load the kafka configuration", e);
        }
    }

    /** Creates every configured topic the broker does not list yet; failures are logged only. */
    private void createMissingTopics() {
        try {
            this.addTopics();
            if (!topicList.isEmpty()) {
                this.addTopic(topicList);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted while creating the kafka topics", e);
        }
        catch (ExecutionException | RuntimeException e) {
            log.error("Unable to create the kafka topics", e);
        }
    }

    /**
     * Loads the property file named by {@code obj[consumer]} and copies its
     * entries into {@code consumerConfigs}.
     *
     * @param obj             configuration entry holding the property-file name
     * @param consumer        key under which the property-file name is stored
     *                        (used for both {@code "consumer"} and {@code "producer"})
     * @param consumerConfigs map receiving the loaded entries
     * @return the loaded entries, or {@code null} when no file was configured,
     *         found or readable
     */
    @SuppressWarnings("rawtypes")
    public Map getConfigMap(JSONObject obj, String consumer, Map<String, Object> consumerConfigs) {
        Map<String, String> configProps = this.loadProperties(obj.optString(consumer));
        if (configProps != null) {
            consumerConfigs.putAll(configProps);
        }
        return configProps;
    }

    /**
     * Queues every configured topic that the broker does not list yet for creation.
     *
     * @throws ExecutionException   if listing the broker topics fails
     * @throws InterruptedException if interrupted while waiting for the listing
     */
    public void addTopics() throws ExecutionException, InterruptedException {
        Set<String> existing = this.getTopics();
        for (String topic : this.topics) {
            if (!existing.contains(topic)) {
                topicList.add(MessagingApplication.addNewTopic(topic));
            }
        }
    }

    /**
     * Loads a classpath property file into a map of strings.
     *
     * @param propFileName classpath-relative resource name; may be empty
     * @return the properties, or {@code null} when the name is empty, the
     *         resource is missing or it cannot be read
     */
    private Map<String, String> loadProperties(String propFileName) {
        // optString yields "" for an absent key; looking up "" may resolve to the
        // classpath root on some class loaders, so treat it as "nothing configured".
        if (propFileName == null || propFileName.isEmpty()) {
            return null;
        }
        try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(propFileName)) {
            if (inputStream == null) {
                log.warn("property file '{}' not found in the classpath.. loading default setting", (Object)propFileName);
                return null;
            }
            Properties properties = new Properties();
            properties.load(inputStream);
            Map<String, String> loaded = new HashMap<String, String>();
            properties.forEach((k, v) -> loaded.put(k.toString(), v.toString()));
            return loaded;
        }
        catch (IOException e) {
            log.warn("property file '{}' could not be read.. loading default setting {}", (Object)propFileName, (Object)e.getMessage());
            return null;
        }
    }

    /**
     * Creates an admin client for the configured broker. The caller owns the
     * returned client and must close it.
     */
    private AdminClient getAdminClient() {
        Properties config = new Properties();
        config.put("bootstrap.servers", this.bootstrapServers);
        return AdminClient.create(config);
    }

    /**
     * Creates the given topics on the broker and waits for the outcome.
     * Failures are logged, not thrown.
     *
     * @param topicList topics to create; {@code null} or empty is a no-op
     */
    public void addTopic(List<NewTopic> topicList) {
        if (topicList == null || topicList.isEmpty()) {
            return;
        }
        try (AdminClient admin = this.getAdminClient()) {
            admin.createTopics(topicList).all().get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while creating the kafka topics", e);
        }
        catch (ExecutionException e) {
            log.warn("Unable to create the kafka topics", e);
        }
    }

    /**
     * Tells whether the broker currently lists the given topic.
     *
     * @param topic topic name to look for
     * @return {@code true} when the broker lists the topic
     * @throws ExecutionException   if listing the broker topics fails
     * @throws InterruptedException if interrupted while waiting for the listing
     */
    public boolean isTopicExists(String topic) throws ExecutionException, InterruptedException {
        return this.getTopics().contains(topic);
    }

    /** Fetches the topic names from the broker, releasing the admin client afterwards. */
    private Set<String> getTopics() throws ExecutionException, InterruptedException {
        try (AdminClient admin = this.getAdminClient()) {
            return admin.listTopics().names().get();
        }
    }

    /** Producer factory built from {@link #producerConfigs()}; created once and reused. */
    @Bean
    private synchronized ProducerFactory<String, Object> producerFactory() {
        if (this.sharedProducerFactory == null) {
            this.sharedProducerFactory = new DefaultKafkaProducerFactory<String, Object>(this.producerConfigs());
        }
        return this.sharedProducerFactory;
    }

    /**
     * Producer settings. When nothing besides the broker address is present
     * (no producer properties were loaded) string serializers and a small set
     * of defaults are applied.
     */
    @Bean
    private Map<String, Object> producerConfigs() {
        this.producerConfig.put("bootstrap.servers", this.bootstrapServers);
        if (this.producerConfig.size() == 1) {
            this.producerConfig.put("key.serializer", StringSerializer.class);
            this.producerConfig.put("value.serializer", StringSerializer.class);
            this.producerConfig.put("linger.ms", 1);
            this.producerConfig.put("acks", "all");
            this.producerConfig.put("retries", 0);
        }
        return this.producerConfig;
    }

    /**
     * Consumer settings. When nothing besides the broker address is present
     * (no consumer properties were loaded) string deserializers and a small
     * set of defaults are applied.
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        this.consumerConfigs.put("bootstrap.servers", this.bootstrapServers);
        if (this.consumerConfigs.size() == 1) {
            this.consumerConfigs.put("key.deserializer", StringDeserializer.class);
            this.consumerConfigs.put("value.deserializer", StringDeserializer.class);
            this.consumerConfigs.put("group.id", "virtualan-consumer-1");
            this.consumerConfigs.put("auto.offset.reset", "earliest");
            this.consumerConfigs.put("max.poll.records", 1);
            this.consumerConfigs.put("max.poll.interval.ms", 1000);
            this.consumerConfigs.put("enable.auto.commit", true);
            this.consumerConfigs.put("auto.commit.interval.ms", 1000);
        }
        return this.consumerConfigs;
    }

    /** Consumer factory built from {@link #consumerConfigs()}. */
    @Bean
    private ConsumerFactory<?, ?> consumerFactory() {
        return new DefaultKafkaConsumerFactory<Object, Object>(this.consumerConfigs());
    }

    /** Channel between the Kafka inbound adapter and the transformer. */
    @Bean(value={"sentToTransformer"})
    private DirectChannel sentToTransformer() {
        return new DirectChannel();
    }

    /**
     * Inbound flow: Kafka record, then {@link #transformer()}, then the mock
     * response lookup ({@link #getResponseMessage()}), then publishing of the
     * response ({@link #postMessage()}).
     */
    @Bean(value={"listenerFromKafkaFlow"})
    public IntegrationFlow listenerFromKafkaFlow() {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(
                                this.consumerFactory(),
                                KafkaMessageDrivenChannelAdapter.ListenerMode.record,
                                this.topics.toArray(new String[0]))
                        .configureListenerContainer(c -> c
                                .ackMode(ContainerProperties.AckMode.RECORD)
                                .idleEventInterval(Long.valueOf(100L))
                                .id("messageListenerContainer")))
                .channel((MessageChannel)this.sentToTransformer())
                .transform(this.transformer())
                // The Object casts keep the handle(Object) overload selected.
                .handle((Object)this.getResponseMessage())
                .handle((Object)this.postMessage())
                .get();
    }

    /**
     * Transformer that delegates to {@link #parse(Message)}.
     */
    @Bean
    public GenericTransformer<Message<?>, MessageObject> transformer() {
        // NOTE(review): deliberately an anonymous class, not a lambda. The Spring
        // Integration DSL documents different handling for lambdas (payload vs.
        // whole message); confirm before converting.
        return new GenericTransformer<Message<?>, MessageObject>(){

            @Override
            public MessageObject transform(Message<?> message) {
                return MessagingApplication.this.parse(message);
            }
        };
    }

    /**
     * Converts an inbound message into a {@link MessageObject}.
     *
     * @param message inbound message whose payload is expected to be a JSON object
     * @return a message object carrying the parsed JSON, the inbound topic and
     *         the headers; when the payload is not a JSON object an empty
     *         message object is returned
     */
    @Transformer
    public MessageObject parse(Message<?> message) {
        MessageObject messageObject = new MessageObject();
        try {
            Object parsed = new JSONTokener(message.getPayload().toString()).nextValue();
            if (!(parsed instanceof JSONObject)) {
                // Arrays and scalars are valid JSON values but cannot be matched as a request.
                log.warn("parse payload is not a JSON object");
                return messageObject;
            }
            messageObject.setJsonObject((JSONObject)parsed);
            Object receivedTopic = message.getHeaders().get((Object)"kafka_receivedTopic");
            if (receivedTopic != null) {
                messageObject.setInboundTopic(receivedTopic.toString());
            }
            messageObject.setHeaders(message.getHeaders());
        }
        catch (JSONException e) {
            log.warn("parse {}", (Object)e.getMessage());
        }
        return messageObject;
    }

    /** Template used to publish mock responses; created once and reused. */
    @Bean
    private synchronized KafkaTemplate<String, Object> kafkaTemplate() {
        if (this.sharedKafkaTemplate == null) {
            this.sharedKafkaTemplate = new KafkaTemplate<String, Object>(this.producerFactory());
        }
        return this.sharedKafkaTemplate;
    }

    /**
     * Handler that looks up the mock response for a parsed message. It returns
     * the enriched message object when both an output message and an outbound
     * topic are configured, otherwise {@code null}.
     */
    @Bean
    private ResponseMessage getResponseMessage() {
        return messageObject -> {
            if (messageObject.getJsonObject() != null) {
                VirtualServiceRequest virtualServiceRequest = new VirtualServiceRequest();
                virtualServiceRequest.setInput(messageObject.getJsonObject().toString());
                virtualServiceRequest.setOperationId(messageObject.getInboundTopic());
                virtualServiceRequest.setResource(messageObject.getInboundTopic());
                ReturnMockResponse response = this.messageUtil.getMatchingRecord(virtualServiceRequest);
                if (response != null && response.getMockResponse() != null) {
                    messageObject.setOutputMessage(response.getMockResponse().getOutput());
                    // The outbound topic is stored in the mock request's "method" field.
                    messageObject.setOutboundTopic(response.getMockRequest() != null ? response.getMockRequest().getMethod() : null);
                    if (messageObject.getOutputMessage() == null || messageObject.getOutboundTopic() == null) {
                        log.info("No outputMessage response configured..");
                        return null;
                    }
                    log.info("Response configured.. with ({}) : {}", (Object)messageObject.getOutboundTopic(), (Object)messageObject.getOutputMessage());
                    return messageObject;
                }
            }
            log.info("No response configured for the given input");
            return null;
        };
    }

    /**
     * Handler that publishes the mock response to the outbound topic. Messages
     * that already carry the {@code X-Virtualan-Header} are skipped. The
     * handler always returns {@code null}.
     */
    @Bean
    private SendMessage postMessage() {
        return messageObject -> {
            if (messageObject.getOutboundTopic() != null && messageObject.getHeaders().get((Object)VIRTUALAN_HEADER) == null) {
                Message<?> message = MessageBuilder.withPayload((Object)messageObject.getOutputMessage())
                        .setHeader("kafka_topic", (Object)messageObject.getOutboundTopic())
                        .setHeader("kafka_messageKey", (Object)messageObject.getMessageKey())
                        .setHeader("kafka_partitionId", (Object)0)
                        .setHeader(VIRTUALAN_HEADER, (Object)"Mock-Service-Response")
                        .build();
                this.kafkaTemplate().send(message);
            }
            return null;
        };
    }

    /** Final step of the flow: publishes the response carried by a message object. */
    static interface SendMessage {
        public String send(MessageObject var1) throws MqttException;
    }

    /** Lookup step of the flow: resolves the mock response for a message object. */
    static interface ResponseMessage {
        public MessageObject readResponseMessage(MessageObject var1);
    }
}

