package org.eclipse.ditto.internal.utils.pekko.mailbox;

import com.typesafe.config.Config;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.pekko.actor.ActorPath;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorRefWithCell;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.dispatch.Envelope;
import org.apache.pekko.dispatch.MailboxType;
import org.apache.pekko.dispatch.MessageQueue;
import org.apache.pekko.dispatch.ProducesMessageQueue;
import org.apache.pekko.dispatch.UnboundedMailbox;
import org.apache.pekko.dispatch.UnboundedMessageQueueSemantics;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/pekko/mailbox/MonitoredUnboundedMailboxType.class */
public final class MonitoredUnboundedMailboxType implements MailboxType, ProducesMessageQueue<UnboundedMailbox.MessageQueue> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MonitoredUnboundedMailboxType.class);
    private static final Gauge MAILBOX_SIZE = DittoMetrics.gauge("actor_mailbox_size");
    private static final String CONFIG_OBJECT_PATH = "monitored-unbounded-mailbox";
    private static final String THRESHOLD_FOR_LOGGING_PATH = "threshold-for-logging";
    private static final String LOGGING_INTERVAL_PATH = "logging-interval";
    private static final String ACTORS_INCLUDE_REGEX_PATH = "include-actors-regex";
    private static final String ACTORS_EXCLUDE_REGEX_PATH = "exclude-actors-regex";
    private final Config mailboxConfig;
    private final Pattern includeRegexFilters;
    private final Pattern excludeRegexFilters;

    /* loaded from: input_file:org/eclipse/ditto/internal/utils/pekko/mailbox/MonitoredUnboundedMailboxType$InstrumentedMessageQueue.class */
    static final class InstrumentedMessageQueue implements MessageQueue, UnboundedMessageQueueSemantics {
        private final LoggingAdapter log;
        private final ActorPath path;
        private final int threshold;
        private final long interval;
        private final String ownerActorClassName;
        private final Gauge mailboxSizeByActorClassGauge;
        private final Queue<Envelope> queue = new ConcurrentLinkedQueue();
        private final AtomicInteger queueSize = new AtomicInteger();
        private volatile long lastLogTime = System.nanoTime();

        InstrumentedMessageQueue(ActorRef actorRef, ActorSystem actorSystem, int i, long j) {
            this.log = Logging.getLogger(actorSystem, InstrumentedMessageQueue.class);
            this.path = actorRef.path();
            this.threshold = i;
            this.interval = j;
            this.ownerActorClassName = getClassOfOwnerActorRef(actorRef).orElse("unknown-actor-class");
            this.mailboxSizeByActorClassGauge = (Gauge) MonitoredUnboundedMailboxType.MAILBOX_SIZE.tag("actor-class", this.ownerActorClassName);
            this.log.debug("instrumented queue created for actor {} of class {}", this.path, this.ownerActorClassName);
        }

        private static Optional<String> getClassOfOwnerActorRef(ActorRef actorRef) {
            return actorRef instanceof ActorRefWithCell ? Optional.ofNullable(((ActorRefWithCell) actorRef).underlying().props().actorClass().getCanonicalName()) : Optional.empty();
        }

        @Override // org.apache.pekko.dispatch.MessageQueue
        /* renamed from: dequeue */
        public Envelope mo5537dequeue() {
            Envelope poll = this.queue.poll();
            if (poll != null) {
                this.mailboxSizeByActorClassGauge.decrement();
                logMailboxSize(this.queueSize.decrementAndGet());
            }
            return poll;
        }

        @Override // org.apache.pekko.dispatch.MessageQueue
        public void enqueue(ActorRef actorRef, Envelope envelope) {
            this.queue.offer(envelope);
            int incrementAndGet = this.queueSize.incrementAndGet();
            this.mailboxSizeByActorClassGauge.increment();
            logMailboxSize(incrementAndGet);
        }

        private void logMailboxSize(int i) {
            if (i >= this.threshold) {
                long nanoTime = System.nanoTime();
                if (nanoTime - this.lastLogTime > this.interval) {
                    this.lastLogTime = nanoTime;
                    this.log.info("Mailbox size is <{}> for <{}> of class <{}>", Integer.valueOf(i), this.path, this.ownerActorClassName);
                }
            }
        }

        @Override // org.apache.pekko.dispatch.MessageQueue
        public int numberOfMessages() {
            return this.queueSize.get();
        }

        @Override // org.apache.pekko.dispatch.MessageQueue
        public boolean hasMessages() {
            return !this.queue.isEmpty();
        }

        @Override // org.apache.pekko.dispatch.MessageQueue
        public void cleanUp(ActorRef actorRef, MessageQueue messageQueue) {
            Iterator<Envelope> it = this.queue.iterator();
            while (it.hasNext()) {
                messageQueue.enqueue(actorRef, it.next());
                this.mailboxSizeByActorClassGauge.decrement();
                this.queueSize.decrementAndGet();
            }
        }
    }

    public MonitoredUnboundedMailboxType(ActorSystem.Settings settings, Config config) {
        this.mailboxConfig = config.getConfig(CONFIG_OBJECT_PATH);
        this.includeRegexFilters = Pattern.compile(this.mailboxConfig.getString(ACTORS_INCLUDE_REGEX_PATH));
        this.excludeRegexFilters = Pattern.compile(this.mailboxConfig.getString(ACTORS_EXCLUDE_REGEX_PATH));
    }

    @Override // org.apache.pekko.dispatch.MailboxType
    public MessageQueue create(Option<ActorRef> option, Option<ActorSystem> option2) {
        if (option.nonEmpty() && option2.nonEmpty()) {
            ActorRef actorRef = option.get();
            if (shouldTrackActor(actorRef.path())) {
                return new InstrumentedMessageQueue(actorRef, option2.get(), this.mailboxConfig.getInt(THRESHOLD_FOR_LOGGING_PATH), this.mailboxConfig.getLong(LOGGING_INTERVAL_PATH));
            }
        } else {
            LOGGER.warn("Mailbox creation not possible, owner actor <{}> or system <{}> not available", option, option2);
        }
        return new UnboundedMailbox.MessageQueue();
    }

    private boolean shouldTrackActor(ActorPath actorPath) {
        String stringWithoutAddress = actorPath.toStringWithoutAddress();
        if (this.excludeRegexFilters.matcher(stringWithoutAddress).matches()) {
            return false;
        }
        return this.includeRegexFilters.matcher(stringWithoutAddress).matches();
    }
}
