/*
 * Decompiled with CFR 0.152.
 */
package io.virtualan.message.core;

import io.virtualan.core.model.VirtualServiceRequest;
import io.virtualan.core.util.ReturnMockResponse;
import io.virtualan.message.core.MessageObject;
import io.virtualan.message.core.MessageUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnResource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.transformer.GenericTransformer;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
@ConditionalOnClass(value={IntegrationFlows.class})
@EnableIntegration
@EnableKafka
@ConditionalOnResource(resources={"classpath:conf/kafka.json"})
public class MessagingApplication {
    private static final Logger log = LoggerFactory.getLogger(MessagingApplication.class);
    public static List<NewTopic> topicList = new ArrayList<NewTopic>();
    @Autowired
    private MessageUtil messageUtil;
    private static String bootstrapServers;
    private static List<String> topics;
    Map<String, Object> producerConfig = new HashMap<String, Object>();
    Map<String, Object> consumerConfigs = new HashMap<String, Object>();

    @PostConstruct
    public void init() {
        try {
            JSONObject obj = MessagingApplication.getJsonObject().optJSONObject(0);
            if (obj != null) {
                bootstrapServers = obj.getString("broker");
                JSONArray array = obj.getJSONArray("topics");
                for (int i = 0; i < array.length(); ++i) {
                    topics.add(array.get(i).toString());
                }
                Set<String> names = MessagingApplication.getTopics();
                for (String topic : topics) {
                    boolean contains = names.contains(topic);
                    if (contains) continue;
                    topicList.add(MessagingApplication.addNewTopic(topic));
                }
                bootstrapServers = obj.getString("broker");
                Map<String, String> configProps = this.loadProperties(obj.optString("consumer"));
                if (configProps != null) {
                    this.consumerConfigs.putAll(configProps);
                }
                Map<String, String> configProducerProps = this.loadProperties(obj.optString("producer"));
                if (configProps != null) {
                    this.producerConfig.putAll(configProducerProps);
                }
                if (topicList != null) {
                    MessagingApplication.addTopic(topicList);
                }
            }
        }
        catch (Exception e) {
            log.error("Unable to load the kafka configuration");
        }
    }

    private Map<String, String> loadProperties(String propFileName) {
        InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(propFileName);
        HashMap<String, String> mapList = new HashMap<String, String>();
        Properties properties = new Properties();
        if (inputStream != null) {
            try {
                properties.load(inputStream);
                if (properties != null) {
                    properties.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(k, v) -> {
                        String sk = k.toString();
                        String sv = v.toString();
                        mapList.put(sk, sv);
                    }));
                    return mapList;
                }
            }
            catch (IOException e) {
                log.warn("property file '" + propFileName + "' not found in the classpath.. loading default setting " + e.getMessage());
            }
        } else {
            log.warn("property file '" + propFileName + "' not found in the classpath.. loading default setting");
        }
        return null;
    }

    private static JSONArray getJsonObject() throws Exception {
        InputStream stream = MessagingApplication.class.getClassLoader().getResourceAsStream("conf/kafka.json");
        if (stream != null) {
            String jmsConfigJson = MessagingApplication.readString(stream);
            JSONObject jsonObject = new JSONObject(jmsConfigJson);
            return jsonObject.optJSONArray("Kafka");
        }
        return null;
    }

    public static String readString(InputStream inputStream) throws IOException {
        try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));){
            String string = br.lines().collect(Collectors.joining(System.lineSeparator()));
            return string;
        }
    }

    private static AdminClient getAdminClient() {
        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapServers);
        return AdminClient.create((Properties)config);
    }

    public static void addTopic(List<NewTopic> topicList) {
        AdminClient admin = MessagingApplication.getAdminClient();
        admin.createTopics(topicList);
    }

    public static boolean isTopicExists(String topic) throws Exception {
        return MessagingApplication.getTopics().contains(topic);
    }

    private static NewTopic addNewTopic(String topic) {
        HashMap configs = new HashMap();
        int partitions = 5;
        Short replication = 1;
        return new NewTopic(topic, partitions, replication.shortValue()).configs(configs);
    }

    private static Set<String> getTopics() throws Exception {
        AdminClient admin = MessagingApplication.getAdminClient();
        ListTopicsResult listTopics = admin.listTopics();
        return (Set)listTopics.names().get();
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory(this.producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        this.producerConfig.put("bootstrap.servers", bootstrapServers);
        if (this.producerConfig.size() == 1) {
            this.producerConfig.put("key.serializer", StringSerializer.class);
            this.producerConfig.put("value.serializer", StringSerializer.class);
            this.producerConfig.put("linger.ms", 1);
            this.producerConfig.put("acks", "all");
            this.producerConfig.put("retries", 0);
        }
        return this.producerConfig;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        this.consumerConfigs.put("bootstrap.servers", bootstrapServers);
        if (this.consumerConfigs.size() == 1) {
            this.consumerConfigs.put("key.deserializer", StringDeserializer.class);
            this.consumerConfigs.put("value.deserializer", StringDeserializer.class);
            this.consumerConfigs.put("group.id", "virtualan-consumer-1");
            this.consumerConfigs.put("auto.offset.reset", "earliest");
            this.consumerConfigs.put("max.poll.records", 1);
            this.consumerConfigs.put("max.poll.interval.ms", 1000);
            this.consumerConfigs.put("enable.auto.commit", true);
            this.consumerConfigs.put("auto.commit.interval.ms", 1000);
        }
        return this.consumerConfigs;
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return new DefaultKafkaConsumerFactory(this.consumerConfigs());
    }

    @Bean
    public DirectChannel sentToTransformer() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel listeningFromTransformer() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow listenerFromKafkaFlow() {
        return ((IntegrationFlowBuilder)((IntegrationFlowBuilder)((IntegrationFlowBuilder)IntegrationFlows.from((MessageProducerSpec)Kafka.messageDrivenChannelAdapter(this.consumerFactory(), (KafkaMessageDrivenChannelAdapter.ListenerMode)KafkaMessageDrivenChannelAdapter.ListenerMode.record, (String[])topics.toArray(new String[topics.size()])).configureListenerContainer(c -> c.ackMode(ContainerProperties.AckMode.RECORD).ackOnError(true).idleEventInterval(Long.valueOf(100L)).id("messageListenerContainer"))).channel((MessageChannel)this.sentToTransformer())).transform(this.transformer())).channel((MessageChannel)this.listeningFromTransformer())).get();
    }

    @Bean
    public GenericTransformer<Message<?>, MessageObject> transformer() {
        return new GenericTransformer<Message<?>, MessageObject>(){

            public MessageObject transform(Message<?> message) {
                return MessagingApplication.this.parse(message);
            }
        };
    }

    @Transformer
    public MessageObject parse(Message<?> message) {
        MessageObject messageObject = new MessageObject();
        try {
            messageObject.jsonObject = (JSONObject)new JSONTokener(message.getPayload().toString()).nextValue();
            messageObject.inboundTopic = message.getHeaders().get((Object)"kafka_receivedTopic").toString();
            return messageObject;
        }
        catch (JSONException e) {
            e.printStackTrace();
            return messageObject;
        }
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate(this.producerFactory());
    }

    @Bean
    public IntegrationFlow outboundGateFlow() {
        return ((IntegrationFlowBuilder)((IntegrationFlowBuilder)IntegrationFlows.from((MessageChannel)this.listeningFromTransformer()).handle((Object)this.getResponseMessage())).handle((Object)this.postMessage())).get();
    }

    @Bean
    public ResponseMessage getResponseMessage() {
        return new ResponseMessage(){

            @Override
            public MessageObject readResponseMessage(MessageObject messageObject) {
                VirtualServiceRequest virtualServiceRequest = new VirtualServiceRequest();
                virtualServiceRequest.setInput(messageObject.jsonObject.toString());
                virtualServiceRequest.setOperationId(messageObject.inboundTopic);
                virtualServiceRequest.setResource(messageObject.inboundTopic);
                ReturnMockResponse response = MessagingApplication.this.messageUtil.getMatchingRecord(virtualServiceRequest);
                if (response != null && response.getMockResponse() != null) {
                    messageObject.outputMessage = response.getMockResponse().getOutput();
                    messageObject.outboundTopic = response.getMockRequest().getMethod();
                    if (messageObject.outputMessage == null || messageObject.outboundTopic == null) {
                        log.info("No outputMessage response configured..");
                        return null;
                    }
                    log.info("Response configured.. with (" + messageObject.outboundTopic + ") :" + messageObject.outputMessage);
                    return messageObject;
                }
                log.info("No response configured for the given input");
                return null;
            }
        };
    }

    @Bean
    public SendMessage postMessage() {
        return new SendMessage(){

            @Override
            public String send(MessageObject messageObject) {
                if (messageObject.outboundTopic != null) {
                    Message message = MessageBuilder.withPayload((Object)messageObject.jsonObject.toString()).setHeader("kafka_topic", (Object)messageObject.outboundTopic).setHeader("kafka_messageKey", (Object)messageObject.messageKey).setHeader("kafka_partitionId", (Object)0).setHeader("X-Virtualan-Header", (Object)"Mock-Service-Response").build();
                    MessagingApplication.this.kafkaTemplate().send(message);
                }
                return null;
            }
        };
    }

    static {
        topics = new ArrayList<String>();
    }

    static interface SendMessage {
        public String send(MessageObject var1);
    }

    static interface ResponseMessage {
        public MessageObject readResponseMessage(MessageObject var1);
    }
}

