package org.meridor.perspective.framework.messaging.impl;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.meridor.perspective.framework.messaging.Dispatcher;
import org.meridor.perspective.framework.messaging.Message;
import org.meridor.perspective.framework.storage.Storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:WEB-INF/lib/perspective-framework-1.2.0-RC2.jar:org/meridor/perspective/framework/messaging/impl/BaseConsumer.class */
public abstract class BaseConsumer implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BaseConsumer.class);

    @Value("${perspective.messaging.shutdown.timeout}")
    private int shutdownTimeout;

    @Value("${perspective.messaging.polling.delay:1000}")
    private int pollingDelay;

    @Value("${perspective.messaging.tolerable.queue.size:1000}")
    private int tolerableQueueSize;

    @Autowired
    private Storage storage;

    @Autowired
    private Dispatcher dispatcher;
    private ExecutorService executorService;
    private volatile boolean canExecute = true;

    protected abstract String getStorageKey();

    protected abstract int getParallelConsumers();

    private Runnable getRunnable() {
        return () -> {
            while (this.canExecute) {
                String storageKey = getStorageKey();
                try {
                } catch (Exception e) {
                    LOG.error(String.format("Failed to consume message from queue %s", storageKey), (Throwable) e);
                }
                if (!this.storage.isAvailable()) {
                    LOG.debug("Stopping consuming from queue = {} as storage is not available", storageKey);
                    return;
                }
                BlockingQueue queue = this.storage.getQueue(storageKey);
                int size = queue.size();
                Object poll = queue.poll(this.pollingDelay, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    if (poll instanceof Message) {
                        if (size > this.tolerableQueueSize) {
                            LOG.warn("Messages queue {} size = {} exceeds tolerable size = {}. This can be a signal to increase total number of workers.", storageKey, Integer.valueOf(size), Integer.valueOf(this.tolerableQueueSize));
                        }
                        Message message = (Message) poll;
                        LOG.trace("Consumed message = {} from queue = {}", message.getId(), storageKey);
                        this.dispatcher.dispatch(message);
                    } else {
                        LOG.warn("Skipping {} as it is not a message", poll);
                    }
                }
            }
        };
    }

    @PreDestroy
    public void destroy() throws InterruptedException {
        this.canExecute = false;
        if (this.executorService != null) {
            LOG.debug("Shutting down consuming from queue {}", getStorageKey());
            this.executorService.shutdown();
            this.executorService.awaitTermination(this.shutdownTimeout, TimeUnit.MILLISECONDS);
        }
    }

    @Override // org.springframework.context.ApplicationListener
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        int parallelConsumers = getParallelConsumers();
        LOG.debug("Will use {} parallel consumers for queue {}", Integer.valueOf(parallelConsumers), getStorageKey());
        this.executorService = Executors.newFixedThreadPool(parallelConsumers);
        for (int i = 0; i <= parallelConsumers - 1; i++) {
            this.executorService.submit(getRunnable());
        }
    }
}
