/*
 * Decompiled with CFR 0.152.
 */
package io.easywalk.simply.eventable.kafka.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.easywalk.simply.eventable.kafka.SimplyEventableMessage;
import io.easywalk.simply.eventable.kafka.consumer.EventHandler;
import io.easywalk.simply.eventable.kafka.consumer.SimplyConsumer;
import java.util.HashMap;
import java.util.Map;
import javax.persistence.MappedSuperclass;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@MappedSuperclass
public abstract class AbstractSimplyConsumer<T, ID>
implements SimplyConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractSimplyConsumer.class);
    protected ObjectMapper mapper = new ObjectMapper();
    protected String topic;
    protected Class<T> type;
    @Value(value="${spring.application.name}")
    private String groupId;
    @Autowired
    private KafkaProperties kafkaProperties;

    protected AbstractSimplyConsumer(String topic, Class<T> type) {
        this.topic = topic;
        this.type = type;
    }

    @Override
    public abstract void on(SimplyEventableMessage<T> var1);

    @Bean
    public void messageListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties(new String[]{this.topic});
        containerProps.setMessageListener(new EventHandler(this.type, this));
        KafkaMessageListenerContainer<ID, SimplyEventableMessage<T>> container = this.createContainer(containerProps);
        container.setBeanName(this.type.getName() + "ListenerBean");
        container.start();
    }

    private Map<String, Object> consumerProps() {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", this.kafkaProperties.getBootstrapServers());
        props.put("group.id", this.groupId);
        props.put("enable.auto.commit", true);
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", JsonDeserializer.class);
        props.put("spring.json.trusted.packages", SimplyEventableMessage.class.getPackageName());
        return props;
    }

    private KafkaMessageListenerContainer<ID, SimplyEventableMessage<T>> createContainer(ContainerProperties containerProps) {
        Map<String, Object> props = this.consumerProps();
        DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props);
        KafkaMessageListenerContainer container = new KafkaMessageListenerContainer((ConsumerFactory)cf, containerProps);
        return container;
    }

    protected T convertToEntity(Object source, Class<T> clazz) throws JsonProcessingException {
        String s = this.mapper.writeValueAsString(source);
        return (T)this.mapper.readValue(s, clazz);
    }
}

