/*
 * Decompiled with CFR 0.152.
 */
package core.apiCore.interfaces;

import core.apiCore.helpers.DataHelper;
import core.apiCore.helpers.MessageQueueHelper;
import core.support.configReader.Config;
import core.support.logger.TestLog;
import core.support.objects.KeyValue;
import core.support.objects.MessageObject;
import core.support.objects.ServiceObject;
import core.support.objects.TestObject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaInterface {
    public static final String KAFKA_SERVER_URL = "kafka.bootstrap.servers";
    public static final String KAFKA_CLIENT_ID = "kafka.clientId";
    public static final String KFAKA_TOPIC = "kafka.topic";
    public static final String KFAKA_OUTBOUND_TOPIC = "kafka.outbound.topic";
    public static final String KAFKA_GROUP_ID = "kafka.group.id";
    public static final String KAFKA_TIMEOUT_SECONDS = "kafka.timeout.seconds";
    public static final String KAFKA_MESSAGE_ID_PREFIX = "kafka.msgId.prefix";
    public static Map<ConsumerRecord<String, String>, Boolean> outboundMessages = new ConcurrentHashMap<ConsumerRecord<String, String>, Boolean>();

    public static void testKafkaInterface(ServiceObject serviceObject) throws Exception {
        KafkaInterface.evaluateOption(serviceObject);
        serviceObject.withRequestBody(DataHelper.getRequestBodyIncludingTemplate(serviceObject));
        String messageId = MessageQueueHelper.generateMessageId(serviceObject, Config.getValue(KAFKA_MESSAGE_ID_PREFIX));
        KafkaInterface.sendKafkaMessage(serviceObject, messageId);
        MessageQueueHelper.receiveAndValidateMessages(serviceObject, messageId, MessageObject.messageType.KAFKA);
    }

    public static void sendKafkaMessage(ServiceObject serviceObject, String messageId) {
        if (serviceObject.getRequestBody().isEmpty()) {
            return;
        }
        KafkaProducer producer = null;
        String messageBody = serviceObject.getRequestBody();
        try {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", Config.getValue(KAFKA_SERVER_URL));
            properties.put("key.serializer", StringSerializer.class);
            properties.put("value.serializer", StringSerializer.class);
            properties.put("retries", "3");
            producer = new KafkaProducer(properties);
            ProducerRecord record = new ProducerRecord(Config.getValue(KFAKA_TOPIC), (Object)messageId.toString(), (Object)messageBody);
            producer.send(record).get();
            TestLog.logPass("sent messageId : " + messageId + "\n message : " + messageBody, new Object[0]);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }

    public static void getOutboundMessages() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Config.getValue(KAFKA_SERVER_URL));
        props.put("group.id", Config.getValue(KAFKA_GROUP_ID));
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        String topic = Config.getValue(KFAKA_TOPIC);
        String outboundTopic = Config.getValue(KFAKA_OUTBOUND_TOPIC);
        if (!outboundTopic.isEmpty()) {
            topic = outboundTopic;
        }
        KafkaConsumer consumer = new KafkaConsumer(props);
        consumer.subscribe(Collections.singletonList(topic));
        int giveUp = 5;
        int noRecordsCount = 0;
        ConsumerRecords consumerRecords = null;
        do {
            if ((consumerRecords = consumer.poll(Duration.ofMillis(3000L))).count() == 0) {
                ++noRecordsCount;
                continue;
            }
            consumerRecords.forEach(record -> {
                ArrayList<String> headers = new ArrayList<String>();
                for (Header header : record.headers()) {
                    headers.add(header.value().toString());
                }
                MessageObject message = new MessageObject().withMessageType(MessageObject.messageType.KAFKA).withMessageId((String)record.key()).withMessage((String)record.value()).withTopic(record.topic()).withHeader(headers);
                TestLog.logPass("Received messageId '" + message.getMessageId() + "\n with message content: " + message.getMessage(), new Object[0]);
                MessageObject.outboundMessages.put(message, true);
            });
            TestLog.logPass("global message size in outbound list: " + outboundMessages.size(), new Object[0]);
            consumer.commitAsync();
        } while (consumerRecords.isEmpty() && noRecordsCount < 5);
        consumer.close();
    }

    public static void evaluateOption(ServiceObject serviceObject) {
        KafkaInterface.setDefaultTopic();
        if (serviceObject.getOption().isEmpty()) {
            return;
        }
        DataHelper.saveDataToConfig(serviceObject.getOption());
        serviceObject.withOption(DataHelper.replaceParameters(serviceObject.getOption()));
        List<KeyValue> keywords = DataHelper.getValidationMap(serviceObject.getOption());
        for (KeyValue keyword : keywords) {
            switch (keyword.key.toLowerCase()) {
                case "topic": {
                    Config.putValue(KFAKA_TOPIC, keyword.value);
                    break;
                }
                case "outbound_topic": {
                    Config.putValue(KFAKA_OUTBOUND_TOPIC, keyword.value);
                    break;
                }
                case "response_identifier": {
                    Config.putValue("response.identifier", keyword.value);
                }
            }
        }
    }

    private static void setDefaultTopic() {
        String defaultTopic = TestObject.getDefaultTestInfo().config.get(KFAKA_TOPIC).toString();
        String ouboundTopic = TestObject.getDefaultTestInfo().config.get(KFAKA_OUTBOUND_TOPIC).toString();
        Config.putValue(KFAKA_TOPIC, defaultTopic);
        Config.putValue(KFAKA_OUTBOUND_TOPIC, ouboundTopic);
        Config.putValue("response.identifier", "");
    }
}

