/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.inputs.kafka;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.IteratorTemplate;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.RadioMessage;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.buffers.Buffer;
import org.graylog2.plugin.buffers.BufferOutOfCapacityException;
import org.graylog2.plugin.buffers.ProcessingDisabledException;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationException;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.system.NodeId;
import org.joda.time.DateTime;
import org.msgpack.MessagePack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

public class KafkaInput
extends MessageInput {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaInput.class);
    public static final String NAME = "Kafka Input";
    private final MetricRegistry metricRegistry;
    private final NodeId nodeId;
    private final EventBus serverEventBus;
    private final ServerStatus serverStatus;
    private ConsumerConnector cc;
    private volatile boolean stopped = false;
    private volatile boolean paused = true;
    private volatile CountDownLatch pausedLatch = new CountDownLatch(1);
    private AtomicLong totalBytesRead = new AtomicLong(0L);
    private AtomicLong lastSecBytesRead = new AtomicLong(0L);
    private AtomicLong lastSecBytesReadTmp = new AtomicLong(0L);
    private CountDownLatch stopLatch;
    public static final String GROUP_ID = "graylog2";
    public static final String CK_FETCH_MIN_BYTES = "fetch_min_bytes";
    public static final String CK_FETCH_WAIT_MAX = "fetch_wait_max";
    public static final String CK_ZOOKEEPER = "zookeeper";
    public static final String CK_TOPIC_FILTER = "topic_filter";
    public static final String CK_THREADS = "threads";

    @Inject
    public KafkaInput(MetricRegistry metricRegistry, NodeId nodeId, EventBus serverEventBus, ServerStatus serverStatus) {
        this.metricRegistry = metricRegistry;
        this.nodeId = nodeId;
        this.serverEventBus = serverEventBus;
        this.serverStatus = serverStatus;
    }

    @Subscribe
    public void lifecycleStateChange(Lifecycle lifecycle) {
        LOG.debug("Lifecycle changed to {}", (Object)lifecycle);
        switch (lifecycle) {
            case RUNNING: {
                this.paused = false;
                this.pausedLatch.countDown();
                break;
            }
            default: {
                this.pausedLatch = new CountDownLatch(1);
                this.paused = true;
            }
        }
    }

    @Override
    public void checkConfiguration(Configuration configuration) throws ConfigurationException {
        if (!this.checkConfig(configuration)) {
            throw new ConfigurationException(configuration.getSource().toString());
        }
    }

    @Override
    public void initialize(Configuration configuration) {
        super.initialize(configuration);
        this.setupMetrics();
    }

    @Override
    public void launch(final Buffer processBuffer) throws MisfireException {
        this.serverStatus.awaitRunning(new Runnable(){

            @Override
            public void run() {
                KafkaInput.this.lifecycleStateChange(Lifecycle.RUNNING);
            }
        });
        this.serverEventBus.register(this);
        Properties props = new Properties();
        props.put("group.id", GROUP_ID);
        props.put("client.id", "gl2-" + this.nodeId + "-" + this.getId());
        props.put("fetch.min.bytes", String.valueOf(this.configuration.getInt(CK_FETCH_MIN_BYTES)));
        props.put("fetch.wait.max.ms", String.valueOf(this.configuration.getInt(CK_FETCH_WAIT_MAX)));
        props.put("zookeeper.connect", this.configuration.getString(CK_ZOOKEEPER));
        props.put("auto.commit.interval.ms", "1000");
        int numThreads = (int)this.configuration.getInt(CK_THREADS);
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        this.cc = Consumer.createJavaConsumerConnector(consumerConfig);
        Whitelist filter2 = new Whitelist(this.configuration.getString(CK_TOPIC_FILTER));
        List<KafkaStream<byte[], byte[]>> streams2 = this.cc.createMessageStreamsByFilter(filter2, numThreads);
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        final KafkaInput thisInput = this;
        this.stopLatch = new CountDownLatch(streams2.size());
        for (final KafkaStream<byte[], byte[]> stream : streams2) {
            executor.submit(new Runnable(){

                @Override
                public void run() {
                    MessagePack msgpack = new MessagePack();
                    Iterator consumerIterator = stream.iterator();
                    while (((IteratorTemplate)consumerIterator).hasNext()) {
                        if (KafkaInput.this.paused) {
                            LOG.debug("Message processing is paused, blocking until message processing is turned back on.");
                            Uninterruptibles.awaitUninterruptibly(KafkaInput.this.pausedLatch);
                        }
                        if (KafkaInput.this.stopped) break;
                        Object message = ((ConsumerIterator)consumerIterator).next();
                        Message event = this.decodeMessage(msgpack, (MessageAndMetadata<byte[], byte[]>)message);
                        if (event == null) {
                            return;
                        }
                        boolean retry = false;
                        int retryCount = 0;
                        do {
                            try {
                                if (retry) {
                                    LOG.debug("Waiting 10ms to retry inserting into buffer, retried {} times", (Object)retryCount);
                                    Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
                                }
                                processBuffer.insertFailFast(event, thisInput);
                                retry = false;
                            }
                            catch (BufferOutOfCapacityException e) {
                                LOG.debug("Input buffer full, retrying Kafka message processing");
                                retry = true;
                                ++retryCount;
                            }
                            catch (ProcessingDisabledException e) {
                                LOG.debug("Processing was disabled after we read the message but before we could insert it into the buffer. We cache this one message, and should block on the next iteration.");
                                processBuffer.insertCached(event, thisInput);
                                retry = false;
                            }
                        } while (retry);
                    }
                    KafkaInput.this.cc.commitOffsets();
                    KafkaInput.this.stopLatch.countDown();
                }

                private Message decodeMessage(MessagePack msgpack, MessageAndMetadata<byte[], byte[]> message) {
                    try {
                        byte[] bytes = message.message();
                        KafkaInput.this.totalBytesRead.addAndGet(bytes.length);
                        KafkaInput.this.lastSecBytesReadTmp.addAndGet(bytes.length);
                        RadioMessage msg = msgpack.read(bytes, RadioMessage.class);
                        if (!msg.strings.containsKey("message") || !msg.strings.containsKey("source") || msg.timestamp <= 0L) {
                            LOG.error("Incomplete radio message. Skipping.");
                            return null;
                        }
                        Message event = new Message(msg.strings.get("message"), msg.strings.get("source"), new DateTime(msg.timestamp));
                        event.addStringFields(msg.strings);
                        event.addLongFields(msg.longs);
                        event.addDoubleFields(msg.doubles);
                        return event;
                    }
                    catch (Exception e) {
                        LOG.error("Error while processing Kafka radio message", e);
                        return null;
                    }
                }
            });
        }
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                KafkaInput.this.lastSecBytesRead.set(KafkaInput.this.lastSecBytesReadTmp.getAndSet(0L));
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        this.stopped = true;
        this.serverEventBus.unregister(this);
        if (this.stopLatch != null) {
            try {
                if (this.pausedLatch != null && this.pausedLatch.getCount() > 0L) {
                    this.pausedLatch.countDown();
                }
                boolean allStoppedOrderly = this.stopLatch.await(5L, TimeUnit.SECONDS);
                this.stopLatch = null;
                if (!allStoppedOrderly) {
                    LOG.info("Stopping Kafka input timed out (waited 5 seconds for consumer threads to stop). Forcefully closing connection now. This is usually harmless when stopping the input.");
                }
            }
            catch (InterruptedException e) {
                LOG.debug("Interrupted while waiting to stop input.");
            }
        }
        if (this.cc != null) {
            this.cc.shutdown();
            this.cc = null;
        }
    }

    @Override
    public ConfigurationRequest getRequestedConfiguration() {
        ConfigurationRequest cr = new ConfigurationRequest();
        cr.addField(new TextField(CK_ZOOKEEPER, "ZooKeeper address", "192.168.1.1:2181", "Host and port of the ZooKeeper that is managing your Kafka cluster.", ConfigurationField.Optional.NOT_OPTIONAL));
        cr.addField(new TextField(CK_TOPIC_FILTER, "Topic filter regex", "^your-topic$", "Every topic that matches this regular expression will be consumed.", ConfigurationField.Optional.NOT_OPTIONAL));
        cr.addField(new NumberField(CK_FETCH_MIN_BYTES, "Fetch minimum bytes", 5, "Wait for a message batch to reach at least this size or the configured maximum wait time before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
        cr.addField(new NumberField(CK_FETCH_WAIT_MAX, "Fetch maximum wait time (ms)", 100, "Wait for this time or the configured minimum size of a message batch before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
        cr.addField(new NumberField(CK_THREADS, "Processor threads", 2, "Number of processor threads to spawn. Use one thread per Kafka topic partition.", ConfigurationField.Optional.NOT_OPTIONAL));
        return cr;
    }

    @Override
    public boolean isExclusive() {
        return false;
    }

    @Override
    public String getName() {
        return NAME;
    }

    @Override
    public String linkToDocs() {
        return "http://support.torch.sh/help/kb/getting-your-logs-into-graylog2/using-the-kafka-message-input";
    }

    @Override
    public Map<String, Object> getAttributes() {
        return this.configuration.getSource();
    }

    protected boolean checkConfig(Configuration config) {
        return config.intIsSet(CK_FETCH_MIN_BYTES) && config.intIsSet(CK_FETCH_WAIT_MAX) && config.stringIsSet(CK_ZOOKEEPER) && config.stringIsSet(CK_TOPIC_FILTER) && config.intIsSet(CK_THREADS) && config.getInt(CK_THREADS) > 0L;
    }

    private void setupMetrics() {
        this.metricRegistry.register(MetricRegistry.name(this.getUniqueReadableId(), "read_bytes_1sec"), new Gauge<Long>(){

            @Override
            public Long getValue() {
                return KafkaInput.this.lastSecBytesRead.get();
            }
        });
        this.metricRegistry.register(MetricRegistry.name(this.getUniqueReadableId(), "written_bytes_1sec"), new Gauge<Long>(){

            @Override
            public Long getValue() {
                return 0L;
            }
        });
        this.metricRegistry.register(MetricRegistry.name(this.getUniqueReadableId(), "read_bytes_total"), new Gauge<Long>(){

            @Override
            public Long getValue() {
                return KafkaInput.this.totalBytesRead.get();
            }
        });
        this.metricRegistry.register(MetricRegistry.name(this.getUniqueReadableId(), "written_bytes_total"), new Gauge<Long>(){

            @Override
            public Long getValue() {
                return 0L;
            }
        });
    }
}

