/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.metis.core.execution;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import eu.europeana.metis.core.execution.PersistenceProvider;
import eu.europeana.metis.core.execution.WorkflowExecutionMonitor;
import eu.europeana.metis.core.execution.WorkflowExecutionSettings;
import eu.europeana.metis.core.execution.WorkflowExecutor;
import eu.europeana.metis.core.workflow.WorkflowExecution;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueConsumer
extends DefaultConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueConsumer.class);
    private final WorkflowExecutionSettings workflowExecutionSettings;
    private final PersistenceProvider persistenceProvider;
    private final WorkflowExecutionMonitor workflowExecutionMonitor;
    private final ExecutorService threadPool;
    private final ExecutorCompletionService<WorkflowExecution> completionService;
    private int threadsCounter;

    public QueueConsumer(Channel rabbitmqConsumerChannel, String rabbitmqQueueName, WorkflowExecutionSettings workflowExecutionSettings, PersistenceProvider persistenceProvider, WorkflowExecutionMonitor workflowExecutionMonitor) throws IOException {
        super(persistenceProvider.getRabbitmqConsumerChannel());
        this.workflowExecutionSettings = workflowExecutionSettings;
        this.persistenceProvider = persistenceProvider;
        this.threadPool = Executors.newFixedThreadPool(this.workflowExecutionSettings.getMaxConcurrentThreads());
        this.completionService = new ExecutorCompletionService(this.threadPool);
        this.workflowExecutionMonitor = workflowExecutionMonitor;
        rabbitmqConsumerChannel.basicQos(1);
        rabbitmqConsumerChannel.basicConsume(rabbitmqQueueName, false, (Consumer)this);
    }

    public void handleDelivery(String consumerTag, Envelope rabbitmqEnvelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String objectId = new String(body, StandardCharsets.UTF_8);
        LOGGER.info("WorkflowExecution id: {} received from queue.", (Object)objectId);
        if (this.threadsCounter >= this.workflowExecutionSettings.getMaxConcurrentThreads()) {
            LOGGER.debug("Trying to clean thread pool, found thread pool full with threadsCounter: {}, maxConcurrentThreads: {}", (Object)this.threadsCounter, (Object)this.workflowExecutionSettings.getMaxConcurrentThreads());
            this.checkAndCleanCompletionService();
        }
        if (this.threadsCounter >= this.workflowExecutionSettings.getMaxConcurrentThreads()) {
            super.getChannel().basicNack(rabbitmqEnvelope.getDeliveryTag(), false, true);
            LOGGER.debug("NACK sent for {} with tag {}", (Object)objectId, (Object)rabbitmqEnvelope.getDeliveryTag());
        } else {
            WorkflowExecution workflowExecution = this.persistenceProvider.getWorkflowExecutionDao().getById(objectId);
            if (workflowExecution == null) {
                LOGGER.warn("Workflow execution with id: {} is in queue but no longer exists.", (Object)objectId.toString());
            } else if (workflowExecution.isCancelling()) {
                workflowExecution.setWorkflowAndAllQualifiedPluginsToCancelled();
                this.persistenceProvider.getWorkflowExecutionDao().update(workflowExecution);
                LOGGER.info("Cancelled inqueue user workflow execution with id: {}", (Object)workflowExecution.getId());
            } else {
                WorkflowExecutor workflowExecutor = new WorkflowExecutor(objectId, this.persistenceProvider, this.workflowExecutionSettings, this.workflowExecutionMonitor);
                this.completionService.submit(workflowExecutor);
                ++this.threadsCounter;
            }
            super.getChannel().basicAck(rabbitmqEnvelope.getDeliveryTag(), false);
            LOGGER.debug("ACK sent for {} with tag {}", (Object)workflowExecution.getId(), (Object)rabbitmqEnvelope.getDeliveryTag());
        }
    }

    private void checkAndCleanCompletionService() throws IOException {
        try {
            Future<WorkflowExecution> userWorkflowExecutionFuture = this.completionService.poll(this.workflowExecutionSettings.getPollingTimeoutForCleaningCompletionServiceInSecs(), TimeUnit.SECONDS);
            if (userWorkflowExecutionFuture != null) {
                --this.threadsCounter;
            }
            while (this.completionService.poll() != null) {
                --this.threadsCounter;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while polling for taking a Future from the ExecutorCompletionService", (Throwable)e);
            throw new IOException("Interrupted while polling for taking a Future from the ExecutorCompletionService", e);
        }
    }

    @PreDestroy
    void close() {
        this.threadPool.shutdown();
    }

    int getThreadsCounter() {
        return this.threadsCounter;
    }
}

