/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.radio.transports.amqp;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.radio.Configuration;
import org.graylog2.radio.transports.RadioTransport;
import org.graylog2.radio.transports.amqp.AMQPSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQPProducer
implements RadioTransport {
    private final ServerStatus serverStatus;
    private static final Logger LOG = LoggerFactory.getLogger(AMQPProducer.class);
    private final AMQPSenderPool senderPool;
    private final Meter incomingMessages;
    private final Meter rejectedMessages;
    private final Timer processTime;

    @Inject
    public AMQPProducer(MetricRegistry metricRegistry, Configuration configuration, ServerStatus serverStatus) {
        this.serverStatus = serverStatus;
        this.senderPool = new AMQPSenderPool(configuration.getAmqpParallelQueues(), configuration);
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name(AMQPProducer.class, (String[])new String[]{"incomingMessages"}));
        this.rejectedMessages = metricRegistry.meter(MetricRegistry.name(AMQPProducer.class, (String[])new String[]{"rejectedMessages"}));
        this.processTime = metricRegistry.timer(MetricRegistry.name(AMQPProducer.class, (String[])new String[]{"processTime"}));
    }

    @Override
    public void send(Message msg) throws IOException {
        try (Timer.Context context = this.processTime.time();){
            this.incomingMessages.mark();
            this.senderPool.send(msg);
        }
        catch (IOException e) {
            LOG.error("Could not write to AMQP.", (Throwable)e);
            this.rejectedMessages.mark();
            throw e;
        }
    }

    private class AMQPSenderPool {
        private final int count;
        private final AMQPSender[] senders;
        private final AtomicInteger pointer;

        private AMQPSenderPool(int count, Configuration configuration) {
            this.count = count;
            this.senders = new AMQPSender[count];
            for (int i = 0; i < count; ++i) {
                this.senders[i] = new AMQPSender(configuration.getAmqpHostname(), configuration.getAmqpPort(), String.format(configuration.getAmqpVirtualHost(), i), configuration.getAmqpUsername(), configuration.getAmqpPassword(), String.format(configuration.getAmqpQueueName(), i), configuration.getAmqpQueueType(), String.format(configuration.getAmqpExchangeName(), i), String.format(configuration.getAmqpRoutingKey(), i), configuration.isAmqpPersistentMessagesEnabled());
            }
            this.pointer = new AtomicInteger(0);
        }

        public void send(Message msg) throws IOException {
            int currentIndex = this.pointer.getAndIncrement();
            this.senders[Math.abs(currentIndex % this.count)].send(msg);
        }
    }
}

