/*
 * Decompiled with CFR 0.152.
 */
package io.appform.dropwizard.actors.base;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.rabbitmq.client.Channel;
import io.appform.dropwizard.actors.actor.ActorConfig;
import io.appform.dropwizard.actors.actor.MessageHandlingFunction;
import io.appform.dropwizard.actors.base.Handler;
import io.appform.dropwizard.actors.base.utils.NamingUtils;
import io.appform.dropwizard.actors.connectivity.RMQConnection;
import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
import io.appform.dropwizard.actors.exceptionhandler.handlers.ExceptionHandler;
import io.appform.dropwizard.actors.retry.RetryStrategy;
import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumer that subscribes a configurable number of {@link Handler}s to a RabbitMQ queue.
 *
 * <p>For every concurrency slot reported by {@link ActorConfig#getConcurrency()} a dedicated
 * {@link Channel} is opened on the supplied {@link RMQConnection} and a {@link Handler} is
 * registered on it with manual acknowledgement (auto-ack is passed as {@code false}).
 *
 * <p>This class never starts or stops itself; the owner must call {@link #start()} and
 * {@link #stop()} explicitly. No synchronization is performed here, so concurrent calls to
 * those methods must be coordinated by the caller.
 *
 * @param <Message> type of the payload passed to the message handling function
 */
public class UnmanagedConsumer<Message> {
    private static final Logger log = LoggerFactory.getLogger(UnmanagedConsumer.class);
    private final String name;
    private final ActorConfig config;
    private final RMQConnection connection;
    private final ObjectMapper mapper;
    private final Class<? extends Message> clazz;
    private final int prefetchCount;
    private final MessageHandlingFunction<Message, Boolean> handlerFunction;
    private final Function<Throwable, Boolean> errorCheckFunction;
    private final String queueName;
    private final RetryStrategy retryStrategy;
    private final ExceptionHandler exceptionHandler;
    /** Handlers that are currently subscribed; filled by {@link #start()}, drained by {@link #stop()}. */
    private final List<Handler<Message>> handlers = Lists.newArrayList();

    /**
     * Creates a consumer; no channel is opened until {@link #start()} is called.
     *
     * @param name                     logical consumer name; stored with the namespace prefix applied
     *                                 and also used (un-prefixed) to derive the queue name
     * @param config                   actor configuration supplying prefix, prefetch count, concurrency,
     *                                 sharding, retry and exception handler settings
     * @param connection               connection used to open one channel per handler
     * @param mapper                   object mapper handed to every handler
     * @param retryStrategyFactory     factory used to build the retry strategy from {@code config}
     * @param exceptionHandlingFactory factory used to build the exception handler from {@code config}
     * @param clazz                    message class handed to every handler
     * @param handlerFunction          function handed to every handler to process a message
     * @param errorCheckFunction       function handed to every handler to classify a {@link Throwable}
     */
    public UnmanagedConsumer(String name, ActorConfig config, RMQConnection connection, ObjectMapper mapper, RetryStrategyFactory retryStrategyFactory, ExceptionHandlingFactory exceptionHandlingFactory, Class<? extends Message> clazz, MessageHandlingFunction<Message, Boolean> handlerFunction, Function<Throwable, Boolean> errorCheckFunction) {
        this.name = NamingUtils.prefixWithNamespace(name);
        this.config = config;
        this.connection = connection;
        this.mapper = mapper;
        this.clazz = clazz;
        this.prefetchCount = config.getPrefetchCount();
        this.handlerFunction = handlerFunction;
        this.errorCheckFunction = errorCheckFunction;
        this.queueName = NamingUtils.queueName(config.getPrefix(), name);
        this.retryStrategy = retryStrategyFactory.create(config.getRetryConfig());
        this.exceptionHandler = exceptionHandlingFactory.create(config.getExceptionHandlerConfig());
    }

    /**
     * Opens one channel per concurrency slot and subscribes a handler on each.
     *
     * <p>When the configuration is sharded, slot {@code i} (1-based) consumes from the shard
     * {@code i % shardCount}; otherwise every slot consumes from the same queue.
     *
     * <p>If creating or subscribing a handler fails, the channel opened for that slot is closed
     * before the exception is rethrown so it does not leak. Handlers subscribed by earlier
     * iterations stay registered and can be released with {@link #stop()}.
     *
     * @throws Exception if a channel cannot be opened or a handler cannot be created or subscribed
     */
    public void start() throws Exception {
        for (int i = 1; i <= this.config.getConcurrency(); ++i) {
            Channel consumeChannel = this.connection.newChannel();
            try {
                Handler<Message> handler = new Handler<Message>(consumeChannel, this.mapper, this.clazz, this.prefetchCount, this.errorCheckFunction, this.retryStrategy, this.exceptionHandler, this.handlerFunction);
                String queueNameForConsumption = this.config.isSharded() ? NamingUtils.getShardedQueueName(this.queueName, i % this.config.getShardCount()) : this.queueName;
                String tag = consumeChannel.basicConsume(queueNameForConsumption, false, handler);
                handler.setTag(tag);
                this.handlers.add(handler);
            }
            catch (Exception e) {
                // The channel is not tracked in 'handlers' yet, so stop() could never release it.
                closeQuietly(consumeChannel);
                throw e;
            }
            log.info("Started consumer {} of type {}", i, this.name);
        }
    }

    /**
     * Cancels every subscribed handler and closes its channel.
     *
     * <p>Failures are logged and never propagated, so one broken channel does not prevent the
     * remaining ones from being released. The handler list is emptied afterwards, which makes a
     * repeated call a no-op and lets {@link #start()} be called again without stale entries.
     */
    public void stop() {
        for (Handler<Message> handler : this.handlers) {
            stopHandler(handler);
        }
        this.handlers.clear();
    }

    /** Cancels the handler's subscription and closes its channel, logging any failure. */
    private void stopHandler(Handler<Message> handler) {
        try {
            Channel channel = handler.getChannel();
            try {
                channel.basicCancel(handler.getTag());
            }
            catch (Exception e) {
                // A failed cancel must not skip the close below, otherwise the channel leaks.
                log.error("Error cancelling consumer [{}] for [{}] with prefix [{}]", handler.getTag(), this.name, this.config.getPrefix(), e);
            }
            if (channel.isOpen()) {
                channel.close();
            }
            log.info("Consumer channel closed for [{}] with prefix [{}]", this.name, this.config.getPrefix());
        }
        catch (Exception e) {
            log.error("Error closing consumer channel [{}] for [{}] with prefix [{}]", handler.getTag(), this.name, this.config.getPrefix(), e);
        }
    }

    /** Closes the channel if it is still open; a failure is logged and otherwise ignored. */
    private void closeQuietly(Channel channel) {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        }
        catch (Exception e) {
            log.warn("Error closing consumer channel for [{}] with prefix [{}]", this.name, this.config.getPrefix(), e);
        }
    }
}

