package com.wichell.framework.kafka;

import com.fasterxml.jackson.core.type.TypeReference;
import com.wichell.framework.bean.MsgBean;
import com.wichell.framework.util.JacksonUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ContextRefreshedEvent;

/* loaded from: input_file:com/wichell/framework/kafka/KafKaProcessListener.class */
public class KafKaProcessListener implements IKafkaProcessListener {
    private ConsumerConnector consumer;
    private Properties props;
    private KafKaProcess kafKaProcess;
    private ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/wichell/framework/kafka/KafKaProcessListener$processThread.class */
    public class processThread implements Runnable {
        private String[] topic;
        private long heartMs;

        public processThread(String[] strArr, long j) {
            this.topic = strArr;
            this.heartMs = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            HashMap hashMap = new HashMap(50);
            for (String str : this.topic) {
                hashMap.put(str, new Integer(1));
            }
            while (true) {
                try {
                    Map createMessageStreams = KafKaProcessListener.this.consumer.createMessageStreams(hashMap);
                    Iterator it = createMessageStreams.keySet().iterator();
                    while (it.hasNext()) {
                        Iterator it2 = ((List) createMessageStreams.get((String) it.next())).iterator();
                        while (it2.hasNext()) {
                            ConsumerIterator it3 = ((KafkaStream) it2.next()).iterator();
                            while (it3.hasNext()) {
                                try {
                                    KafKaProcessListener.this.kafKaProcess.process((MsgBean) JacksonUtil.json2Obj(new String((byte[]) it3.next().message(), "UTF-8"), new TypeReference<MsgBean>() { // from class: com.wichell.framework.kafka.KafKaProcessListener.processThread.1
                                    }));
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
                    Thread.sleep(this.heartMs);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                }
            }
        }
    }

    public void setProps(Properties properties) {
        this.props = properties;
    }

    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
            this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.props));
            if (this.pool.getPoolSize() == 0) {
                this.pool.execute(new processThread(this.props.getProperty("topic").split(","), Long.parseLong((String) this.props.get("heart.ms"))));
            }
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.kafKaProcess = (KafKaProcess) applicationContext.getBean(KafKaProcess.class);
    }
}
