/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.inputs.amqp;

import com.google.common.util.concurrent.Uninterruptibles;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.RadioMessage;
import org.graylog2.plugin.buffers.Buffer;
import org.graylog2.plugin.buffers.BufferOutOfCapacityException;
import org.graylog2.plugin.buffers.ProcessingDisabledException;
import org.graylog2.plugin.inputs.MessageInput;
import org.joda.time.DateTime;
import org.msgpack.MessagePack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {
    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
    private final String hostname;
    private final int port;
    private final String virtualHost;
    private final String username;
    private final String password;
    private final int prefetchCount;
    private final String queue;
    private final String exchange;
    private final String routingKey;
    private Connection connection;
    private Channel channel;
    private final Buffer processBuffer;
    private final MessageInput sourceInput;
    private AtomicLong totalBytesRead = new AtomicLong(0L);
    private AtomicLong lastSecBytesRead = new AtomicLong(0L);
    private AtomicLong lastSecBytesReadTmp = new AtomicLong(0L);

    public Consumer(String hostname, int port, String virtualHost, String username, String password, int prefetchCount, String queue, String exchange, String routingKey, Buffer processBuffer, MessageInput sourceInput) {
        this.hostname = hostname;
        this.port = port;
        this.virtualHost = virtualHost;
        this.username = username;
        this.password = password;
        this.prefetchCount = prefetchCount;
        this.queue = queue;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.processBuffer = processBuffer;
        this.sourceInput = sourceInput;
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                Consumer.this.lastSecBytesRead.set(Consumer.this.lastSecBytesReadTmp.getAndSet(0L));
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    public void run() throws IOException {
        if (!this.isConnected()) {
            this.connect();
        }
        final MessagePack msgpack = new MessagePack();
        this.channel.basicConsume(this.queue, false, new DefaultConsumer(this.channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                block7: {
                    long deliveryTag = envelope.getDeliveryTag();
                    try {
                        Consumer.this.totalBytesRead.addAndGet(body.length);
                        Consumer.this.lastSecBytesReadTmp.addAndGet(body.length);
                        RadioMessage msg = msgpack.read(body, RadioMessage.class);
                        if (!msg.strings.containsKey("message") || !msg.strings.containsKey("source") || msg.timestamp <= 0L) {
                            LOG.error("Incomplete AMQP message. Skipping.");
                            Consumer.this.channel.basicAck(deliveryTag, false);
                        }
                        Message event = new Message(msg.strings.get("message"), msg.strings.get("source"), new DateTime(msg.timestamp));
                        event.addStringFields(msg.strings);
                        event.addLongFields(msg.longs);
                        event.addDoubleFields(msg.doubles);
                        Consumer.this.processBuffer.insertFailFast(event, Consumer.this.sourceInput);
                        Consumer.this.channel.basicAck(deliveryTag, false);
                    }
                    catch (BufferOutOfCapacityException e) {
                        LOG.debug("Input buffer full, requeuing message. Delaying 10 ms until trying next message.");
                        if (Consumer.this.channel.isOpen()) {
                            Consumer.this.channel.basicNack(deliveryTag, false, true);
                            Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
                        }
                    }
                    catch (ProcessingDisabledException e) {
                        LOG.debug("Message processing is disabled, requeuing message. Delaying 100 ms until trying next message.");
                        if (Consumer.this.channel.isOpen()) {
                            Consumer.this.channel.basicNack(deliveryTag, false, true);
                            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
                        }
                    }
                    catch (Exception e) {
                        LOG.error("Error while trying to process AMQP message, requeuing message", e);
                        if (!Consumer.this.channel.isOpen()) break block7;
                        Consumer.this.channel.basicNack(deliveryTag, false, true);
                    }
                }
            }
        });
    }

    public void connect() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(this.hostname);
        factory.setPort(this.port);
        factory.setVirtualHost(this.virtualHost);
        if (this.username != null && !this.username.isEmpty() && this.password != null && !this.password.isEmpty()) {
            factory.setUsername(this.username);
            factory.setPassword(this.password);
        }
        this.connection = factory.newConnection();
        this.channel = this.connection.createChannel();
        if (this.prefetchCount > 0) {
            this.channel.basicQos(this.prefetchCount);
            LOG.info("AMQP prefetch count overriden to <{}>.", (Object)this.prefetchCount);
        }
        this.connection.addShutdownListener(new ShutdownListener(){

            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                if (cause.isInitiatedByApplication()) {
                    LOG.info("Not reconnecting connection, we disconnected explicitely.");
                    return;
                }
                while (true) {
                    try {
                        LOG.error("AMQP connection lost! Trying reconnect in 1 second.");
                        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
                        Consumer.this.connect();
                        LOG.info("Connected! Re-starting consumer.");
                        Consumer.this.run();
                        LOG.info("Consumer running.");
                    }
                    catch (IOException e) {
                        LOG.error("Could not re-connect to AMQP broker.", e);
                        continue;
                    }
                    break;
                }
            }
        });
    }

    public void stop() throws IOException {
        if (this.channel != null && this.channel.isOpen()) {
            this.channel.close();
        }
        if (this.connection != null && this.connection.isOpen()) {
            this.connection.close();
        }
    }

    public boolean isConnected() {
        return this.connection != null && this.connection.isOpen() && this.channel != null && this.channel.isOpen();
    }

    public AtomicLong getLastSecBytesRead() {
        return this.lastSecBytesRead;
    }

    public AtomicLong getTotalBytesRead() {
        return this.totalBytesRead;
    }
}

