package org.onosproject.event.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.event.AbstractEvent;
import org.onosproject.event.DefaultEventSinkRegistry;
import org.onosproject.event.Event;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.EventSink;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.impl.FlowRuleDriverProvider;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.security.AppGuard;
import org.onosproject.security.AppPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@Component(immediate = true)
/* loaded from: input_file:org/onosproject/event/impl/CoreEventDispatcher.class */
public class CoreEventDispatcher extends DefaultEventSinkRegistry implements EventDeliveryService {
    private static final long DEFAULT_EXECUTE_MS = 5000;
    private static final long WATCHDOG_MS = 250;
    private static final Event KILL_PILL = new AbstractEvent(null, 0) { // from class: org.onosproject.event.impl.CoreEventDispatcher.1
    };
    private final Logger log = LoggerFactory.getLogger(getClass());
    private DispatchLoop topologyDispatcher = new DispatchLoop("topology");
    private DispatchLoop programmingDispatcher = new DispatchLoop("programming");
    private DispatchLoop defaultDispatcher = new DispatchLoop(FlowRuleDriverProvider.SCHEME);
    private Map<Class, DispatchLoop> dispatcherMap = new ImmutableMap.Builder().put(TopologyEvent.class, this.topologyDispatcher).put(DeviceEvent.class, this.topologyDispatcher).put(LinkEvent.class, this.topologyDispatcher).put(HostEvent.class, this.topologyDispatcher).put(FlowRuleEvent.class, this.programmingDispatcher).put(IntentEvent.class, this.programmingDispatcher).build();
    private Set<DispatchLoop> dispatchers = new ImmutableSet.Builder().addAll(this.dispatcherMap.values()).add(this.defaultDispatcher).build();
    private long maxProcessMillis = DEFAULT_EXECUTE_MS;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onosproject/event/impl/CoreEventDispatcher$DispatchLoop.class */
    public class DispatchLoop implements Runnable {
        private final String name;
        private volatile boolean stopped;
        private volatile EventSink lastSink;
        private TimerTask watchdog;
        private volatile Future<?> dispatchFuture;
        private final ExecutorService executor;
        private final Stopwatch stopwatch = Stopwatch.createUnstarted();
        private final BlockingQueue<Event> eventsQueue = new LinkedBlockingQueue();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/onosproject/event/impl/CoreEventDispatcher$DispatchLoop$Watchdog.class */
        public class Watchdog extends TimerTask {
            private Watchdog() {
            }

            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                long elapsed = DispatchLoop.this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
                if (elapsed > CoreEventDispatcher.this.maxProcessMillis) {
                    DispatchLoop.this.stopwatch.reset();
                    CoreEventDispatcher.this.log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop", DispatchLoop.this.lastSink.getClass().getName(), Long.valueOf(elapsed));
                    DispatchLoop.this.lastSink.onProcessLimit();
                    DispatchLoop.this.stop();
                    DispatchLoop.this.dispatchFuture.cancel(true);
                    DispatchLoop.this.dispatchFuture = DispatchLoop.this.executor.submit(this);
                }
            }
        }

        DispatchLoop(String str) {
            this.name = str;
            this.executor = Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/event", "dispatch-" + str + "%d", CoreEventDispatcher.this.log));
            this.dispatchFuture = this.executor.submit(this);
        }

        public boolean add(Event event) {
            return this.eventsQueue.add(event);
        }

        @Override // java.lang.Runnable
        public void run() {
            Event take;
            this.stopped = false;
            CoreEventDispatcher.this.log.info("Dispatch loop initiated");
            while (!this.stopped) {
                try {
                    try {
                        take = this.eventsQueue.take();
                    } catch (Error | Exception e) {
                        CoreEventDispatcher.this.log.warn("Error encountered while dispatching event:", e);
                    }
                } catch (InterruptedException e2) {
                    CoreEventDispatcher.this.log.warn("Dispatch loop interrupted");
                }
                if (take == CoreEventDispatcher.KILL_PILL) {
                    break;
                } else {
                    process(take);
                }
            }
            CoreEventDispatcher.this.log.info("Dispatch loop terminated");
        }

        private void process(Event event) {
            EventSink sink = CoreEventDispatcher.this.getSink(event.getClass());
            if (sink == null) {
                CoreEventDispatcher.this.log.warn("No sink registered for event class {}", event.getClass().getName());
                return;
            }
            this.lastSink = sink;
            this.stopwatch.start();
            sink.process(event);
            this.stopwatch.reset();
        }

        void stop() {
            this.stopped = true;
            stopWatchdog();
            add(CoreEventDispatcher.KILL_PILL);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startWatchdog() {
            CoreEventDispatcher.this.log.info("Starting watchdog task for dispatcher {}", this.name);
            this.watchdog = new Watchdog();
            SharedExecutors.getTimer().schedule(this.watchdog, CoreEventDispatcher.WATCHDOG_MS, CoreEventDispatcher.WATCHDOG_MS);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void stopWatchdog() {
            CoreEventDispatcher.this.log.info("Stopping watchdog task for dispatcher {}", this.name);
            if (this.watchdog != null) {
                this.watchdog.cancel();
            }
        }
    }

    private DispatchLoop getDispatcher(Event event) {
        DispatchLoop dispatchLoop = this.dispatcherMap.get(event.getClass());
        if (dispatchLoop == null) {
            dispatchLoop = this.defaultDispatcher;
        }
        return dispatchLoop;
    }

    public void post(Event event) {
        if (getDispatcher(event).add(event)) {
            return;
        }
        this.log.error("Unable to post event {}", event);
    }

    @Activate
    public void activate() {
        if (this.maxProcessMillis != 0) {
            this.dispatchers.forEach(obj -> {
                ((DispatchLoop) obj).startWatchdog();
            });
        }
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.dispatchers.forEach((v0) -> {
            v0.stop();
        });
        this.log.info("Stopped");
    }

    public void setDispatchTimeLimit(long j) {
        AppGuard.checkPermission(AppPermission.Type.EVENT_WRITE);
        Preconditions.checkArgument(j == 0 || j >= WATCHDOG_MS, "Time limit must be greater than %s", new Object[]{Long.valueOf(WATCHDOG_MS)});
        long j2 = this.maxProcessMillis;
        this.maxProcessMillis = j;
        if (j == 0 && j2 != 0) {
            this.dispatchers.forEach(obj -> {
                ((DispatchLoop) obj).stopWatchdog();
            });
        } else {
            if (j == 0 || j2 != 0) {
                return;
            }
            this.dispatchers.forEach(obj2 -> {
                ((DispatchLoop) obj2).startWatchdog();
            });
        }
    }

    public long getDispatchTimeLimit() {
        AppGuard.checkPermission(AppPermission.Type.EVENT_READ);
        return this.maxProcessMillis;
    }
}
