package com.wichell.framework.interceptor;

import com.wichell.framework.bean.MsgBean;
import com.wichell.framework.ecache.EcacheUtil;
import com.wichell.framework.util.JacksonUtil;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/wichell/framework/interceptor/KafKaSendAdvice.class */
/**
 * {@link SendAdvice} implementation that publishes {@link MsgBean} payloads to Kafka.
 *
 * <p>Each message handed to {@link #addMessage(MsgBean)} is sent asynchronously on a cached
 * thread pool. A message whose send throws is stored in the Ecache entry
 * {@code kafkaCache/kafkalog}; a single heartbeat task started by {@link #init()} periodically
 * calls {@link #send()} to resend everything stored there.
 *
 * <p>Thread-safety: every read-modify-write of the cache entry and of the failure counters
 * happens while holding this instance's monitor, so concurrent failing sends and the heartbeat
 * flush cannot overwrite or delete each other's updates.
 */
public class KafKaSendAdvice implements SendAdvice {
    private static final String CACHE_KEY = "kafkalog";
    private static final String CACHE_NAME = "kafkaCache";
    /** Number of recorded failures after which a message is removed from the resend list. */
    private static final int MAX_TRACKED_FAILURES = 3;
    private static final Logger logger = LogManager.getLogger(KafKaSendAdvice.class);

    private Properties props;
    private Producer<String, String> producer;
    /** Single-threaded pool that hosts the heartbeat task. */
    private final ThreadPoolExecutor pool =
            new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    /**
     * Failure count per message, keyed by the message's {@code hashCode()}.
     * NOTE(review): distinct messages with colliding hash codes share one counter.
     */
    private final Map<Integer, Integer> count = new ConcurrentHashMap<Integer, Integer>();
    /** Pool on which the individual asynchronous sends run. */
    private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

    /* loaded from: input_file:com/wichell/framework/interceptor/KafKaSendAdvice$kafkaRunable.class */
    /**
     * Heartbeat task: calls {@link KafKaSendAdvice#send()} and then sleeps for the number of
     * milliseconds configured under the {@code heart.ms} property, until interrupted.
     */
    class kafkaRunable extends Thread {
        kafkaRunable() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            long heartbeatMs;
            try {
                // getProperty yields null for a missing or non-String value; parseLong then
                // reports it as a NumberFormatException handled right here.
                heartbeatMs = Long.parseLong(KafKaSendAdvice.this.props.getProperty("heart.ms"));
            } catch (NumberFormatException e) {
                KafKaSendAdvice.logger.error("invalid or missing 'heart.ms' property, kafka heartbeat not started", e);
                return;
            }
            // This object is executed by a pool worker (it is never start()ed itself), so the
            // interrupt status that matters is the current thread's, not this Thread object's.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    KafKaSendAdvice.this.send();
                } catch (RuntimeException e) {
                    // A failed resend must not terminate the heartbeat; the cache entry is
                    // still in place and is retried on the next tick.
                    KafKaSendAdvice.logger.error("resending cached kafka messages failed", e);
                }
                try {
                    Thread.sleep(heartbeatMs);
                } catch (InterruptedException e) {
                    // Restore the flag and stop so the pool can be shut down.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /* loaded from: input_file:com/wichell/framework/interceptor/KafKaSendAdvice$kafkaSendRunable.class */
    /**
     * Task that sends one message and, if the send throws, records the failure so the
     * heartbeat can resend the message later.
     */
    class kafkaSendRunable extends Thread {
        private final KeyedMessage<String, String> msg;

        /**
         * @param keyedMessage the message to send when this task runs
         */
        public kafkaSendRunable(KeyedMessage<String, String> keyedMessage) {
            this.msg = keyedMessage;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                KafKaSendAdvice.this.producer.send(this.msg);
            } catch (Exception e) {
                KafKaSendAdvice.logger.error("=============================message not operated" + this.msg, e);
                KafKaSendAdvice.this.recordFailedMessage(this.msg);
            }
        }
    }

    /**
     * Sets the configuration used to build the Kafka producer and to read {@code heart.ms}.
     *
     * @param properties producer configuration; must be set before {@link #init()} is called
     */
    public void setProps(Properties properties) {
        this.props = properties;
    }

    /**
     * Creates the producer if it does not exist yet and starts the heartbeat task if the
     * heartbeat pool has no worker thread. Synchronized so that concurrent calls cannot create
     * two producers or start two heartbeats.
     */
    public synchronized void init() {
        if (this.producer == null) {
            this.producer = new Producer<>(new ProducerConfig(this.props));
        }
        if (this.pool.getPoolSize() == 0) {
            this.pool.execute(new kafkaRunable());
        }
    }

    /**
     * Serializes the bean to JSON and schedules an asynchronous send to the bean's topic.
     * Any exception raised while building or scheduling the message is logged, not rethrown.
     *
     * @param msgBean the message to publish
     */
    public synchronized void addMessage(MsgBean msgBean) {
        try {
            // Raw construction kept on purpose: the declared return type of
            // JacksonUtil.obj2Json is not visible from this file.
            @SuppressWarnings({"rawtypes", "unchecked"})
            KeyedMessage<String, String> message =
                    new KeyedMessage(msgBean.getTopic(), JacksonUtil.obj2Json(msgBean));
            this.cachedThreadPool.execute(new kafkaSendRunable(message));
        } catch (Exception e) {
            logger.error("failed to schedule kafka message", e);
        }
    }

    /**
     * Resends every message stored in the cache entry as one batch. On success the cache entry
     * is deleted and the failure counters are reset; if the batch send throws, the exception
     * propagates and the cache entry is left untouched for the next attempt.
     */
    public synchronized void send() {
        List<KeyedMessage<String, String>> pending = cachedMessages();
        if (pending == null) {
            return;
        }
        this.producer.send(pending);
        EcacheUtil.del(CACHE_NAME, CACHE_KEY);
        // The counters describe messages in the entry just deleted. Leaving them behind would
        // make a later failure of an equal message look "already cached" and skip caching it.
        this.count.clear();
    }

    /**
     * Updates the resend list and failure counter for a message whose send failed.
     *
     * <p>First failure: the message is appended to the cached list. Subsequent failures with
     * the same hash code only bump the counter while it is below
     * {@link #MAX_TRACKED_FAILURES}; once the limit is reached the counter is dropped and the
     * message is removed from the cached list.
     *
     * @param message the message that could not be sent
     */
    private synchronized void recordFailedMessage(KeyedMessage<String, String> message) {
        List<KeyedMessage<String, String>> pending = cachedMessages();
        if (pending == null) {
            pending = new CopyOnWriteArrayList<KeyedMessage<String, String>>();
        }
        Integer key = Integer.valueOf(message.hashCode());
        Integer failures = this.count.get(key);
        if (failures == null) {
            this.count.put(key, 1);
            pending.add(message);
            EcacheUtil.cache(CACHE_NAME, CACHE_KEY, pending);
            return;
        }
        if (failures.intValue() < MAX_TRACKED_FAILURES) {
            this.count.put(key, Integer.valueOf(failures.intValue() + 1));
            return;
        }
        this.count.remove(key);
        pending.remove(message);
        EcacheUtil.cache(CACHE_NAME, CACHE_KEY, pending);
    }

    /**
     * Reads the resend list from the cache.
     *
     * @return the cached list, or {@code null} when the entry is absent
     */
    @SuppressWarnings("unchecked") // this class only ever stores lists of KeyedMessage under this key
    private static List<KeyedMessage<String, String>> cachedMessages() {
        return (List<KeyedMessage<String, String>>) EcacheUtil.get(CACHE_NAME, CACHE_KEY);
    }
}
