package org.grey.citycat.bus;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.grey.citycat.bus.core.dispatcher.i.IDispatcher;
import org.grey.citycat.bus.core.listener.i.CCListener;
import org.grey.citycat.bus.core.payload.EventPayload;
import org.grey.citycat.bus.core.payload.i.IPayload;
import org.grey.citycat.bus.core.payload.i.IPayloadSupport;
import org.grey.citycat.bus.enums.RoutingModeEnum;
import org.grey.citycat.bus.strategy.CCRouter;
import org.grey.citycat.bus.strategy.WaitStrategy;
import org.grey.citycat.constant.CCBusConstant;
import org.grey.citycat.exception.CitycatException;
import org.grey.citycat.util.AssertUtil;
import org.grey.citycat.util.ComplexUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/grey/citycat/bus/CCBusImpl.class */
public class CCBusImpl<event, listener extends CCListener<event>> extends Strategies<event, listener> implements CCBus<event, listener>, CCBusConfigurator<event, listener> {
    static final Logger log = LoggerFactory.getLogger(CCBusImpl.class);
    final Queue<EventPayload<event>> eventQueue;
    final Byte[] lock;
    private final Integer batchSize;

    public CCBusImpl() {
        this(RoutingModeEnum.TOPIC);
    }

    public CCBusImpl(RoutingModeEnum routingModeEnum) {
        this(routingModeEnum, routingModeEnum.getWait(), routingModeEnum.getRouter(), routingModeEnum.getListenerPayloadSupport(), routingModeEnum.getEventPayloadSupport(), routingModeEnum.getDispatcher());
    }

    private CCBusImpl(RoutingModeEnum routingModeEnum, WaitStrategy waitStrategy, CCRouter<event, ? extends IPayload<listener>, listener> cCRouter, IPayloadSupport<listener, ? extends IPayload<listener>> iPayloadSupport, IPayloadSupport<event, ? extends IPayload<event>> iPayloadSupport2, IDispatcher<event, EventPayload<event>> iDispatcher) {
        super(routingModeEnum, waitStrategy, cCRouter, iPayloadSupport, iPayloadSupport2, iDispatcher);
        this.eventQueue = new ConcurrentLinkedQueue();
        this.lock = new Byte[0];
        this.batchSize = 500000;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.grey.citycat.bus.CCBus
    public void register(String str, int i, listener listener) {
        this.router.add(this.listenerPayloadSupport.processAPayload(ComplexUtil.paramsBuilder(str, i, listener)));
        if (log.isDebugEnabled()) {
            log.debug("监听注册成功...topic【{}】index【{}】listener【{}】", new Object[]{str, Integer.valueOf(i), listener.getClass().getName()});
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.grey.citycat.bus.CCBus
    public void register(Class<event> cls, int i, listener listener) {
        this.router.add(this.listenerPayloadSupport.processAPayload(ComplexUtil.paramsBuilder(cls, i, listener)));
        if (log.isDebugEnabled()) {
            log.debug("监听注册成功...topic【{}】index【{}】listener【{}】", new Object[]{cls.getName(), Integer.valueOf(i), listener.getClass().getName()});
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.grey.citycat.bus.CCBus
    public void register(listener listener) {
        Class<?> cls = listener.getClass();
        this.router.add(this.listenerPayloadSupport.processAPayload(ComplexUtil.paramsBuilder(((Method) Arrays.stream(cls.getDeclaredMethods()).filter(method -> {
            return method.getName().equals("doListen") && !method.isBridge();
        }).findFirst().orElseThrow(() -> {
            return new CitycatException("监听类【" + cls.getName() + "】缺少doListen方法！");
        })).getParameterTypes()[0], 1, listener)));
        if (log.isDebugEnabled()) {
            log.debug("监听注册成功...topic【{}】index【{}】listener【{}】", new Object[]{cls.getName(), 1, listener.getClass().getName()});
        }
    }

    @Override // org.grey.citycat.bus.CCBus
    public void post(String str, event event) {
        this.dispatcher.dispatch((EventPayload) this.eventPayloadSupport.processAPayload(Map.of(CCBusConstant.TOPIC, str, CCBusConstant.EVENT, event)), this.router);
    }

    @Override // org.grey.citycat.bus.CCBus
    public void post(event event, String... strArr) {
        AssertUtil.assertSingleTopic(strArr);
        this.dispatcher.dispatch((EventPayload) this.eventPayloadSupport.processAPayload(Map.of(CCBusConstant.TOPIC, strArr[0], CCBusConstant.EVENT, event)), this.router);
    }

    @Override // org.grey.citycat.bus.CCBus
    public void asyncPost(event event, String... strArr) {
        AssertUtil.assertSingleTopic(strArr);
        this.eventQueue.add((EventPayload) this.eventPayloadSupport.processAPayload(Map.of(CCBusConstant.TOPIC, strArr[0], CCBusConstant.EVENT, event)));
        CompletableFuture.runAsync(() -> {
            synchronized (this.lock) {
                if (this.eventQueue.isEmpty()) {
                    return;
                }
                this.dispatcher.dispatch(this.eventQueue.poll(), this.router);
            }
        });
    }

    @Override // org.grey.citycat.bus.CCBus
    public void asyncBatchPost(event event, String... strArr) {
        AssertUtil.assertSingleTopic(strArr);
        this.eventQueue.offer((EventPayload) this.eventPayloadSupport.processAPayload(Map.of(CCBusConstant.TOPIC, strArr[0], CCBusConstant.EVENT, event)));
        AssertUtil.assertThenDo(obj -> {
            return this.eventQueue.size() >= this.batchSize.intValue();
        }, () -> {
            CompletableFuture.runAsync(this::batchQ);
        });
    }

    private void batchQ() {
        if (this.eventQueue.size() < this.batchSize.intValue()) {
            return;
        }
        do {
            this.dispatcher.dispatch(this.eventQueue.poll(), this.router);
        } while (this.eventQueue.size() < this.batchSize.intValue());
    }

    @Override // org.grey.citycat.bus.CCBusConfigurator
    public void router(CCRouter<event, ? extends IPayload<listener>, listener> cCRouter) {
        AssertUtil.assertThenDo(obj -> {
            return cCRouter != null;
        }, () -> {
            this.router = cCRouter;
        });
    }

    @Override // org.grey.citycat.bus.CCBusConfigurator
    public void wait(WaitStrategy waitStrategy) {
        AssertUtil.assertThenDo(obj -> {
            return waitStrategy != null;
        }, () -> {
            this.waitStrategy = waitStrategy;
        });
    }

    @Override // org.grey.citycat.bus.CCBusConfigurator
    public void listenerPayloadSupport(IPayloadSupport<listener, ? extends IPayload<listener>> iPayloadSupport) {
        AssertUtil.assertThenDo(obj -> {
            return iPayloadSupport != null;
        }, () -> {
            this.listenerPayloadSupport = iPayloadSupport;
        });
    }

    @Override // org.grey.citycat.bus.CCBusConfigurator
    public void eventPayloadSupport(IPayloadSupport<event, ? extends IPayload<event>> iPayloadSupport) {
        AssertUtil.assertThenDo(obj -> {
            return iPayloadSupport != null;
        }, () -> {
            this.eventPayloadSupport = iPayloadSupport;
        });
    }

    @Override // org.grey.citycat.bus.CCBusConfigurator
    public void dispatcher(IDispatcher<event, EventPayload<event>> iDispatcher) {
        AssertUtil.assertThenDo(obj -> {
            return iDispatcher != null;
        }, () -> {
            this.dispatcher = iDispatcher;
        });
    }
}
