/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish;

import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.EventDrainer;
import io.mantisrx.publish.EventProcessor;
import io.mantisrx.publish.EventTransmitter;
import io.mantisrx.publish.StreamManager;
import io.mantisrx.publish.SubscriptionTracker;
import io.mantisrx.publish.Tee;
import io.mantisrx.publish.api.EventPublisher;
import io.mantisrx.publish.config.MrePublishConfiguration;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MrePublishClientInitializer {
    private static final Logger LOG = LoggerFactory.getLogger(MrePublishClientInitializer.class);
    private final MrePublishConfiguration config;
    private final Registry registry;
    private final StreamManager streamManager;
    private final EventPublisher eventPublisher;
    private final SubscriptionTracker subscriptionsTracker;
    private final EventTransmitter eventTransmitter;
    private final Tee tee;
    private final List<ScheduledFuture<?>> scheduledFutures = new ArrayList();
    private static final ScheduledThreadPoolExecutor DRAINER_EXECUTOR = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "MantisDrainer"));
    private static final ScheduledThreadPoolExecutor SUBSCRIPTIONS_EXECUTOR = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "MantisSubscriptionsTracker"));

    public MrePublishClientInitializer(MrePublishConfiguration config, Registry registry, StreamManager streamManager, EventPublisher eventPublisher, SubscriptionTracker subscriptionsTracker, EventTransmitter eventTransmitter, Tee tee) {
        this.config = config;
        this.registry = registry;
        this.streamManager = streamManager;
        this.eventPublisher = eventPublisher;
        this.subscriptionsTracker = subscriptionsTracker;
        this.eventTransmitter = eventTransmitter;
        this.tee = tee;
    }

    public void start() {
        this.scheduledFutures.add(this.setupSubscriptionTracker(this.subscriptionsTracker));
        this.scheduledFutures.add(this.setupDrainer(this.streamManager, this.eventTransmitter, this.tee));
    }

    public void stop() {
        for (ScheduledFuture<?> next2 : this.scheduledFutures) {
            if (next2 == null || next2.isCancelled()) continue;
            next2.cancel(false);
        }
        this.scheduledFutures.clear();
    }

    public EventPublisher getEventPublisher() {
        return this.eventPublisher;
    }

    private ScheduledFuture<?> setupDrainer(StreamManager streamManager, EventTransmitter transmitter, Tee tee) {
        EventProcessor eventProcessor = new EventProcessor(this.config, streamManager, tee);
        EventDrainer eventDrainer = new EventDrainer(this.config, streamManager, this.registry, eventProcessor, transmitter, Clock.systemUTC());
        return DRAINER_EXECUTOR.scheduleAtFixedRate(eventDrainer::run, 0L, this.config.drainerIntervalMsec(), TimeUnit.MILLISECONDS);
    }

    private ScheduledFuture<?> setupSubscriptionTracker(SubscriptionTracker subscriptionsTracker) {
        return SUBSCRIPTIONS_EXECUTOR.scheduleAtFixedRate(() -> {
            block2: {
                try {
                    subscriptionsTracker.refreshSubscriptions();
                }
                catch (Exception e2) {
                    if (!LOG.isDebugEnabled()) break block2;
                    LOG.debug("failed to refresh scheduledFutures", e2);
                }
            }
        }, 1L, this.config.subscriptionRefreshIntervalSec(), TimeUnit.SECONDS);
    }

    static {
        DRAINER_EXECUTOR.setRemoveOnCancelPolicy(true);
        SUBSCRIPTIONS_EXECUTOR.setRemoveOnCancelPolicy(true);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            DRAINER_EXECUTOR.shutdown();
            SUBSCRIPTIONS_EXECUTOR.shutdown();
        }));
    }
}

