package io.virtualan.cucumblan.core.msg.kafka;

import io.virtualan.cucumblan.message.exception.MessageNotDefinedException;
import io.virtualan.cucumblan.message.type.MessageType;
import io.virtualan.cucumblan.props.ApplicationConfiguration;
import io.virtualan.cucumblan.props.TopicConfiguration;
import io.virtualan.cucumblan.props.util.EventRequest;
import io.virtualan.cucumblan.props.util.StepDefinitionHelper;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:io/virtualan/cucumblan/core/msg/kafka/KafkaConsumerClient.class */
/**
 * Kafka consumer wrapper that polls the topic(s) configured for an event and stores every
 * message it can build into {@code MessageContext}, keyed by event name and message id.
 *
 * <p>Consumer settings are read from the classpath resource
 * {@code consumer-<resource>.properties}. Missing topic configuration or a missing/unreadable
 * properties resource terminates the JVM via {@link System#exit(int)}.
 */
public class KafkaConsumerClient {
    private static final Logger LOGGER = Logger.getLogger(KafkaConsumerClient.class.getName());

    /** Value of the request's recheck counter at which {@link #getEvent} stops looking. */
    private static final int MAX_RECHECK = 5;

    private final KafkaConsumer<Object, Object> consumer;
    private final List<String> topic;

    /**
     * Creates a consumer for the topic(s) configured under the given event name.
     *
     * <p>If the loaded properties do not define {@code group.id}, one is generated from the
     * event name, the resource name and a random UUID.
     *
     * @param str  event name; used to look up the topic list in {@code TopicConfiguration}
     * @param str2 resource name; selects the {@code consumer-<str2>.properties} classpath resource
     */
    public KafkaConsumerClient(String str, String str2) {
        this.topic = loadTopic(str);
        Properties properties = new Properties();
        String resourceName = "consumer-" + str2 + ".properties";
        // try-with-resources closes the stream after loading; a null resource is simply skipped.
        try (InputStream resourceAsStream =
                Thread.currentThread().getContextClassLoader().getResourceAsStream(resourceName)) {
            if (resourceAsStream != null) {
                properties.load(resourceAsStream);
                if (!properties.containsKey("group.id")) {
                    properties.put("group.id", str + "_" + str2 + "_" + UUID.randomUUID());
                }
            } else {
                LOGGER.warning(resourceName + " is not found");
                System.exit(1);
            }
        } catch (IOException e) {
            // Keep the original message but attach the cause so the failure can be diagnosed.
            LOGGER.log(java.util.logging.Level.WARNING, resourceName + " is not loaded", e);
            System.exit(1);
        }
        this.consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Looks up the message for the request's event name and id in {@code MessageContext},
     * polling Kafka between attempts until the message is present or the request's recheck
     * counter reaches {@value #MAX_RECHECK}.
     *
     * <p>Each attempt increments the recheck counter on {@code eventRequest}. Between attempts
     * the method sleeps for one second, lazily creates the request's client if it has none, and
     * runs that client. When the lookup ends, the request's client (if any) is closed.
     *
     * <p>NOTE(review): if {@link #run} throws, the request's client is left open; callers
     * that catch the exception presumably need to close it themselves - confirm.
     *
     * @param eventRequest lookup parameters and mutable retry state
     * @return the stored message, or {@code null} if none was found before giving up
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     * @throws MessageNotDefinedException if a consumed record cannot be mapped to a message
     */
    public static MessageType getEvent(EventRequest eventRequest) throws InterruptedException, MessageNotDefinedException {
        while (true) {
            MessageType messageType = (MessageType) MessageContext.getEventContextMap(eventRequest.getEventName(), eventRequest.getId());
            eventRequest.setRecheck(eventRequest.getRecheck() + 1);
            if (eventRequest.getRecheck() == MAX_RECHECK || messageType != null) {
                if (eventRequest.getClient() != null) {
                    eventRequest.getClient().closeConsumer();
                }
                return messageType;
            }
            Thread.sleep(1000L);
            if (eventRequest.getClient() == null) {
                eventRequest.setClient(new KafkaConsumerClient(eventRequest.getEventName(), eventRequest.getResource()));
            }
            eventRequest.getClient().run(eventRequest.getEventName(), eventRequest.getType(), eventRequest.getId());
        }
    }

    /**
     * Resolves the semicolon-separated topic list configured for an event.
     * Logs a warning and exits the JVM when no topic is configured.
     *
     * @param str event name used as the {@code TopicConfiguration} property key
     * @return the configured topic names
     */
    private List<String> loadTopic(String str) {
        String actualValue = StepDefinitionHelper.getActualValue(TopicConfiguration.getProperty(str));
        if (actualValue == null) {
            LOGGER.warning(str + " - Topic is not configured.");
            System.exit(1);
        }
        return Arrays.asList(actualValue.split(";"));
    }

    /**
     * Subscribes to the configured topic(s) and polls until the expected message is present in
     * {@code MessageContext} or the number of empty polls exceeds
     * {@code ApplicationConfiguration.getMessageCount()}.
     *
     * @param str  event name under which consumed messages are stored
     * @param str2 message type key used to select the message builder
     * @param str3 id of the message being waited for
     * @throws MessageNotDefinedException if the message type is unknown or a built message has no id
     */
    public void run(String str, String str2, String str3) throws MessageNotDefinedException {
        this.consumer.subscribe(this.topic);
        LOGGER.info(" Read Received message: " + this.topic);
        int emptyPolls = 0;
        while (!MessageContext.isEventContextMap(str, str3)) {
            ConsumerRecords<Object, Object> records = this.consumer.poll(Duration.of(1000L, ChronoUnit.MILLIS));
            if (records.count() == 0) {
                emptyPolls++;
                if (emptyPolls > ApplicationConfiguration.getMessageCount()) {
                    break;
                }
                continue;
            }
            for (ConsumerRecord<Object, Object> record : records) {
                getMessageType(str, str2, record);
            }
            // commitAsync() commits the offsets of the whole last poll, so once per batch suffices.
            this.consumer.commitAsync();
        }
        LOGGER.info("DONE");
    }

    /** Closes the underlying Kafka consumer. */
    public void closeConsumer() {
        this.consumer.close();
    }

    /**
     * Builds a message from a consumed record and, if it carries an id, stores it in
     * {@code MessageContext} under the event name and that id.
     *
     * @param str            event name under which the message is stored
     * @param str2           key into {@code MessageContext.getMessageTypes()}
     * @param consumerRecord the consumed Kafka record
     * @return {@code true} if a message was built and stored; {@code false} if the builder
     *     returned {@code null}
     * @throws MessageNotDefinedException if no message type is registered for {@code str2}, if
     *     the builder throws it, or if the built message has no id
     */
    private boolean getMessageType(String str, String str2, ConsumerRecord<Object, Object> consumerRecord) throws MessageNotDefinedException {
        MessageType messageType = MessageContext.getMessageTypes().get(str2);
        if (messageType == null) {
            throw new MessageNotDefinedException(str2 + " message type is not defined ");
        }
        try {
            MessageType buildConsumerMessage = messageType.buildConsumerMessage(consumerRecord, consumerRecord.key(), consumerRecord.value());
            if (buildConsumerMessage == null) {
                return false;
            }
            if (buildConsumerMessage.getId() == null) {
                throw new MessageNotDefinedException("Id is not defined ");
            }
            MessageContext.setEventContextMap(str, buildConsumerMessage.getId().toString(), buildConsumerMessage);
            return true;
        } catch (MessageNotDefinedException e) {
            // Log which record failed before propagating to the caller.
            LOGGER.warning(consumerRecord.key() + " is not defined " + e.getMessage());
            throw e;
        }
    }
}
