package io.journalkeeper.utils.event;

import io.journalkeeper.utils.spi.ServiceSupport;
import io.journalkeeper.utils.threads.AsyncLoopThread;
import io.journalkeeper.utils.threads.ThreadBuilder;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/journalkeeper/utils/event/EventBus.class */
public class EventBus implements Watchable {
    private static final Logger logger = LoggerFactory.getLogger(EventBus.class);
    private final NavigableMap<Long, Event> cachedEvents;
    private final AtomicLong watchIdGenerator;
    private final AtomicLong nextSequence;
    private final Set<EventWatcher> eventWatchers;
    private final Map<Long, PullEventWatcher> pullEventWatchers;
    private final long pullEventIntervalMs;
    private final long pullEventWatcherTimeout;
    private final AsyncLoopThread removeTimeoutPullWatchersThread;
    private final Collection<EventInterceptor> interceptors;

    /* loaded from: input_file:io/journalkeeper/utils/event/EventBus$PullEventWatcher.class */
    private static class PullEventWatcher {
        private final AtomicLong sequence = new AtomicLong(0);
        private long lastPullTimestamp = System.currentTimeMillis();

        PullEventWatcher(long j) {
            this.sequence.set(j);
        }

        void touch() {
            this.lastPullTimestamp = System.currentTimeMillis();
        }
    }

    public EventBus(long j) {
        this.cachedEvents = new ConcurrentSkipListMap();
        this.watchIdGenerator = new AtomicLong(0L);
        this.nextSequence = new AtomicLong(0L);
        this.eventWatchers = ConcurrentHashMap.newKeySet();
        this.pullEventWatchers = new ConcurrentHashMap();
        this.pullEventIntervalMs = j;
        this.pullEventWatcherTimeout = 5 * j;
        this.interceptors = ServiceSupport.loadAll(EventInterceptor.class);
        this.removeTimeoutPullWatchersThread = buildRemoveTimeoutPullWatchersThread();
        this.removeTimeoutPullWatchersThread.start();
    }

    public EventBus() {
        this(1000L);
    }

    private AsyncLoopThread buildRemoveTimeoutPullWatchersThread() {
        return ThreadBuilder.builder().name("RemoveTimeoutPullWatchersThread").doWork(this::removeTimeoutPullWatchers).sleepTime(this.pullEventWatcherTimeout, this.pullEventWatcherTimeout).onException(th -> {
            logger.warn("RemoveTimeoutPullWatchersThread Exception: ", th);
        }).daemon(true).build();
    }

    private void removeTimeoutPullWatchers() {
        this.pullEventWatchers.entrySet().removeIf(entry -> {
            return ((PullEventWatcher) entry.getValue()).lastPullTimestamp + (this.pullEventWatcherTimeout * 3) < System.currentTimeMillis();
        });
    }

    public synchronized void fireEvent(Event event) {
        Iterator<EventInterceptor> it = this.interceptors.iterator();
        while (it.hasNext()) {
            if (!it.next().onEvent(event, this)) {
                logger.info("Event canceled by an interceptor, type: {}, data: {}", Integer.valueOf(event.getEventType()), event.getEventData());
                return;
            }
        }
        this.eventWatchers.forEach(eventWatcher -> {
            eventWatcher.onEvent(event);
        });
        if (this.pullEventWatchers.isEmpty()) {
            return;
        }
        this.cachedEvents.put(Long.valueOf(this.nextSequence.getAndIncrement()), event);
    }

    @Override // io.journalkeeper.utils.event.Watchable
    public void watch(EventWatcher eventWatcher) {
        if (eventWatcher != null) {
            this.eventWatchers.add(eventWatcher);
        }
    }

    @Override // io.journalkeeper.utils.event.Watchable
    public void unWatch(EventWatcher eventWatcher) {
        if (eventWatcher != null) {
            this.eventWatchers.remove(eventWatcher);
        }
    }

    public long addPullWatch() {
        long andIncrement = this.watchIdGenerator.getAndIncrement();
        this.pullEventWatchers.put(Long.valueOf(andIncrement), new PullEventWatcher(this.nextSequence.get()));
        return andIncrement;
    }

    public void removePullWatch(long j) {
        this.pullEventWatchers.remove(Long.valueOf(j));
    }

    public long pullIntervalMs() {
        return this.pullEventIntervalMs;
    }

    public List<PullEvent> pullEvents(long j) {
        PullEventWatcher pullEventWatcher = this.pullEventWatchers.get(Long.valueOf(j));
        if (null == pullEventWatcher) {
            return null;
        }
        List<PullEvent> list = (List) this.cachedEvents.tailMap(Long.valueOf(pullEventWatcher.sequence.get())).entrySet().stream().map(entry -> {
            return new PullEvent(((Event) entry.getValue()).getEventType(), ((Long) entry.getKey()).longValue(), ((Event) entry.getValue()).getEventData());
        }).collect(Collectors.toList());
        pullEventWatcher.touch();
        return list;
    }

    public void ackPullEvents(long j, long j2) {
        PullEventWatcher pullEventWatcher = this.pullEventWatchers.get(Long.valueOf(j));
        if (null == pullEventWatcher || pullEventWatcher.sequence.get() <= j2) {
            return;
        }
        pullEventWatcher.sequence.set(j2);
    }

    public void shutdown() {
        this.removeTimeoutPullWatchersThread.stop();
    }

    public boolean hasEventWatchers() {
        return !this.eventWatchers.isEmpty();
    }
}
