/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.inputs.radio;

import com.codahale.metrics.MetricRegistry;
import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
import java.util.Map;
import org.graylog2.inputs.kafka.KafkaInput;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationException;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RadioKafkaInput
extends KafkaInput {
    private static final Logger LOG = LoggerFactory.getLogger(RadioKafkaInput.class);
    public static final String NAME = "Graylog2 Radio Input (Kafka)";

    @Inject
    public RadioKafkaInput(MetricRegistry metricRegistry, NodeId nodeId, EventBus eventBus, ServerStatus serverStatus) {
        super(metricRegistry, nodeId, eventBus, serverStatus);
    }

    @Override
    public void checkConfiguration(Configuration configuration) throws ConfigurationException {
        configuration.setString("topic_filter", "^graylog2-radio-messages$");
        if (!this.checkConfig(configuration)) {
            throw new ConfigurationException(configuration.getSource().toString());
        }
    }

    @Override
    public String getName() {
        return NAME;
    }

    @Override
    public String linkToDocs() {
        return "http://support.torch.sh/help/kb/graylog2-server/using-graylog2-radio-v020x";
    }

    @Override
    public ConfigurationRequest getRequestedConfiguration() {
        ConfigurationRequest cr = new ConfigurationRequest();
        cr.addField(new TextField("zookeeper", "ZooKeeper address", "192.168.1.1:2181", "Host and port of the ZooKeeper that is managing your Kafka cluster.", ConfigurationField.Optional.NOT_OPTIONAL));
        cr.addField(new NumberField("fetch_min_bytes", "Fetch minimum bytes", 5, "Wait for a message batch to reach at least this size or the configured maximum wait time before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
        cr.addField(new NumberField("fetch_wait_max", "Fetch maximum wait time (ms)", 100, "Wait for this time or the configured minimum size of a message batch before fetching.", ConfigurationField.Optional.NOT_OPTIONAL));
        cr.addField(new NumberField("threads", "Processor threads", 2, "Number of processor threads to spawn. Use one thread per Kafka topic partition.", ConfigurationField.Optional.NOT_OPTIONAL));
        return cr;
    }

    @Override
    public Map<String, Object> getAttributes() {
        return this.configuration.getSource();
    }

    @Override
    protected boolean checkConfig(Configuration config) {
        return config.intIsSet("fetch_min_bytes") && config.intIsSet("fetch_wait_max") && config.stringIsSet("zookeeper") && config.intIsSet("threads") && config.getInt("threads") > 0L;
    }
}

