/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish;

import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import io.mantisrx.publish.EventProcessor;
import io.mantisrx.publish.EventTransmitter;
import io.mantisrx.publish.StreamManager;
import io.mantisrx.publish.api.Event;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

class EventDrainer
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(EventDrainer.class);
    public static final String LOGGING_CONTEXT_KEY = "mantisLogCtx";
    static final String DEFAULT_THREAD_NAME = "mantisDrainer";
    public static final String LOGGING_CONTEXT_VALUE = "mantisDrainer";
    private final MrePublishConfiguration config;
    private final Timer mantisEventDrainTimer;
    private final StreamManager streamManager;
    private final EventProcessor eventProcessor;
    private final EventTransmitter eventTransmitter;
    private final Clock clock;

    EventDrainer(MrePublishConfiguration config, StreamManager streamManager, Registry registry, EventProcessor eventProcessor, EventTransmitter eventTransmitter, Clock clock) {
        this.config = config;
        this.mantisEventDrainTimer = SpectatorUtils.buildAndRegisterTimer(registry, "mrePublishEventDrainTime");
        this.streamManager = streamManager;
        this.eventProcessor = eventProcessor;
        this.eventTransmitter = eventTransmitter;
        this.clock = clock;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Starting drainer thread.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            MDC.put((String)LOGGING_CONTEXT_KEY, (String)"mantisDrainer");
            long startTime = this.clock.millis();
            Set<String> streams = this.streamManager.getRegisteredStreams();
            for (String stream : streams) {
                ArrayList streamEventList = new ArrayList();
                try {
                    Optional<BlockingQueue<Event>> streamQueueO = this.streamManager.getQueueForStream(stream);
                    if (!streamQueueO.isPresent()) continue;
                    BlockingQueue<Event> streamEventQ = streamQueueO.get();
                    streamEventQ.drainTo(streamEventList);
                    int queueDepth = streamEventList.size();
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Queue drained size: {} for stream {}", (Object)queueDepth, (Object)stream);
                    }
                    this.streamManager.getStreamMetrics(stream).ifPresent(m -> {
                        m.getMantisEventsQueuedGauge().set((double)queueDepth);
                        if (queueDepth > 0) {
                            m.updateLastEventOnStreamTimestamp();
                        }
                    });
                    streamEventList.stream().map(e -> this.process(stream, (Event)e)).filter(Objects::nonNull).forEach(e -> this.eventTransmitter.send((Event)e, stream));
                    streamEventList.clear();
                }
                catch (Exception e2) {
                    LOG.warn("Exception processing events for stream {}", (Object)stream, (Object)e2);
                }
            }
            long processingTime = this.clock.millis() - startTime;
            this.mantisEventDrainTimer.record(processingTime, TimeUnit.MILLISECONDS);
        }
        finally {
            MDC.remove((String)LOGGING_CONTEXT_KEY);
        }
    }

    private Event process(String stream, Event event) {
        long startTime = this.clock.millis();
        Event processedEvent = this.eventProcessor.process(stream, event);
        long processingTime = this.clock.millis() - startTime;
        this.streamManager.getStreamMetrics(stream).ifPresent(m -> m.getMantisEventsProcessTimeTimer().record(processingTime, TimeUnit.MILLISECONDS));
        return processedEvent;
    }
}

