package com.netflix.eventbus.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.eventbus.spi.CatchAllSubscriber;
import com.netflix.eventbus.spi.EventBus;
import com.netflix.eventbus.spi.EventCreator;
import com.netflix.eventbus.spi.EventFilter;
import com.netflix.eventbus.spi.InvalidSubscriberException;
import com.netflix.eventbus.spi.Subscribe;
import com.netflix.eventbus.spi.SubscriberConfigProvider;
import com.netflix.eventbus.spi.SubscriberInfo;
import com.netflix.eventbus.utils.EventBusUtils;
import com.netflix.servo.monitor.Stopwatch;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/netflix-eventbus-0.3.0.jar:com/netflix/eventbus/impl/EventBusImpl.class */
public class EventBusImpl implements EventBus {
    private final SetMultimap<Class<?>, EventConsumer> consumersByEventType = Multimaps.newSetMultimap(new ConcurrentHashMap(), new Supplier<Set<EventConsumer>>() { // from class: com.netflix.eventbus.impl.EventBusImpl.1
        @Override // com.google.common.base.Supplier, java.util.function.Supplier
        public Set<EventConsumer> get() {
            return new CopyOnWriteArraySet();
        }
    });
    private final ConcurrentHashMap<Class<?>, CopyOnWriteArrayList<EventConsumer>> consumersBySubscriberClass = new ConcurrentHashMap<>();
    private final SetMultimap<Class<?>, EventFilter> eventTypeVsFilters = Multimaps.newSetMultimap(new ConcurrentHashMap(), new Supplier<Set<EventFilter>>() { // from class: com.netflix.eventbus.impl.EventBusImpl.2
        @Override // com.google.common.base.Supplier, java.util.function.Supplier
        public Set<EventFilter> get() {
            return new CopyOnWriteArraySet();
        }
    });
    private ConsumerQueueSupplier consumerQueueSupplier = new DefaultConsumerQueueSupplier();
    private EventBusStats stats = new EventBusStats(STATS_COLLECTION_DURATION_MILLIS.get());
    private EventConsumer catchAllSubscriber;
    private volatile CatchAllSubscriber catchAllSubInstance;
    private static final Logger LOGGER = LoggerFactory.getLogger(EventBusImpl.class);
    static final DynamicIntProperty STATS_COLLECTION_DURATION_MILLIS = DynamicPropertyFactory.getInstance().getIntProperty("eventbus.stats.collection.duration.millis", 60000);
    private static LoadingCache<Class<?>, Set<Class<?>>> eventHierarchyCache = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<Class<?>, Set<Class<?>>>() { // from class: com.netflix.eventbus.impl.EventBusImpl.3
        @Override // com.google.common.cache.CacheLoader
        public Set<Class<?>> load(Class<?> cls) throws Exception {
            LinkedList newLinkedList = Lists.newLinkedList();
            HashSet newHashSet = Sets.newHashSet();
            newLinkedList.add(cls);
            while (!newLinkedList.isEmpty()) {
                Class cls2 = (Class) newLinkedList.remove(0);
                newHashSet.add(cls2);
                Class superclass = cls2.getSuperclass();
                if (superclass != null && !superclass.equals(Object.class)) {
                    newLinkedList.add(superclass);
                }
                Collections.addAll(newLinkedList, cls2.getInterfaces());
            }
            return newHashSet;
        }
    });

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:WEB-INF/lib/netflix-eventbus-0.3.0.jar:com/netflix/eventbus/impl/EventBusImpl$ConsumerQueueSupplier.class */
    public interface ConsumerQueueSupplier {

        /* loaded from: input_file:WEB-INF/lib/netflix-eventbus-0.3.0.jar:com/netflix/eventbus/impl/EventBusImpl$ConsumerQueueSupplier$ConsumerQueue.class */
        public interface ConsumerQueue {
            boolean offer(Object obj);

            Object nonBlockingTake();

            Object blockingTake() throws InterruptedException;

            void clear();
        }

        ConsumerQueue get(Method method, SubscriberConfigProvider.SubscriberConfig subscriberConfig, AtomicLong atomicLong);
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void publish(Object obj) {
        Stopwatch start = this.stats.publishStats.start();
        try {
            try {
                if (!applyEventLevelFilters(obj)) {
                    start.stop();
                    return;
                }
                Iterator<Class<?>> it = getAllTypesForAnEvent(obj).iterator();
                while (it.hasNext()) {
                    Iterator<EventConsumer> it2 = this.consumersByEventType.get((SetMultimap<Class<?>, EventConsumer>) it.next()).iterator();
                    while (it2.hasNext()) {
                        it2.next().enqueue(obj);
                    }
                }
                if (null != this.catchAllSubInstance && this.catchAllSubInstance.isEnabled()) {
                    this.catchAllSubscriber.enqueue(obj);
                }
                start.stop();
            } catch (Throwable th) {
                LOGGER.error("Error occured while publishing event. Swallowing the error to avoid publisher from failing.", th);
                this.stats.publishErrors.increment();
                start.stop();
            }
        } catch (Throwable th2) {
            start.stop();
            throw th2;
        }
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void publishIffNotDead(EventCreator eventCreator, Class<?>... clsArr) {
        Stopwatch start = this.stats.conditionalPublishStats.start();
        try {
            try {
                HashMap hashMap = new HashMap();
                for (Class<?> cls : clsArr) {
                    Iterator<Class<?>> it = getAllTypesForAnEventType(cls).iterator();
                    while (it.hasNext()) {
                        Set<EventConsumer> set = this.consumersByEventType.get((SetMultimap<Class<?>, EventConsumer>) it.next());
                        if (!set.isEmpty()) {
                            hashMap.put(cls, set);
                        }
                    }
                }
                if (hashMap.isEmpty()) {
                    LOGGER.debug(String.format("Skipping publishing of events types %s as there are no interested listeners.", Arrays.toString(clsArr)));
                    start.stop();
                    return;
                }
                List<?> createEvent = eventCreator.createEvent(hashMap.keySet());
                if (null == createEvent) {
                    LOGGER.debug(String.format("No events created by event creator for event types %s", hashMap.keySet()));
                    start.stop();
                    return;
                }
                for (Object obj : createEvent) {
                    if (applyEventLevelFilters(obj)) {
                        Iterator it2 = ((Set) hashMap.get(obj.getClass())).iterator();
                        while (it2.hasNext()) {
                            ((EventConsumer) it2.next()).enqueue(obj);
                        }
                    }
                }
                start.stop();
            } catch (Throwable th) {
                LOGGER.error("Error occured while publishing event. Swallowing the error to avoid publisher from failing.", th);
                this.stats.conditionalPublishErrors.increment();
                start.stop();
            }
        } catch (Throwable th2) {
            start.stop();
            throw th2;
        }
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void registerSubscriber(@Nullable EventFilter eventFilter, Object obj) throws InvalidSubscriberException {
        ArrayList arrayList = new ArrayList();
        for (Method method : findSubscriberMethods(obj)) {
            Class<?> interestedEventType = EventBusUtils.getInterestedEventType(obj, method);
            EventConsumer eventConsumer = new EventConsumer(method, obj, eventFilter, interestedEventType, this.consumerQueueSupplier);
            arrayList.add(eventConsumer);
            this.consumersByEventType.put(interestedEventType, eventConsumer);
        }
        CopyOnWriteArrayList<EventConsumer> putIfAbsent = this.consumersBySubscriberClass.putIfAbsent(obj.getClass(), new CopyOnWriteArrayList<>(arrayList));
        if (null != putIfAbsent) {
            putIfAbsent.addAll(arrayList);
        } else {
            LOGGER.info(String.format("Registered a new subscriber: %s with filter: %s", obj, eventFilter));
        }
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void registerSubscriber(Object obj) throws InvalidSubscriberException {
        registerSubscriber(null, obj);
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public synchronized boolean enableCatchAllSubscriber(BlockingQueue blockingQueue) {
        if (null == this.catchAllSubscriber) {
            this.catchAllSubInstance = new CatchAllSubscriber();
            try {
                List<Method> findSubscriberMethods = findSubscriberMethods(this.catchAllSubInstance);
                if (!findSubscriberMethods.isEmpty()) {
                    this.catchAllSubscriber = new EventConsumer(findSubscriberMethods.get(0), this.catchAllSubInstance, null, Object.class, this.consumerQueueSupplier);
                }
            } catch (InvalidSubscriberException e) {
                LOGGER.error("Catch all subscriber invalid!", (Throwable) e);
                return false;
            }
        }
        return this.catchAllSubInstance.enable(blockingQueue);
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public synchronized void disableCatchAllSubscriber() {
        if (null != this.catchAllSubInstance) {
            this.catchAllSubInstance.disable();
        } else {
            LOGGER.info("Catch all subscriber is not enabled, disable call ignored.");
        }
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public Set<Object> unregisterSubscriber(Class<?> cls) {
        LOGGER.info("Unregistring subscriber class: " + cls);
        HashSet hashSet = new HashSet();
        CopyOnWriteArrayList<EventConsumer> remove = this.consumersBySubscriberClass.remove(cls);
        if (null != remove) {
            Iterator<EventConsumer> it = remove.iterator();
            while (it.hasNext()) {
                EventConsumer next = it.next();
                next.shutdown();
                this.consumersByEventType.remove(next.getTargetEventClass(), next);
                hashSet.add(next.getContainerInstance());
            }
            LOGGER.info(String.format("Subscriber: %s successfully unregistered", cls));
        } else {
            LOGGER.info(String.format("Subscriber: %s is not registered (or already removed). Ignoring unregister.", cls));
        }
        return hashSet;
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public boolean unregisterSubscriber(Object obj) {
        LOGGER.info("Unregistring subscriber instance: " + obj);
        CopyOnWriteArrayList<EventConsumer> copyOnWriteArrayList = this.consumersBySubscriberClass.get(obj.getClass());
        boolean z = false;
        EventConsumer eventConsumer = null;
        if (null != copyOnWriteArrayList) {
            Iterator<EventConsumer> it = copyOnWriteArrayList.iterator();
            while (it.hasNext()) {
                EventConsumer next = it.next();
                if (next.getContainerInstance() == obj) {
                    eventConsumer = next;
                }
            }
            if (null != eventConsumer && copyOnWriteArrayList.remove(eventConsumer)) {
                eventConsumer.shutdown();
                this.consumersByEventType.remove(eventConsumer.getTargetEventClass(), eventConsumer);
                z = true;
            }
        }
        if (z) {
            LOGGER.info(String.format("Subscriber instance: %s successfully unregistered", obj));
        } else {
            LOGGER.info(String.format("Subscriber instance: %s is not registered (or already removed). Ignoring unregister.", obj));
        }
        return z;
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void addFilterForSubscriber(EventFilter eventFilter, SubscriberInfo subscriberInfo) {
        EventConsumer findEventConsumerForSubscriberMethod = findEventConsumerForSubscriberMethod(subscriberInfo, "add filter");
        if (null != findEventConsumerForSubscriberMethod) {
            findEventConsumerForSubscriberMethod.addFilters(eventFilter);
            LOGGER.info(String.format("Added a new filter %s for subscriber method %s", eventFilter, subscriberInfo.getSubscriberMethod().toGenericString()));
        }
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void removeFiltersForSubscriber(SubscriberInfo subscriberInfo, EventFilter... eventFilterArr) {
        EventConsumer findEventConsumerForSubscriberMethod = findEventConsumerForSubscriberMethod(subscriberInfo, "remove filter");
        if (null != findEventConsumerForSubscriberMethod) {
            findEventConsumerForSubscriberMethod.removeFilters(eventFilterArr);
            LOGGER.info(String.format("Removed filters %s for subscriber method %s", Arrays.toString(eventFilterArr), subscriberInfo.getSubscriberMethod().toGenericString()));
        }
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void clearFiltersForSubscriber(SubscriberInfo subscriberInfo) {
        EventConsumer findEventConsumerForSubscriberMethod = findEventConsumerForSubscriberMethod(subscriberInfo, "add filter");
        if (null != findEventConsumerForSubscriberMethod) {
            findEventConsumerForSubscriberMethod.clearFilters();
            LOGGER.info(String.format("Removed ALL filters for subscriber method %s", subscriberInfo.getSubscriberMethod().toGenericString()));
        }
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void addFilterForEvent(EventFilter eventFilter, Class<?> cls) {
        if (this.eventTypeVsFilters.put(cls, eventFilter)) {
            LOGGER.info(String.format("Added a new filter %s for the event type: %s", eventFilter, cls));
        } else {
            LOGGER.info(String.format("Filter %s already exists for the event type: %s", eventFilter, cls));
        }
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void removeFiltersForEvent(Class<?> cls, EventFilter... eventFilterArr) {
        if (null == eventFilterArr || eventFilterArr.length == 0) {
            return;
        }
        if (this.eventTypeVsFilters.get((SetMultimap<Class<?>, EventFilter>) cls).removeAll(Arrays.asList(eventFilterArr))) {
            LOGGER.info(String.format("Removed filters %s for event type %s", Arrays.toString(eventFilterArr), cls));
        } else {
            LOGGER.info(String.format("None of the filters %s exists for event type %s. Ignoring remove.", Arrays.toString(eventFilterArr), cls));
        }
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public void clearFiltersForEvent(Class<?> cls) {
        this.eventTypeVsFilters.removeAll((Object) cls);
        LOGGER.info(String.format("Removed ALL filters for event type %s", cls));
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public Set<SubscriberInfo> getAllSubscribers() {
        HashSet hashSet = new HashSet();
        Iterator<CopyOnWriteArrayList<EventConsumer>> it = this.consumersBySubscriberClass.values().iterator();
        while (it.hasNext()) {
            Iterator<EventConsumer> it2 = it.next().iterator();
            while (it2.hasNext()) {
                EventConsumer next = it2.next();
                hashSet.add(new SubscriberInfo(next.getDelegateSubscriber(), next.getContainerInstance()));
            }
        }
        return Collections.unmodifiableSet(hashSet);
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public Set<SubscriberInfo> getAllSubscribersForAnEvent(Class<?> cls) {
        Set<EventConsumer> set = this.consumersByEventType.get((SetMultimap<Class<?>, EventConsumer>) cls);
        if (null == set || set.isEmpty()) {
            return Collections.emptySet();
        }
        HashSet hashSet = new HashSet(set.size());
        for (EventConsumer eventConsumer : set) {
            hashSet.add(new SubscriberInfo(eventConsumer.getDelegateSubscriber(), eventConsumer.getContainerInstance()));
        }
        return Collections.unmodifiableSet(hashSet);
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public Set<EventFilter> getFilterForASubscriber(SubscriberInfo subscriberInfo) {
        EventConsumer findEventConsumerForSubscriberMethod = findEventConsumerForSubscriberMethod(subscriberInfo, "get filter");
        return null == findEventConsumerForSubscriberMethod ? Collections.emptySet() : Collections.unmodifiableSet(findEventConsumerForSubscriberMethod.getAttachedFilters());
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public Set<EventFilter> getFiltersForAnEvent(Class<?> cls) {
        return Collections.unmodifiableSet(this.eventTypeVsFilters.get((SetMultimap<Class<?>, EventFilter>) cls));
    }

    @Override // com.netflix.eventbus.spi.EventBus
    public Set<Class<?>> getAllRegisteredEventTypes() {
        return Sets.union(this.consumersByEventType.keySet(), this.eventTypeVsFilters.keySet());
    }

    public synchronized void shutdown() {
        Iterator<EventConsumer> it = this.consumersByEventType.values().iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        this.consumersByEventType.clear();
        this.consumersBySubscriberClass.clear();
        this.eventTypeVsFilters.clear();
    }

    @VisibleForTesting
    void setConsumerQueueSupplier(ConsumerQueueSupplier consumerQueueSupplier) {
        this.consumerQueueSupplier = consumerQueueSupplier;
    }

    @VisibleForTesting
    Set<EventConsumer> getEventConsumer(Class cls) {
        return this.consumersByEventType.get((SetMultimap<Class<?>, EventConsumer>) cls);
    }

    private boolean applyEventLevelFilters(Object obj) {
        return EventBusUtils.applyFilters(obj, this.eventTypeVsFilters.get((SetMultimap<Class<?>, EventFilter>) obj.getClass()), this.stats.filterStats, " publisher ", LOGGER);
    }

    private List<Method> findSubscriberMethods(Object obj) throws InvalidSubscriberException {
        ArrayList arrayList = new ArrayList();
        HashSet<Method> hashSet = new HashSet();
        hashSet.addAll(Arrays.asList(obj.getClass().getMethods()));
        hashSet.addAll(Arrays.asList(obj.getClass().getDeclaredMethods()));
        for (Method method : hashSet) {
            if (method.isAnnotationPresent(Subscribe.class)) {
                try {
                    method.setAccessible(true);
                    arrayList.add(method);
                } catch (SecurityException e) {
                    LOGGER.error("A subscriber method: " + method.toGenericString() + " is not a public method and the security settings does not allow accessing non-public methods via reflection. This subscriber method will not be registered.", (Throwable) e);
                }
            }
        }
        Map<Method, String> validate = SubscriberValidator.validate(obj, arrayList);
        if (validate.isEmpty()) {
            return arrayList;
        }
        throw new InvalidSubscriberException(obj.getClass(), validate);
    }

    private EventConsumer findEventConsumerForSubscriberMethod(SubscriberInfo subscriberInfo, String str) {
        Method subscriberMethod = subscriberInfo.getSubscriberMethod();
        if (null == ((Subscribe) subscriberMethod.getAnnotation(Subscribe.class))) {
            LOGGER.error(String.format("The subscriber method: %s is not annotated with @Subscribe. Ignoring %s call.", subscriberMethod, str));
            return null;
        }
        CopyOnWriteArrayList<EventConsumer> copyOnWriteArrayList = this.consumersBySubscriberClass.get(subscriberMethod.getDeclaringClass());
        if (null != copyOnWriteArrayList) {
            Iterator<EventConsumer> it = copyOnWriteArrayList.iterator();
            while (it.hasNext()) {
                EventConsumer next = it.next();
                if (isTheSameSubscriber(subscriberInfo, next)) {
                    return next;
                }
            }
        }
        LOGGER.info(String.format("Subscriber: %s is not registered (or already removed). Ignoring %s call.", subscriberMethod, str));
        return null;
    }

    private static Set<Class<?>> getAllTypesForAnEvent(Object obj) {
        try {
            return eventHierarchyCache.get(obj.getClass());
        } catch (ExecutionException e) {
            throw Throwables.propagate(e.getCause());
        }
    }

    private static Set<Class<?>> getAllTypesForAnEventType(Class cls) {
        try {
            return eventHierarchyCache.get(cls);
        } catch (ExecutionException e) {
            throw Throwables.propagate(e.getCause());
        }
    }

    private boolean isTheSameSubscriber(SubscriberInfo subscriberInfo, EventConsumer eventConsumer) {
        return eventConsumer.getDelegateSubscriber().equals(subscriberInfo.getSubscriberMethod()) && eventConsumer.getContainerInstance() == subscriberInfo.getSubscriberInstance();
    }
}
