package com.xxl.mq.client.factory;

import com.xxl.mq.client.broker.IXxlMqBroker;
import com.xxl.mq.client.consumer.IMqConsumer;
import com.xxl.mq.client.consumer.annotation.MqConsumer;
import com.xxl.mq.client.consumer.registry.ConsumerRegistryHelper;
import com.xxl.mq.client.consumer.thread.ConsumerThread;
import com.xxl.mq.client.message.XxlMqMessage;
import com.xxl.rpc.registry.impl.XxlRegistryServiceRegistry;
import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.call.XxlRpcInvokeCallback;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.serialize.Serializer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xxl/mq/client/factory/XxlMqClientFactory.class */
/**
 * Client-side bootstrap for xxl-mq.
 *
 * <p>{@link #init()} validates the configured consumers, starts the RPC invoker
 * factory, creates the {@link IXxlMqBroker} proxy, launches background threads
 * that batch queued messages to the broker, and finally starts and registers
 * the consumer threads. {@link #destroy()} reverses those steps.
 *
 * <p>The broker proxy, the registry helper, the two outbound queues and the
 * stop flag are {@code static}, so they are shared by every instance of this
 * class in the JVM.
 */
public class XxlMqClientFactory {
    private static final Logger logger = LoggerFactory.getLogger(XxlMqClientFactory.class);

    /** Number of sender threads started for each outbound queue. */
    private static final int SENDER_THREADS_PER_QUEUE = 3;

    /** Upper bound on extra messages merged into a batch after the blocking take. */
    private static final int MAX_BATCH_DRAIN = 100;

    /** How long {@link #destroy()} waits for pool threads to finish their final flush. */
    private static final long SHUTDOWN_WAIT_SECONDS = 10L;

    /**
     * Stop flag polled by the sender loops in this class; set to {@code true}
     * when the pool is shut down. It is public, so other classes may read it too.
     */
    public static volatile boolean clientFactoryPoolStoped = false;

    private static IXxlMqBroker xxlMqBroker;
    private static ConsumerRegistryHelper consumerRegistryHelper = null;

    /** Messages waiting to be sent to the broker via {@code addMessages}. */
    private static final LinkedBlockingQueue<XxlMqMessage> newMessageQueue = new LinkedBlockingQueue<>();

    /** Messages waiting to be reported to the broker via {@code callbackMessages}. */
    private static final LinkedBlockingQueue<XxlMqMessage> callbackMessageQueue = new LinkedBlockingQueue<>();

    private String adminAddress;
    private String accessToken;
    private List<IMqConsumer> consumerList;

    private final ExecutorService clientFactoryThreadPool = Executors.newCachedThreadPool();
    private XxlRpcInvokerFactory xxlRpcInvokerFactory = null;
    private final List<ConsumerThread> consumerRespository = new ArrayList<>();

    /**
     * @param str registry address passed to the RPC invoker factory as
     *            {@code XXL_REGISTRY_ADDRESS}
     */
    public void setAdminAddress(String str) {
        this.adminAddress = str;
    }

    /**
     * @param str token passed to the RPC invoker factory as {@code ACCESS_TOKEN}
     */
    public void setAccessToken(String str) {
        this.accessToken = str;
    }

    /**
     * @param list consumers to validate and start in {@link #init()}; may be
     *             {@code null} or empty, in which case no consumer is started
     */
    public void setConsumerList(List<IMqConsumer> list) {
        this.consumerList = list;
    }

    /**
     * Validates the consumers, connects to the broker and starts the consumers.
     *
     * @throws RuntimeException if a consumer is mis-annotated or the broker
     *                          service cannot be started
     */
    public void init() {
        validConsumer();
        startBrokerService();
        startConsumer();
    }

    /**
     * Stops the thread pool, deregisters the consumers and stops the RPC invoker.
     *
     * <p>The RPC invoker is stopped even when consumer deregistration fails, so
     * a registry error cannot leak the invoker's resources.
     *
     * @throws Exception if deregistration or stopping the invoker fails
     */
    public void destroy() throws Exception {
        destoryClientFactoryThreadPool();
        try {
            destoryConsumer();
        } finally {
            destoryBrokerService();
        }
    }

    /**
     * Raises the stop flag, interrupts all pool threads and waits a bounded time
     * for them to exit. The wait gives the sender threads a chance to flush the
     * remaining queued messages before the caller stops the RPC invoker.
     */
    private void destoryClientFactoryThreadPool() {
        clientFactoryPoolStoped = true;
        this.clientFactoryThreadPool.shutdownNow();
        try {
            if (!this.clientFactoryThreadPool.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
                logger.warn(">>>>>>>>>>> xxl-mq, client thread pool did not terminate within {}s.", SHUTDOWN_WAIT_SECONDS);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and continue with the shutdown sequence.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the broker proxy, or {@code null} before {@link #startBrokerService()} has run
     */
    public static IXxlMqBroker getXxlMqBroker() {
        return xxlMqBroker;
    }

    /**
     * @return the registry helper, or {@code null} before {@link #startBrokerService()} has run
     */
    public static ConsumerRegistryHelper getConsumerRegistryHelper() {
        return consumerRegistryHelper;
    }

    /**
     * Hands a message to the broker.
     *
     * @param xxlMqMessage message to send
     * @param z            {@code true} to enqueue the message for the background
     *                     batch senders; {@code false} to send it to the broker
     *                     immediately on the calling thread
     */
    public static void addMessages(XxlMqMessage xxlMqMessage, boolean z) {
        if (z) {
            newMessageQueue.add(xxlMqMessage);
        } else {
            xxlMqBroker.addMessages(Arrays.asList(xxlMqMessage));
        }
    }

    /**
     * Enqueues a message whose callback is reported to the broker by the
     * background batch senders.
     *
     * @param xxlMqMessage message to report
     */
    public static void callbackMessage(XxlMqMessage xxlMqMessage) {
        callbackMessageQueue.add(xxlMqMessage);
    }

    /**
     * Starts the RPC invoker factory, creates the registry helper and broker
     * proxy, and launches the background batch senders for both queues.
     *
     * @throws RuntimeException wrapping any exception raised while starting
     */
    public void startBrokerService() {
        HashMap<String, String> registryParams = new HashMap<>();
        registryParams.put("XXL_REGISTRY_ADDRESS", this.adminAddress);
        registryParams.put("ACCESS_TOKEN", this.accessToken);
        this.xxlRpcInvokerFactory = new XxlRpcInvokerFactory(XxlRegistryServiceRegistry.class, registryParams);

        try {
            this.xxlRpcInvokerFactory.start();
            consumerRegistryHelper = new ConsumerRegistryHelper(this.xxlRpcInvokerFactory.getServiceRegistry());
            xxlMqBroker = (IXxlMqBroker) new XxlRpcReferenceBean(NetEnum.NETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC, LoadBalance.ROUND, IXxlMqBroker.class, (String) null, 10000L, (String) null, (String) null, (XxlRpcInvokeCallback) null, this.xxlRpcInvokerFactory).getObject();

            for (int i = 0; i < SENDER_THREADS_PER_QUEUE; i++) {
                this.clientFactoryThreadPool.execute(new BatchSender(newMessageQueue) {
                    @Override
                    void send(List<XxlMqMessage> batch) {
                        xxlMqBroker.addMessages(batch);
                    }
                });
            }
            for (int i = 0; i < SENDER_THREADS_PER_QUEUE; i++) {
                this.clientFactoryThreadPool.execute(new BatchSender(callbackMessageQueue) {
                    @Override
                    void send(List<XxlMqMessage> batch) {
                        xxlMqBroker.callbackMessages(batch);
                    }
                });
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the RPC invoker factory if it was created.
     *
     * @throws Exception if the invoker factory fails to stop
     */
    public void destoryBrokerService() throws Exception {
        if (this.xxlRpcInvokerFactory != null) {
            this.xxlRpcInvokerFactory.stop();
        }
    }

    /**
     * Checks that every configured consumer carries a {@link MqConsumer}
     * annotation with a non-blank group and topic, and wraps each valid
     * consumer in a {@link ConsumerThread}.
     *
     * @throws RuntimeException on the first consumer that fails validation
     */
    private void validConsumer() {
        if (this.consumerList == null || this.consumerList.isEmpty()) {
            logger.warn(">>>>>>>>>>> xxl-mq, MqConsumer not found.");
            return;
        }
        for (IMqConsumer consumer : this.consumerList) {
            MqConsumer annotation = consumer.getClass().getAnnotation(MqConsumer.class);
            if (annotation == null) {
                throw new RuntimeException("xxl-mq, MqConsumer(" + consumer.getClass() + "),annotation is not exists.");
            }
            if (annotation.group() == null || annotation.group().trim().length() == 0) {
                throw new RuntimeException("xxl-mq, MqConsumer(" + consumer.getClass() + "),group is empty.");
            }
            if (annotation.topic() == null || annotation.topic().trim().length() == 0) {
                throw new RuntimeException("xxl-mq, MqConsumer(" + consumer.getClass() + "),topic is empty.");
            }
            this.consumerRespository.add(new ConsumerThread(consumer));
        }
    }

    /**
     * Submits every validated consumer thread to the pool and then registers
     * them through the registry helper. Does nothing when there are no consumers.
     */
    private void startConsumer() {
        if (this.consumerRespository.isEmpty()) {
            return;
        }
        for (ConsumerThread consumerThread : this.consumerRespository) {
            this.clientFactoryThreadPool.execute(consumerThread);
            logger.info(">>>>>>>>>>> xxl-mq, consumer init success, , topic:{}, group:{}", consumerThread.getMqConsumer().topic(), consumerThread.getMqConsumer().group());
        }
        getConsumerRegistryHelper().registerConsumer(this.consumerRespository);
    }

    /**
     * Deregisters the consumers through the registry helper. Does nothing when
     * there are no consumers.
     */
    private void destoryConsumer() {
        if (this.consumerRespository.isEmpty()) {
            return;
        }
        getConsumerRegistryHelper().removeConsumer(this.consumerRespository);
    }

    /**
     * Background worker that moves messages from one queue to the broker in
     * batches: it blocks for one message, merges up to {@link #MAX_BATCH_DRAIN}
     * more that are already queued, and hands the batch to {@link #send(List)}.
     * Once the stop flag is raised it performs one last flush of whatever is
     * still queued.
     */
    private abstract static class BatchSender implements Runnable {
        private final LinkedBlockingQueue<XxlMqMessage> queue;

        BatchSender(LinkedBlockingQueue<XxlMqMessage> queue) {
            this.queue = queue;
        }

        /** Delivers one non-empty batch to the broker. */
        abstract void send(List<XxlMqMessage> batch);

        @Override
        public void run() {
            while (!clientFactoryPoolStoped) {
                try {
                    XxlMqMessage first = this.queue.take();
                    List<XxlMqMessage> batch = new ArrayList<>();
                    batch.add(first);
                    this.queue.drainTo(batch, MAX_BATCH_DRAIN);
                    send(batch);
                } catch (Exception e) {
                    // During shutdown the interrupt from shutdownNow() lands here; that is expected.
                    if (!clientFactoryPoolStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }

            // Final flush so messages queued before shutdown are not silently dropped.
            List<XxlMqMessage> remaining = new ArrayList<>();
            if (this.queue.drainTo(remaining) > 0) {
                try {
                    send(remaining);
                } catch (Exception e) {
                    logger.error(">>>>>>>>>>> xxl-mq, failed to flush " + remaining.size() + " queued message(s) on shutdown.", e);
                }
            }
        }
    }
}
