package org.elasticsoftware.elasticactors.broadcast;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;
import org.elasticsoftware.elasticactors.Actor;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.ActorSystem;
import org.elasticsoftware.elasticactors.MessageHandler;
import org.elasticsoftware.elasticactors.MessageHandlers;
import org.elasticsoftware.elasticactors.MethodActor;
import org.elasticsoftware.elasticactors.base.serialization.JacksonSerializationFramework;
import org.elasticsoftware.elasticactors.broadcast.handlers.RehashHandlers;
import org.elasticsoftware.elasticactors.broadcast.messages.Add;
import org.elasticsoftware.elasticactors.broadcast.messages.LeafNodesRequest;
import org.elasticsoftware.elasticactors.broadcast.messages.LeafNodesResponse;
import org.elasticsoftware.elasticactors.broadcast.messages.Remove;
import org.elasticsoftware.elasticactors.broadcast.messages.Throttled;
import org.elasticsoftware.elasticactors.broadcast.messages.ThrottledMessage;
import org.elasticsoftware.elasticactors.broadcast.messages.UpdateThrottleConfig;
import org.elasticsoftware.elasticactors.broadcast.state.BroadcasterState;
import org.elasticsoftware.elasticactors.broadcast.state.ThrottleConfig;
import org.elasticsoftware.elasticactors.broadcast.state.ThrottledBroadcastSession;
import org.elasticsoftware.elasticactors.state.ActorLifecycleStep;
import org.elasticsoftware.elasticactors.state.PersistenceConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.beans.factory.aspectj.AbstractDependencyInjectionAspect;
import org.springframework.beans.factory.aspectj.AnnotationBeanConfigurerAspect;
import org.springframework.beans.factory.aspectj.ConfigurableObject;
import org.springframework.core.env.Environment;

@PersistenceConfig(persistOnMessages = false, included = {Add.class, Remove.class, UpdateThrottleConfig.class}, persistOn = {ActorLifecycleStep.CREATE})
@Configurable
@MessageHandlers({RehashHandlers.class})
@Actor(stateClass = BroadcasterState.class, serializationFramework = JacksonSerializationFramework.class)
/* loaded from: input_file:org/elasticsoftware/elasticactors/broadcast/Broadcaster.class */
public final class Broadcaster extends MethodActor implements ConfigurableObject {
    /**
     * Matches a whole string of the form <code>${key}</code> or <code>${key:default}</code>.
     * Group 1 is the key; the optional group 2 is the default. Neither may contain a colon.
     * Assigned in the static initializer below.
     */
    private static final Pattern EXPRESSION_PATTERN;
    /** Source of the Jackson ObjectMapper used to write and read throttled message payloads. */
    private JacksonSerializationFramework serializationFramework;
    /** Property source consulted when a throttle value is written as a placeholder expression. */
    private Environment environment;
    /** Throttle configuration resolved per message class; filled lazily by getThrottleConfig. */
    private final Map<Class<?>, ThrottleConfig> throttleConfigCache;
    // AspectJ static join-point descriptors; both are assigned in ajc$preClinit().
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;

    static {
        // The join-point descriptors are created before the pattern is compiled.
        ajc$preClinit();
        EXPRESSION_PATTERN = Pattern.compile("^\\$\\{([^:]+)(?::([^:]+))?}$");
    }

    /**
     * Creates the actor and initializes an empty throttle-config cache.
     *
     * <p>NOTE(review): the join-point creation and the {@code AnnotationBeanConfigurerAspect}
     * calls look like decompiled AspectJ weaving for {@code @Configurable} dependency injection
     * rather than hand-written logic. Confirm against the original source before changing them;
     * statement order is presumably significant.
     */
    public Broadcaster() {
        // Runtime join points built from the two static descriptors created in ajc$preClinit().
        JoinPoint makeJP = Factory.makeJP(ajc$tjp_1, this, this);
        JoinPoint makeJP2 = Factory.makeJP(ajc$tjp_0, this, this);
        // "before" advice: invoked when the class carries @Configurable and ajc$if$bb0 accepts the annotation.
        if (this != null && getClass().isAnnotationPresent(Configurable.class) && AnnotationBeanConfigurerAspect.ajc$if$bb0(getClass().getAnnotation(Configurable.class))) {
            AnnotationBeanConfigurerAspect.aspectOf().ajc$before$org_springframework_beans_factory_aspectj_AbstractDependencyInjectionAspect$1$e854fa65(this);
        }
        // "afterReturning" advice for the ConfigurableObject join point: only when the "before" condition did not hold.
        if ((this == null || !getClass().isAnnotationPresent(Configurable.class) || !AnnotationBeanConfigurerAspect.ajc$if$bb0(getClass().getAnnotation(Configurable.class))) && this != null && getClass().isAnnotationPresent(Configurable.class) && AbstractDependencyInjectionAspect.ajc$if$6f1(makeJP2)) {
            AnnotationBeanConfigurerAspect.aspectOf().ajc$afterReturning$org_springframework_beans_factory_aspectj_AbstractDependencyInjectionAspect$2$1ea6722c(this);
        }
        // The only field initialization performed by this constructor.
        this.throttleConfigCache = new ConcurrentHashMap();
        // "afterReturning" advice for the Broadcaster join point, skipped when ajc$if$bb0 holds or ajc$if$6f1 rejects it.
        if (AnnotationBeanConfigurerAspect.ajc$if$bb0(getClass().getAnnotation(Configurable.class)) || !AbstractDependencyInjectionAspect.ajc$if$6f1(makeJP)) {
            return;
        }
        AnnotationBeanConfigurerAspect.aspectOf().ajc$afterReturning$org_springframework_beans_factory_aspectj_AbstractDependencyInjectionAspect$2$1ea6722c(this);
    }

    /**
     * Called after the actor is created. If the initial state already holds more leaves than
     * the configured bucket size, the leaves are redistributed over child broadcasters.
     *
     * @param actorRef the creator reference passed in by the framework (not used here)
     * @throws Exception if rehashing fails
     */
    public void postCreate(ActorRef actorRef) throws Exception {
        BroadcasterState state = getState(BroadcasterState.class);
        boolean overCapacity = state.getLeaves().size() > state.getBucketSize();
        if (overCapacity) {
            rehash(state);
        }
    }

    /**
     * Activation hook; this actor performs no work on activation.
     *
     * @param str value supplied by the framework (unused)
     */
    public void postActivate(String str) throws Exception {
    }

    /**
     * Runs the superclass teardown and then, when this broadcaster is not a leaf, stops every
     * child node it holds in its state.
     *
     * @param actorRef the reference passed in by the framework, forwarded to the superclass
     * @throws Exception if the superclass teardown or stopping a child fails
     */
    public void preDestroy(ActorRef actorRef) throws Exception {
        super.preDestroy(actorRef);
        BroadcasterState broadcasterState = getState(BroadcasterState.class);
        if (!broadcasterState.isLeafNode()) {
            for (ActorRef child : broadcasterState.getNodes()) {
                getSystem().stop(child);
            }
        }
    }

    /**
     * Returns the throttle configuration for the runtime class of {@code obj}, resolving and
     * caching it on first use.
     *
     * @param obj the message whose class determines the configuration
     * @return the cached or newly resolved configuration
     */
    private ThrottleConfig getThrottleConfig(Object obj) {
        Class<?> messageClass = obj.getClass();
        return this.throttleConfigCache.computeIfAbsent(messageClass, type -> resolveThrottleConfig(type));
    }

    /**
     * Builds the throttle configuration for a message class from its resolved
     * messages-per-second limit and logs the result at debug level.
     *
     * @param cls the message class
     * @return a new configuration wrapping the resolved limit
     */
    private ThrottleConfig resolveThrottleConfig(Class<?> cls) {
        Integer maxPerSecond = Integer.valueOf(getMaxMessagesPerSecond(cls));
        ThrottleConfig resolved = new ThrottleConfig(maxPerSecond);
        this.logger.debug("Resolved broadcast throttling config {} for class {}", resolved, cls.getName());
        return resolved;
    }

    /**
     * Resolves the messages-per-second limit declared by a {@link Throttled} annotation.
     *
     * <p>The annotation value is either a literal integer or a placeholder matching
     * {@link #EXPRESSION_PATTERN}. A placeholder is looked up in the environment; when the
     * lookup throws {@link IllegalStateException}, the placeholder's default is parsed instead.
     *
     * @param cls the message class to inspect
     * @return the resolved limit, or 0 when the class is not annotated or the value cannot be resolved
     */
    private int getMaxMessagesPerSecond(Class<?> cls) {
        Throttled throttled = cls.getAnnotation(Throttled.class);
        if (throttled == null) {
            return 0;
        }
        try {
            String configured = throttled.maxPerSecond();
            Matcher placeholder = EXPRESSION_PATTERN.matcher(configured);
            if (placeholder.matches()) {
                String propertyKey = placeholder.group(1);
                try {
                    return this.environment.getRequiredProperty(propertyKey, Integer.class);
                } catch (IllegalStateException missingProperty) {
                    String fallback = placeholder.group(2);
                    if (fallback == null) {
                        // No default in the placeholder: let the outer handler log and return 0.
                        throw missingProperty;
                    }
                    return Integer.parseInt(fallback);
                }
            }
            return Integer.parseInt(configured);
        } catch (Exception e) {
            this.logger.error("Could not parse throttling configuration for message class {}", cls.getName(), e);
            return 0;
        }
    }

    /**
     * Injection point for the environment used to resolve placeholder throttle values.
     *
     * @param environment the environment to store
     */
    @Autowired
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    /**
     * Injection point for the serialization framework whose ObjectMapper is used to write and
     * read throttled message payloads.
     *
     * @param jacksonSerializationFramework the framework to store
     */
    @Autowired
    public void setSerializationFramework(JacksonSerializationFramework jacksonSerializationFramework) {
        this.serializationFramework = jacksonSerializationFramework;
    }

    /**
     * Removes members from this broadcaster.
     *
     * <p>While a rehash is in progress the request is queued if this node is the rehash root
     * and dropped (with an error log) otherwise. A leaf removes the members from its own set;
     * a non-leaf groups the members by child node, forwards one {@link Remove} per child and
     * decrements its size by the number of requested members.
     *
     * @param remove the removal request
     * @param broadcasterState this actor's state
     */
    @MessageHandler
    public void handleRemove(Remove remove, BroadcasterState broadcasterState) {
        if (broadcasterState.getCurrentlyRehashing().booleanValue()) {
            if (!broadcasterState.getRehashRoot().booleanValue()) {
                this.logger.error("Received remove request, but broadcaster [{}] is currently rehashing and is not the root!. This should not happen, ignoring remove request.", getSelf().getActorId());
                return;
            } else {
                this.logger.info("Received remove request, but broadcaster [{}] is currently rehashing. Saving message for when rehashing will be done", getSelf().getActorId());
                broadcasterState.getReceivedDuringRehashing().add(remove);
                return;
            }
        }
        if (broadcasterState.isLeafNode()) {
            broadcasterState.getLeaves().removeAll(remove.getMembers());
            return;
        }
        Multimap<ActorRef, ActorRef> membersByNode = mapToBucket(remove.getMembers(), broadcasterState);
        // keySet() yields each child once. keys() repeats a key once per mapped value, which
        // would send the same Remove to a child as many times as it has members in the request.
        for (ActorRef node : membersByNode.keySet()) {
            node.tell(new Remove(membersByNode.get(node)), getSelf());
        }
        broadcasterState.decrementSize(remove.getMembers().size());
    }

    /**
     * Adds members to this broadcaster.
     *
     * <p>While a rehash is in progress the request is queued if this node is the rehash root
     * and dropped (with an error log) otherwise. A leaf stores the members itself and rehashes
     * once it holds more than the bucket size; a non-leaf groups the members by child node,
     * forwards one {@link Add} per child and increments its size.
     *
     * @param add the add request
     * @param broadcasterState this actor's state
     * @throws Exception if rehashing fails
     */
    @MessageHandler
    public void handleAdd(Add add, BroadcasterState broadcasterState) throws Exception {
        if (broadcasterState.getCurrentlyRehashing().booleanValue()) {
            if (broadcasterState.getRehashRoot().booleanValue()) {
                this.logger.info("Received add request, but broadcaster [{}] is currently rehashing. Saving message for when rehashing will be done", getSelf().getActorId());
                broadcasterState.getReceivedDuringRehashing().add(add);
            } else {
                this.logger.error("Received add request, but broadcaster [{}] is currently rehashing and is not the root!. This should not happen, ignoring add request.", getSelf().getActorId());
            }
            return;
        }
        if (!broadcasterState.isLeafNode()) {
            Multimap<ActorRef, ActorRef> membersByNode = mapToBucket(add.getMembers(), broadcasterState);
            for (ActorRef node : membersByNode.asMap().keySet()) {
                node.tell(new Add(membersByNode.get(node)), getSelf());
            }
            broadcasterState.incrementSize(add.getMembers().size());
            return;
        }
        broadcasterState.getLeaves().addAll(add.getMembers());
        boolean overCapacity = broadcasterState.getLeaves().size() > broadcasterState.getBucketSize();
        if (overCapacity) {
            rehash(broadcasterState);
        }
    }

    /**
     * Logs a deprecation warning naming the sender and then stores the supplied throttle
     * configuration in the state.
     *
     * @param updateThrottleConfig the message carrying the new configuration
     * @param broadcasterState this actor's state
     * @param actorRef the sender of the request
     */
    @MessageHandler
    public void handleUpdateThrottleConfig(
            UpdateThrottleConfig updateThrottleConfig,
            BroadcasterState broadcasterState,
            ActorRef actorRef) {
        this.logger.warn(
                "Received an attempt to update the throttle config from actor [{}]. This has been deprecated and should not be used anymore.",
                actorRef);
        broadcasterState.setThrottleConfig(
                updateThrottleConfig.getThrottleConfig());
    }

    /**
     * Handles a request to collect the leaf nodes for a throttled broadcast.
     *
     * <p>A leaf replies to the sender with a response containing only itself. A non-leaf
     * records a session keyed by the broadcast id with the sender as parent, then forwards
     * the request to each of its child nodes.
     *
     * @param leafNodesRequest the request
     * @param broadcasterState this actor's state
     * @param actorRef the sender (the parent in the broadcast tree)
     */
    @MessageHandler
    public void handleLeafNodesRequest(LeafNodesRequest leafNodesRequest, BroadcasterState broadcasterState, ActorRef actorRef) {
        ActorRef me = getSelf();
        if (!broadcasterState.isLeafNode()) {
            this.logger.debug("Node [{}]: broadcast [{}] reached node, forwarding the request to children nodes", me.getActorId(), leafNodesRequest.getBroadcastId());
            broadcasterState.addThrottledBroadcastSession(new ThrottledBroadcastSession(leafNodesRequest.getBroadcastId(), actorRef));
            for (ActorRef child : broadcasterState.getNodes()) {
                this.logger.trace("Node [{}]: forwarding broadcast [{}] to children node [{}]", me.getActorId(), leafNodesRequest.getBroadcastId(), child.getActorId());
                child.tell(leafNodesRequest, me);
            }
            return;
        }
        this.logger.debug("Node[{}]: broadcast [{}] reached leaf, sending myself to the parent [{}]", me.getActorId(), leafNodesRequest.getBroadcastId(), actorRef.getActorId());
        actorRef.tell(new LeafNodesResponse(leafNodesRequest.getBroadcastId(), Sets.newHashSet(new ActorRef[]{me})));
    }

    /**
     * Handles a child's leaf-node response for a throttled broadcast session.
     *
     * <p>The response is merged into the matching session. Once the session reports ready for
     * the current number of child nodes, the collected leaves are either passed up to the
     * session's parent or, when there is no parent, used to schedule the throttled delivery.
     * The session is then removed from the state.
     *
     * @param leafNodesResponse the child's response
     * @param broadcasterState this actor's state
     * @param actorSystem the actor system, used for scheduling
     * @param actorRef the child that sent the response
     */
    @MessageHandler
    public void handleLeafNodesResponse(LeafNodesResponse leafNodesResponse, BroadcasterState broadcasterState, ActorSystem actorSystem, ActorRef actorRef) {
        ThrottledBroadcastSession session = broadcasterState.getThrottledBroadcastSession(leafNodesResponse.getBroadcastId());
        ActorRef me = getSelf();
        if (session == null) {
            this.logger.warn("Node [{}]: got response from child node [{}], but couldn't find session [{}]", me.getActorId(), actorRef, leafNodesResponse.getBroadcastId());
            return;
        }
        session.handleLeafNodesResponse(leafNodesResponse);
        boolean allChildrenAnswered = session.isReady(broadcasterState.getNodes().size());
        if (!allChildrenAnswered) {
            this.logger.debug("Node [{}]: session [{}] is not ready yet. Number of nodes = {}, received = {}", me.getActorId(), session.getId(), broadcasterState.getNodes().size(), session.getReceivedResponses());
            return;
        }
        if (session.getParent() == null) {
            // No parent recorded: this node started the broadcast and performs the throttling.
            this.logger.debug("Node [{}]: got throttling action for session [{}]", me.getActorId(), session.getId());
            throttle(session, broadcasterState, actorSystem);
        } else {
            this.logger.debug("Node [{}]: sending response of session [{}] to the parent [{}] with {} leaf nodes", me.getActorId(), session.getId(), session.getParent().getActorId(), session.getLeafNodes().size());
            session.getParent().tell(new LeafNodesResponse(session.getId(), session.getLeafNodes()));
        }
        this.logger.debug("Node [{}]: removing the broadcast session [{}]", me.getActorId(), session.getId());
        broadcasterState.removeThrottledBroadcastSession(session.getId());
    }

    /**
     * Delivers a previously scheduled throttled message: the payload is deserialized back into
     * an instance of its recorded class and dispatched through {@link #onUnhandled} on behalf
     * of the original sender. Any failure is logged and swallowed.
     *
     * @param throttledMessage the envelope holding the sender, message class name and payload
     */
    @MessageHandler
    public void handleThrottledMessage(ThrottledMessage throttledMessage) {
        try {
            this.logger.debug("Node [{}]: handling ThrottledMessage of class [{}] received from [{}]", getSelf().getActorId(), throttledMessage.getMessageClass(), throttledMessage.getSender());
            // NOTE(review): the class to instantiate is named by the message itself. This is
            // only safe while ThrottledMessage instances can come solely from trusted
            // broadcasters; confirm that assumption.
            Class<?> messageType = Class.forName(throttledMessage.getMessageClass());
            Object message = this.serializationFramework.getObjectMapper().readValue(throttledMessage.getMessageData(), messageType);
            onUnhandled(throttledMessage.getSender(), message);
        } catch (Exception e) {
            // This path delivers the message; "scheduling" belongs to throttle() and misdirected debugging.
            this.logger.error("Unexpected Exception handling throttled message of type [{}] from sender [{}]", throttledMessage.getMessageClass(), throttledMessage.getSender(), e);
        }
    }

    /**
     * Schedules the session's message for delivery to each collected leaf node, spacing the
     * deliveries by a fixed step.
     *
     * <p>The step in milliseconds is {@code 1000 / maxMessagesPerSecond * bucketSize}; the
     * first leaf is scheduled with no delay and each following leaf one step later. The
     * message is serialized once and wrapped in a {@link ThrottledMessage}. Failures while
     * serializing or scheduling are logged and swallowed.
     *
     * @param throttledBroadcastSession the completed session holding message, sender and leaves
     * @param broadcasterState this actor's state (source of the bucket size)
     * @param actorSystem the actor system whose scheduler is used
     */
    private void throttle(ThrottledBroadcastSession throttledBroadcastSession, BroadcasterState broadcasterState, ActorSystem actorSystem) {
        double millisPerMessage = 1000.0d / throttledBroadcastSession.getThrottleConfig().getMaxMessagesPerSecond().intValue();
        long stepMillis = (long) (millisPerMessage * broadcasterState.getBucketSize());
        try {
            ThrottledMessage envelope = new ThrottledMessage(
                    throttledBroadcastSession.getSender(),
                    throttledBroadcastSession.getMessage().getClass().getName(),
                    this.serializationFramework.getObjectMapper().writeValueAsString(throttledBroadcastSession.getMessage()));
            ActorRef me = getSelf();
            long delayMillis = 0;
            for (ActorRef leaf : throttledBroadcastSession.getLeafNodes()) {
                this.logger.trace("Node [{}]: scheduling throttled message of type [{}] to leaf node [{}] in {} ms", me.getActorId(), throttledBroadcastSession.getMessage().getClass().getName(), leaf.getActorId(), delayMillis);
                actorSystem.getScheduler().scheduleOnce(envelope, leaf, delayMillis, TimeUnit.MILLISECONDS);
                delayMillis += stepMillis;
            }
        } catch (Exception e) {
            this.logger.error("Unexpected Exception scheduling throttled message of type [{}] from sender [{}]", throttledBroadcastSession.getMessage().getClass().getName(), throttledBroadcastSession.getSender(), e);
        }
    }

    /**
     * Broadcasts a message that has no dedicated handler.
     *
     * <p>A leaf sends the message to each of its members on behalf of the original sender. A
     * non-leaf forwards it to every child node when the message class has no valid throttle
     * configuration; otherwise it opens a throttled broadcast session and asks every child
     * for its leaf nodes.
     *
     * @param actorRef the original sender
     * @param obj the message to broadcast
     */
    protected void onUnhandled(ActorRef actorRef, Object obj) {
        BroadcasterState broadcasterState = getState(BroadcasterState.class);
        ActorRef me = getSelf();
        String messageType = obj.getClass().getName();

        if (broadcasterState.isLeafNode()) {
            this.logger.debug("Node [{}]: leaf got message of type [{}]", me.getActorId(), messageType);
            for (ActorRef member : broadcasterState.getLeaves()) {
                this.logger.trace("Node [{}]: sending message of type [{}] to [{}]", me.getActorId(), messageType, member);
                member.tell(obj, actorRef);
            }
            return;
        }

        ThrottleConfig config = getThrottleConfig(obj);
        if (config.isValid()) {
            ThrottledBroadcastSession session = new ThrottledBroadcastSession(obj, actorRef, config);
            broadcasterState.addThrottledBroadcastSession(session);
            this.logger.debug("Node [{}]: initiating throttled broadcast [{}] with {} messages/sec for message of type [{}]", me.getActorId(), session.getId(), config.getMaxMessagesPerSecond(), messageType);
            LeafNodesRequest request = new LeafNodesRequest(session.getId());
            for (ActorRef child : broadcasterState.getNodes()) {
                this.logger.trace("Node [{}]: sending leaf node request for broadcast [{}] to [{}]", me.getActorId(), request.getBroadcastId(), child.getActorId());
                child.tell(request, getSelf());
            }
            return;
        }

        this.logger.debug("Node [{}]: broadcasting message of type [{}]", me.getActorId(), messageType);
        for (ActorRef child : broadcasterState.getNodes()) {
            this.logger.trace("Node [{}]: sending message of type [{}] to [{}]", me.getActorId(), messageType, child.getActorId());
            child.tell(obj, actorRef);
        }
    }

    /**
     * Groups members by the child node responsible for them.
     *
     * <p>The child is chosen by hashing {@code "<this actor id>:<member>"} with murmur3_32 and
     * reducing the hash modulo the configured buckets-per-node count.
     *
     * @param set the members to distribute
     * @param broadcasterState this actor's state (child nodes and bucket count)
     * @return a multimap from child node to the members assigned to it
     */
    private Multimap<ActorRef, ActorRef> mapToBucket(Set<ActorRef> set, BroadcasterState broadcasterState) {
        Multimap<ActorRef, ActorRef> buckets = ArrayListMultimap.create();
        for (ActorRef member : set) {
            int hash = Hashing.murmur3_32().hashString(String.format("%s:%s", getSelf().getActorId(), member), StandardCharsets.UTF_8).asInt();
            // abs(hash % n), not abs(hash) % n: Math.abs(Integer.MIN_VALUE) stays negative and
            // would give a negative index. Every other hash maps to the same bucket as before.
            int index = Math.abs(hash % broadcasterState.getBucketsPerNode());
            buckets.put(broadcasterState.getNodes().get(index), member);
        }
        return buckets;
    }

    /**
     * Groups members by bucket name.
     *
     * <p>The bucket is chosen by hashing {@code "<this actor id>:<member>"} with murmur3_32
     * and reducing the hash modulo the number of bucket names.
     *
     * @param set the members to distribute
     * @param list the bucket names, addressed by index
     * @return a multimap from bucket name to the members assigned to it
     */
    private Multimap<String, ActorRef> mapToBucket(Set<ActorRef> set, List<String> list) {
        Multimap<String, ActorRef> buckets = ArrayListMultimap.create();
        for (ActorRef member : set) {
            int hash = Hashing.murmur3_32().hashString(String.format("%s:%s", getSelf().getActorId(), member), StandardCharsets.UTF_8).asInt();
            // abs(hash % n), not abs(hash) % n: Math.abs(Integer.MIN_VALUE) stays negative and
            // would give a negative index. Every other hash maps to the same bucket as before.
            int index = Math.abs(hash % list.size());
            buckets.put(list.get(index), member);
        }
        return buckets;
    }

    /**
     * Turns this leaf into an inner node by distributing its members over child broadcasters.
     *
     * <p>One child is created per bucket, named {@code "<this actor id>/<index>"} and seeded
     * with the members hashed to that name. The node's size is then increased by the number of
     * redistributed members, its own leaf set is cleared and it is marked as a non-leaf.
     *
     * @param broadcasterState this actor's state, modified in place
     * @throws Exception if creating a child actor fails
     */
    private void rehash(BroadcasterState broadcasterState) throws Exception {
        // Typed list: a raw LinkedList does not type-check in the String for-each below.
        List<String> childIds = new LinkedList<>();
        for (int i = 0; i < broadcasterState.getBucketsPerNode(); i++) {
            childIds.add(String.format("%s/%d", getSelf().getActorId(), i));
        }
        Multimap<String, ActorRef> membersByChild = mapToBucket(broadcasterState.getLeaves(), childIds);
        ActorSystem system = getSystem();
        for (String childId : childIds) {
            BroadcasterState childState = new BroadcasterState(broadcasterState.getBucketsPerNode(), broadcasterState.getBucketSize(), membersByChild.get(childId));
            broadcasterState.getNodes().add(system.actorOf(childId, Broadcaster.class, childState));
        }
        // Record the size before the leaves are cleared.
        broadcasterState.incrementSize(broadcasterState.getLeaves().size());
        broadcasterState.getLeaves().clear();
        broadcasterState.setLeafNode(false);
    }

    /**
     * Creates the two static "initialization" join-point descriptors used by the constructor:
     * {@code ajc$tjp_0} for the ConfigurableObject constructor signature and {@code ajc$tjp_1}
     * for the Broadcaster constructor signature. Called first in the static initializer.
     *
     * <p>NOTE(review): appears to be AspectJ-generated; the trailing 51 is presumably the
     * source line in Broadcaster.java. Confirm before relying on it.
     */
    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("Broadcaster.java", Broadcaster.class);
        ajc$tjp_0 = factory.makeSJP("initialization", factory.makeConstructorSig("1", "org.springframework.beans.factory.aspectj.ConfigurableObject", "", "", ""), 51);
        ajc$tjp_1 = factory.makeSJP("initialization", factory.makeConstructorSig("1", "org.elasticsoftware.elasticactors.broadcast.Broadcaster", "", "", ""), 51);
    }
}
