/*
 * Decompiled with CFR 0.152.
 */
package io.appform.dropwizard.actors.base;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.appform.dropwizard.actors.actor.ActorConfig;
import io.appform.dropwizard.actors.actor.MessageHandlingFunction;
import io.appform.dropwizard.actors.base.utils.NamingUtils;
import io.appform.dropwizard.actors.connectivity.RMQConnection;
import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
import io.appform.dropwizard.actors.exceptionhandler.handlers.ExceptionHandler;
import io.appform.dropwizard.actors.retry.RetryStrategy;
import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes messages of type {@code Message} from a single RabbitMQ queue using
 * one channel per configured unit of concurrency.
 *
 * <p>The queue name is derived from the configured prefix and the consumer name via
 * {@link NamingUtils#queueName}. Each channel is registered with manual
 * acknowledgement ({@code autoAck = false}); every delivery is settled exactly once,
 * either with {@code basicAck} or with {@code basicReject} (without requeue).
 *
 * <p>The lifecycle is driven entirely by the caller through {@link #start()} and
 * {@link #stop()}.
 *
 * @param <Message> type the message body is deserialized into
 */
public class UnmanagedConsumer<Message> {
    private static final Logger log = LoggerFactory.getLogger(UnmanagedConsumer.class);
    private final String name;
    private final ActorConfig config;
    private final RMQConnection connection;
    private final ObjectMapper mapper;
    private final Class<? extends Message> clazz;
    private final int prefetchCount;
    private final MessageHandlingFunction<Message, Boolean> handlerFunction;
    private final Function<Throwable, Boolean> errorCheckFunction;
    private final String queueName;
    private final RetryStrategy retryStrategy;
    private final ExceptionHandler exceptionHandler;
    /** Handlers registered by {@link #start()}; used by {@link #stop()} to cancel and close them. */
    private final List<Handler> handlers = Lists.newArrayList();

    /**
     * Creates a consumer; no channel is opened until {@link #start()} is called.
     *
     * @param name                     consumer name, combined with the configured prefix to form the queue name
     * @param config                   source of prefetch count, concurrency, prefix, retry and exception-handler settings
     * @param connection               connection used to open one channel per handler
     * @param mapper                   mapper used to deserialize message bodies
     * @param retryStrategyFactory     builds the retry strategy from {@code config.getRetryConfig()}
     * @param exceptionHandlingFactory builds the exception handler from {@code config.getExceptionHandlerConfig()}
     * @param clazz                    concrete class message bodies are deserialized into
     * @param handlerFunction          business callback; its boolean result decides ack (true) or reject (false)
     * @param errorCheckFunction       given a processing failure, returns true if the message should be acked anyway
     */
    public UnmanagedConsumer(String name, ActorConfig config, RMQConnection connection, ObjectMapper mapper, RetryStrategyFactory retryStrategyFactory, ExceptionHandlingFactory exceptionHandlingFactory, Class<? extends Message> clazz, MessageHandlingFunction<Message, Boolean> handlerFunction, Function<Throwable, Boolean> errorCheckFunction) {
        this.name = name;
        this.config = config;
        this.connection = connection;
        this.mapper = mapper;
        this.clazz = clazz;
        this.prefetchCount = config.getPrefetchCount();
        this.handlerFunction = handlerFunction;
        this.errorCheckFunction = errorCheckFunction;
        this.queueName = NamingUtils.queueName(config.getPrefix(), name);
        this.retryStrategy = retryStrategyFactory.create(config.getRetryConfig());
        this.exceptionHandler = exceptionHandlingFactory.create(config.getExceptionHandlerConfig());
    }

    /** Delegates a deserialized message to the user-supplied handler function. */
    private boolean handle(Message message) throws Exception {
        return this.handlerFunction.apply(message);
    }

    /**
     * Opens {@code config.getConcurrency()} channels and registers one handler on each.
     *
     * <p>If setting up a handler fails, the channel opened for it is closed before the
     * exception is rethrown. Handlers registered before the failure stay registered and
     * can be released with {@link #stop()}.
     *
     * @throws Exception if a channel cannot be opened, configured, or subscribed to the queue
     */
    public void start() throws Exception {
        for (int i = 1; i <= this.config.getConcurrency(); ++i) {
            Channel consumeChannel = this.connection.newChannel();
            try {
                Handler handler = new Handler(consumeChannel, this.mapper, this.clazz, this.prefetchCount);
                // autoAck = false: deliveries are settled explicitly in handleDelivery.
                String tag = consumeChannel.basicConsume(this.queueName, false, handler);
                handler.setTag(tag);
                this.handlers.add(handler);
            }
            catch (Exception e) {
                // The channel is not tracked in 'handlers' yet, so nobody else would close it.
                closeAfterFailure(consumeChannel, e);
                throw e;
            }
            log.info("Started consumer {} of type {}", i, this.name);
        }
    }

    /**
     * Cancels every registered handler and closes its channel.
     *
     * <p>Failures for an individual handler are logged and do not prevent the remaining
     * handlers from being processed.
     *
     * @throws Exception declared for interface compatibility; per-handler failures are logged, not thrown
     */
    public void stop() throws Exception {
        this.handlers.forEach(handler -> {
            try {
                Channel channel = handler.getChannel();
                channel.basicCancel(handler.getTag());
                channel.close();
            }
            catch (Exception e) {
                log.error(String.format("Error cancelling consumer: %s", handler.getTag()), e);
            }
        });
    }

    /** Closes a channel during error cleanup, attaching any close failure to the original cause. */
    private static void closeAfterFailure(Channel channel, Exception cause) {
        try {
            channel.close();
        }
        catch (Exception closeError) {
            cause.addSuppressed(closeError);
        }
    }

    /**
     * Per-channel consumer that deserializes each delivery, runs it through the retry
     * strategy and settles it on its own channel.
     */
    private class Handler
    extends DefaultConsumer {
        private final ObjectMapper mapper;
        private final Class<? extends Message> clazz;
        /** Consumer tag returned by {@code basicConsume}; needed to cancel the subscription. */
        private String tag;

        /**
         * @param channel       channel this handler consumes on and settles deliveries through
         * @param mapper        mapper used to deserialize message bodies
         * @param clazz         class to deserialize message bodies into
         * @param prefetchCount value passed to {@code basicQos} on the channel
         * @throws Exception if the prefetch limit cannot be applied to the channel
         */
        private Handler(Channel channel, ObjectMapper mapper, Class<? extends Message> clazz, int prefetchCount) throws Exception {
            super(channel);
            this.mapper = mapper;
            this.clazz = clazz;
            this.getChannel().basicQos(prefetchCount);
        }

        /**
         * Processes one delivery and settles it exactly once.
         *
         * <p>On normal completion the message is acked when the handler (run through the
         * retry strategy) returns true and rejected without requeue otherwise. When
         * deserialization or handling throws, the message is acked if the error-check
         * function accepts the failure, otherwise acked if the exception handler says so,
         * and otherwise rejected without requeue.
         *
         * @throws IOException if the ack/reject itself fails
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            final long deliveryTag = envelope.getDeliveryTag();
            try {
                Message message = this.mapper.readValue(body, this.clazz);
                boolean success = UnmanagedConsumer.this.retryStrategy.execute(() -> UnmanagedConsumer.this.handle(message));
                if (success) {
                    this.getChannel().basicAck(deliveryTag, false);
                } else {
                    this.getChannel().basicReject(deliveryTag, false);
                }
            }
            catch (Throwable t) {
                log.error("Error processing message...", t);
                // The branches are mutually exclusive: settling the same delivery tag more
                // than once is a protocol error that makes the broker close the channel.
                // Boolean.TRUE.equals treats a null result from the function as "do not ack".
                if (Boolean.TRUE.equals(UnmanagedConsumer.this.errorCheckFunction.apply(t))) {
                    log.warn("Acked message due to exception: ", t);
                    this.getChannel().basicAck(deliveryTag, false);
                } else if (UnmanagedConsumer.this.exceptionHandler.handle()) {
                    log.warn("Acked message due to exception handling strategy: ", t);
                    this.getChannel().basicAck(deliveryTag, false);
                } else {
                    this.getChannel().basicReject(deliveryTag, false);
                }
            }
        }

        /** Returns the consumer tag assigned when this handler was registered, or null if not yet set. */
        public String getTag() {
            return this.tag;
        }

        /** Records the consumer tag returned by {@code basicConsume}. */
        public void setTag(String tag) {
            this.tag = tag;
        }
    }
}

