package io.virtualan.message.core;

import io.virtualan.core.model.VirtualServiceRequest;
import io.virtualan.core.util.ReturnMockResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnResource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannelSpec;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.transformer.GenericTransformer;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@EnableKafka
@Configuration
@ConditionalOnClass({ConsumerProperties.class, Kafka.class, IntegrationFlows.class, MessageChannelSpec.class})
@EnableIntegration
@ConditionalOnResource(resources = {"classpath:kafka.json"})
/* loaded from: input_file:io/virtualan/message/core/MessagingApplication.class */
public class MessagingApplication {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(MessagingApplication.class);

    /**
     * Topics named in the configuration that were not present on the broker when this class
     * was initialised; the static initializer fills this list and submits it for creation.
     */
    public static List<NewTopic> topicList = new ArrayList<>();

    /** Helper used to look up the mock response that matches an inbound message. */
    @Autowired
    private MessageUtil messageUtil;

    /** Broker address list, taken from the {@code broker} key of the configuration entry. */
    private static String bootstrapServers;

    /** Comma-separated topic names, taken from the {@code topics} key of the configuration entry. */
    private static String topicString;

    /**
     * Step of the outbound flow that enriches a parsed inbound message with the response to
     * publish. In this file the only implementation is the bean returned by
     * {@code getResponseMessage()}, which returns {@code null} when no response is configured.
     */
    interface ResponseMessage {
        /**
         * @param messageObject the parsed inbound message
         * @return the message to hand to the next step, or {@code null} when there is nothing to send
         */
        MessageObject readResponseMessage(MessageObject messageObject);
    }

    /**
     * Final step of the outbound flow: publishes a prepared message. In this file the only
     * implementation is the bean returned by {@code postMessage()}, which always returns
     * {@code null}.
     */
    interface SendMessage {
        /**
         * @param messageObject the message carrying the outbound topic and content
         * @return a result value for the flow; the implementation in this file returns {@code null}
         */
        String send(MessageObject messageObject);
    }

    /**
     * Loads the Kafka section of the messaging configuration.
     *
     * <p>Reads the classpath resource {@code conf/kafka.json} and returns the array stored under
     * its top-level {@code "Kafka"} key.
     *
     * @return the {@code "Kafka"} array, or {@code null} when the resource does not exist or the
     *     key is absent or not an array
     * @throws Exception if the resource cannot be read or does not contain a JSON object
     */
    private static JSONArray getJsonObject() throws Exception {
        // NOTE(review): the class-level @ConditionalOnResource checks "classpath:kafka.json"
        // while this method reads "conf/kafka.json" — confirm which location is intended.
        // The stream is owned here, so it is closed here instead of relying on the callee.
        try (InputStream resourceAsStream =
                MessagingApplication.class.getClassLoader().getResourceAsStream("conf/kafka.json")) {
            if (resourceAsStream == null) {
                return null;
            }
            return new JSONObject(readString(resourceAsStream)).optJSONArray("Kafka");
        }
    }

    /**
     * Reads the whole stream as text and closes it.
     *
     * <p>The bytes are decoded as UTF-8 (instead of the platform default charset, which varies
     * between hosts). Lines are re-joined with {@link System#lineSeparator()}, so a trailing
     * line terminator in the input is not preserved.
     *
     * @param inputStream the stream to consume; it is closed before this method returns
     * @return the stream content, or an empty string for an empty stream
     * @throws IOException if closing the underlying reader fails
     */
    public static String readString(InputStream inputStream) throws IOException {
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.joining(System.lineSeparator()));
        }
    }

    /**
     * Builds a new Kafka admin client pointed at {@code bootstrapServers}.
     *
     * <p>Every call creates a separate client instance.
     *
     * @return a freshly created admin client
     */
    private static AdminClient getAdminClient() {
        final Properties adminSettings = new Properties();
        adminSettings.setProperty("bootstrap.servers", bootstrapServers);
        return AdminClient.create(adminSettings);
    }

    /**
     * Creates the given topics on the broker and waits for the request to finish.
     *
     * <p>The admin client opened for the request is closed before returning. Creation failures
     * are logged rather than thrown, so callers keep the original no-checked-exception contract.
     *
     * @param list topics to create; {@code null} or an empty list is a no-op
     */
    public static void addTopic(List<NewTopic> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        try (AdminClient adminClient = getAdminClient()) {
            // Wait for the result so failures are visible instead of silently dropped.
            adminClient.createTopics(list).all().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while creating Kafka topics {}", list, e);
        } catch (ExecutionException e) {
            log.warn("Unable to create Kafka topics {}", list, e);
        }
    }

    /**
     * Tells whether a topic with the given name is currently listed by the broker.
     *
     * @param str the topic name to look for
     * @return {@code true} if the name is among the listed topics
     * @throws Exception if the topic listing fails
     */
    public static boolean isTopicExists(String str) throws Exception {
        final Set<String> knownTopics = getTopics();
        return knownTopics.contains(str);
    }

    /**
     * Describes a topic to be created with 5 partitions, a replication factor of 1 and an
     * empty configuration map.
     *
     * @param str the topic name
     * @return the topic description, ready to be passed to the admin client
     */
    private static NewTopic addNewTopic(String str) {
        final int partitions = 5;
        final short replicationFactor = 1;
        return new NewTopic(str, partitions, replicationFactor).configs(new HashMap<>());
    }

    /**
     * Fetches the names of the topics currently listed by the broker.
     *
     * <p>Blocks until the listing completes; the admin client opened for the request is closed
     * before returning.
     *
     * @return the topic names reported by the broker
     * @throws Exception if the listing fails or the waiting thread is interrupted
     */
    private static Set<String> getTopics() throws Exception {
        try (AdminClient adminClient = getAdminClient()) {
            return adminClient.listTopics().names().get();
        }
    }

    /**
     * Producer factory bean built from {@link #producerConfigs()}.
     *
     * @return a producer factory for String keys
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        final Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Kafka producer settings: String serializers for key and value, {@code acks=all},
     * no retries and a 1 ms linger, all against {@code bootstrapServers}.
     *
     * @return a new mutable map of producer properties
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        // Connection
        settings.put("bootstrap.servers", bootstrapServers);
        // Serialization
        settings.put("key.serializer", StringSerializer.class);
        settings.put("value.serializer", StringSerializer.class);
        // Delivery behaviour
        settings.put("linger.ms", 1);
        settings.put("acks", "all");
        settings.put("retries", 0);
        return settings;
    }

    /**
     * Kafka consumer settings: String deserializers, the fixed group id
     * {@code virtualan-consumer-1}, reading from the earliest offset, one record per poll and
     * auto-commit every second, all against {@code bootstrapServers}.
     *
     * @return a new mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        // Connection
        settings.put("bootstrap.servers", bootstrapServers);
        // Deserialization
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", StringDeserializer.class);
        // Group membership and starting position
        settings.put("group.id", "virtualan-consumer-1");
        settings.put("auto.offset.reset", "earliest");
        // Polling
        settings.put("max.poll.records", 1);
        // NOTE(review): 1000 ms is far below Kafka's default for this setting — confirm it is
        // intentional, since slow processing between polls would exceed it.
        settings.put("max.poll.interval.ms", 1000);
        // Offset commits
        // NOTE(review): auto-commit is enabled here while listenerFromKafkaFlow() also sets a
        // RECORD ack mode on the container — confirm which commit strategy is intended.
        settings.put("enable.auto.commit", true);
        settings.put("auto.commit.interval.ms", 1000);
        return settings;
    }

    /**
     * Consumer factory bean built from {@link #consumerConfigs()}.
     *
     * @return a consumer factory using the configured deserializers
     */
    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        final Map<String, Object> settings = consumerConfigs();
        return new DefaultKafkaConsumerFactory<Object, Object>(settings);
    }

    /**
     * Channel that carries records from the Kafka listener to the transformer.
     *
     * @return a new direct channel
     */
    @Bean
    public DirectChannel sentToTransformer() {
        final DirectChannel toTransformer = new DirectChannel();
        return toTransformer;
    }

    /**
     * Channel that carries transformed messages to the outbound flow.
     *
     * @return a new direct channel
     */
    @Bean
    public DirectChannel listeningFromTransformer() {
        final DirectChannel fromTransformer = new DirectChannel();
        return fromTransformer;
    }

    /**
     * Inbound flow: consumes records from every topic named in {@code topicString}
     * (split on commas), sends them through {@link #sentToTransformer()}, converts them with
     * {@link #transformer()} and publishes the result on {@link #listeningFromTransformer()}.
     *
     * @return the assembled inbound integration flow
     */
    @Bean
    public IntegrationFlow listenerFromKafkaFlow() {
        final ConsumerFactory<?, ?> factory = consumerFactory();
        final String[] subscribedTopics = topicString.split(",");
        return IntegrationFlows
                .from(Kafka
                        .messageDrivenChannelAdapter(
                                factory,
                                KafkaMessageDrivenChannelAdapter.ListenerMode.record,
                                subscribedTopics)
                        .configureListenerContainer(containerSpec ->
                                containerSpec
                                        .ackMode(ContainerProperties.AckMode.RECORD)
                                        .ackOnError(true)
                                        .idleEventInterval(100L)
                                        .id("messageListenerContainer")))
                .channel(sentToTransformer())
                .transform(transformer())
                .channel(listeningFromTransformer())
                .get();
    }

    /**
     * Transformer bean that converts a raw inbound message into a {@link MessageObject} by
     * delegating to {@link #parse(Message)}.
     *
     * @return the transformer used by the inbound flow
     */
    @Bean
    public GenericTransformer<Message<?>, MessageObject> transformer() {
        // NOTE(review): kept as an anonymous class on purpose — a lambda would erase the
        // Message<?> type argument, which the integration framework presumably inspects to
        // decide whether to pass the whole message or only its payload. Confirm before converting.
        final GenericTransformer<Message<?>, MessageObject> delegate =
                new GenericTransformer<Message<?>, MessageObject>() {
                    @Override
                    public MessageObject transform(Message<?> inbound) {
                        return MessagingApplication.this.parse(inbound);
                    }
                };
        return delegate;
    }

    /**
     * Converts an inbound Kafka message into a {@link MessageObject}.
     *
     * <p>The {@code kafka_receivedTopic} header (when present) becomes the inbound topic and
     * the payload is parsed as a JSON object. A payload that is not a JSON object is logged and
     * leaves {@code jsonObject} unset instead of failing with a cast error.
     *
     * @param message the message received from Kafka
     * @return a message object; its {@code jsonObject} is {@code null} when parsing failed
     */
    @Transformer
    public MessageObject parse(Message<?> message) {
        MessageObject messageObject = new MessageObject();
        // The header can be absent (e.g. a message not produced by the Kafka adapter).
        Object receivedTopic = message.getHeaders().get("kafka_receivedTopic");
        if (receivedTopic != null) {
            messageObject.inboundTopic = receivedTopic.toString();
        }
        try {
            // The JSONObject constructor rejects arrays and scalars with a JSONException,
            // where the previous cast of JSONTokener.nextValue() threw ClassCastException.
            messageObject.jsonObject = new JSONObject(message.getPayload().toString());
        } catch (JSONException e) {
            log.error("Unable to parse payload received on topic {} as a JSON object",
                    messageObject.inboundTopic, e);
        }
        return messageObject;
    }

    /**
     * Kafka template bean backed by {@link #producerFactory()}.
     *
     * @return the template used to publish responses
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        final ProducerFactory<String, Object> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Outbound flow: takes transformed messages from {@link #listeningFromTransformer()},
     * passes them to {@link #getResponseMessage()} and then to {@link #postMessage()}.
     *
     * @return the assembled outbound integration flow
     */
    @Bean
    public IntegrationFlow outboundGateFlow() {
        return IntegrationFlows
                .from(listeningFromTransformer())
                .handle(getResponseMessage())
                .handle(postMessage())
                .get();
    }

    /**
     * Handler bean that looks up the mock response for an inbound message.
     *
     * <p>The inbound JSON is used as the request input and the inbound topic as both operation
     * id and resource. When a record matches, its output becomes {@code outputMessage} and the
     * mock request's method becomes {@code outboundTopic}.
     *
     * @return a handler that yields the enriched message, or {@code null} when the message could
     *     not be parsed or no complete response is configured
     */
    @Bean
    public ResponseMessage getResponseMessage() {
        return new ResponseMessage() {
            @Override
            public MessageObject readResponseMessage(MessageObject messageObject) {
                // parse() leaves jsonObject unset when the payload was not valid JSON.
                if (messageObject == null || messageObject.jsonObject == null) {
                    MessagingApplication.log.warn("Skipping inbound message without a parsed JSON payload");
                    return null;
                }
                VirtualServiceRequest virtualServiceRequest = new VirtualServiceRequest();
                virtualServiceRequest.setInput(messageObject.jsonObject.toString());
                virtualServiceRequest.setOperationId(messageObject.inboundTopic);
                virtualServiceRequest.setResource(messageObject.inboundTopic);

                ReturnMockResponse matchingRecord =
                        MessagingApplication.this.messageUtil.getMatchingRecord(virtualServiceRequest);
                // Treat a missing record (or missing parts of it) like an unconfigured response.
                if (matchingRecord != null) {
                    messageObject.outputMessage = matchingRecord.getMockResponse() == null
                            ? null
                            : matchingRecord.getMockResponse().getOutput();
                    messageObject.outboundTopic = matchingRecord.getMockRequest() == null
                            ? null
                            : matchingRecord.getMockRequest().getMethod();
                }
                if (matchingRecord == null
                        || messageObject.outputMessage == null
                        || messageObject.outboundTopic == null) {
                    MessagingApplication.log.info("No response configured..");
                    return null;
                }
                MessagingApplication.log.info("Response configured.. with ({}) :{}",
                        messageObject.outboundTopic, messageObject.outputMessage);
                return messageObject;
            }
        };
    }

    /**
     * Handler bean that publishes the configured mock response to the outbound topic.
     *
     * <p>The payload is {@code outputMessage} — the response resolved by the previous step —
     * rather than the inbound request JSON, which the earlier code echoed back. The message is
     * sent to partition 0 with the {@code X-Virtualan-Header} marker header.
     *
     * @return a handler that always returns {@code null}; nothing is sent when the outbound
     *     topic or the response content is missing
     */
    @Bean
    public SendMessage postMessage() {
        return new SendMessage() {
            @Override
            public String send(MessageObject messageObject) {
                if (messageObject == null
                        || messageObject.outboundTopic == null
                        || messageObject.outputMessage == null) {
                    return null;
                }
                MessagingApplication.this.kafkaTemplate().send(
                        MessageBuilder.withPayload(messageObject.outputMessage)
                                .setHeader("kafka_topic", messageObject.outboundTopic)
                                .setHeader("kafka_messageKey", messageObject.messageKey)
                                .setHeader("kafka_partitionId", 0)
                                .setHeader("X-Virtualan-Header", "Mock-Service-Response")
                                .build());
                return null;
            }
        };
    }

    /*
     * Class initialisation: reads the first entry of the "Kafka" configuration array, stores
     * its broker and topic settings, and submits for creation every configured topic the
     * broker does not list yet. Failures are logged and leave the settings unset.
     */
    static {
        try {
            JSONArray kafkaSection = getJsonObject();
            // getJsonObject() returns null when the resource or the "Kafka" key is missing.
            JSONObject firstEntry = kafkaSection == null ? null : kafkaSection.optJSONObject(0);
            if (firstEntry == null) {
                log.warn("No Kafka entry found in conf/kafka.json; broker and topics are not configured");
            } else {
                bootstrapServers = firstEntry.getString("broker");
                topicString = firstEntry.getString("topics");
                Set<String> existingTopics = getTopics();
                for (String topic : topicString.split(",")) {
                    if (!existingTopics.contains(topic)) {
                        topicList.add(addNewTopic(topic));
                    }
                }
                // Only contact the broker when there is something to create.
                if (!topicList.isEmpty()) {
                    addTopic(topicList);
                }
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            log.error("Unable to initialise Kafka messaging from conf/kafka.json", e);
        }
    }
}
