package com.tridion.cache.remoting;

import com.tridion.cache.CacheEvent;
import com.tridion.util.LoggingThread;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/udp-cache-11.5.0-1047.jar:com/tridion/cache/remoting/EventWorker.class */
/**
 * Background worker that takes {@link CacheEventItem}s off a queue and forwards each
 * contained {@link CacheEvent} to every registered remote listener except the one whose
 * GUID matches the item's source GUID.
 *
 * <p>The worker is started with {@link #startEventSending()} and keeps running until its
 * thread is interrupted.
 */
public class EventWorker implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(EventWorker.class);

    /** Maximum time, in milliseconds, a single queue poll blocks before {@code running} is re-checked. */
    private static final int DEFAULT_WAIT_TIME = 1000;

    /** Set by the starter and cleared by the worker on interruption; volatile so both threads see updates. */
    private volatile boolean running = false;

    private final BlockingQueue<CacheEventItem> eventQueue;
    private final ListenerRegistry listenerRegistry;

    /**
     * Creates a worker bound to a queue and a listener registry.
     *
     * @param blockingQueue    queue the worker polls for events to broadcast
     * @param listenerRegistry registry that supplies the current listeners and from which
     *                         failing listeners are removed
     */
    public EventWorker(BlockingQueue<CacheEventItem> blockingQueue, ListenerRegistry listenerRegistry) {
        this.eventQueue = blockingQueue;
        this.listenerRegistry = listenerRegistry;
    }

    /**
     * Starts the worker thread if it is not already running; otherwise does nothing.
     *
     * <p>Synchronized so that concurrent callers cannot both pass the {@code running} check
     * and start two threads.
     */
    public synchronized void startEventSending() {
        if (this.running) {
            return;
        }
        // The flag must be set BEFORE the thread starts: run() tests it immediately, and a
        // thread that observed 'false' would exit at once, leaving no worker behind.
        this.running = true;
        try {
            new LoggingThread(this).start();
        } catch (RuntimeException | Error e) {
            // The thread never started, so allow a later call to try again.
            this.running = false;
            throw e;
        }
    }

    /**
     * Delivers the event of the given item to every current listener whose GUID differs
     * from the item's GUID. A listener whose handler throws is logged and removed from
     * the registry; delivery to the remaining listeners continues.
     *
     * @param cacheEventItem the queued item carrying the event and its source GUID
     */
    private void broadcastEvent(CacheEventItem cacheEventItem) {
        List<RemoteListener> currentListeners = this.listenerRegistry.getCurrentListeners();
        CacheEvent event = cacheEventItem.getEvent();
        String guid = cacheEventItem.getGuid();
        LOG.debug("Notifying all listeners for key: {} from source: {}", event.getKey(), guid);
        currentListeners.stream()
                // Do not echo the event back to the listener it came from.
                .filter(remoteListener -> !remoteListener.getGuid().equals(guid))
                .forEach(remoteListener -> {
                    try {
                        remoteListener.getListener().handleEvent(event);
                    } catch (Throwable th) {
                        // Deliberately broad: any failure in a listener must not stop delivery
                        // to the others. The trailing Throwable is logged as the exception.
                        LOG.error("Could not send message {} to listener ({}) {}",
                                event, remoteListener.getGuid(), remoteListener.getListener(), th);
                        this.listenerRegistry.removeListener(remoteListener);
                    }
                });
    }

    /**
     * Worker loop: repeatedly polls the queue (waiting at most {@link #DEFAULT_WAIT_TIME} ms
     * per attempt) and broadcasts each item obtained. The loop ends once the thread is
     * interrupted; the interrupt status is restored before returning.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                CacheEventItem item = this.eventQueue.poll(DEFAULT_WAIT_TIME, TimeUnit.MILLISECONDS);
                if (item != null) {
                    long startMillis = System.currentTimeMillis();
                    broadcastEvent(item);
                    LOG.debug("Finishing notification of all listeners for key: {} in {} ms.",
                            item.getEvent().getKey(), System.currentTimeMillis() - startMillis);
                }
            } catch (InterruptedException e) {
                // Clear the flag so the loop condition ends the worker, instead of re-polling
                // with the interrupt flag set (which would throw again immediately).
                this.running = false;
                LOG.error("EventQueue polling was interrupted, stopping sender thread", e);
                Thread.currentThread().interrupt();
            }
        }
    }
}
