package com.googlecode.jmxtrans.model.output.kafka;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import com.googlecode.jmxtrans.model.Query;
import com.googlecode.jmxtrans.model.Result;
import com.googlecode.jmxtrans.model.Server;
import com.googlecode.jmxtrans.model.ValidationException;
import com.googlecode.jmxtrans.model.naming.KeyUtils;
import com.googlecode.jmxtrans.model.output.BaseOutputWriter;
import com.googlecode.jmxtrans.model.output.Settings;
import com.googlecode.jmxtrans.util.NumberUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.concurrent.NotThreadSafe;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:com/googlecode/jmxtrans/model/output/kafka/KafkaWriter.class */
public class KafkaWriter extends BaseOutputWriter {
    private static final Logger log = LoggerFactory.getLogger(KafkaWriter.class);
    private static final String DEFAULT_ROOT_PREFIX = "servers";
    private final JsonFactory jsonFactory;
    private Producer<String, String> producer;
    private final Iterable<String> topics;
    private final String rootPrefix;
    private final ImmutableMap<String, String> tags;

    @JsonCreator
    public KafkaWriter(@JsonProperty("typeNames") ImmutableList<String> immutableList, @JsonProperty("booleanAsNumber") boolean z, @JsonProperty("rootPrefix") String str, @JsonProperty("debug") Boolean bool, @JsonProperty("topics") String str2, @JsonProperty("tags") Map<String, String> map, @JsonProperty("settings") Map<String, Object> map2) {
        super(immutableList, z, bool, map2);
        this.rootPrefix = (String) firstNonNull(str, (String) getSettings().get("rootPrefix"), DEFAULT_ROOT_PREFIX);
        Properties properties = new Properties();
        properties.setProperty("metadata.broker.list", Settings.getStringSetting(map2, "metadata.broker.list", (String) null));
        properties.setProperty("zk.connect", Settings.getStringSetting(map2, "zk.connect", (String) null));
        properties.setProperty("serializer.class", Settings.getStringSetting(map2, "serializer.class", (String) null));
        this.producer = new Producer<>(new ProducerConfig(properties));
        this.topics = Arrays.asList(Settings.getStringSetting(map2, "topics", "").split(","));
        this.tags = ImmutableMap.copyOf((Map) firstNonNull(map, (Map) getSettings().get("tags"), ImmutableMap.of()));
        this.jsonFactory = new JsonFactory();
    }

    public void validateSetup(Server server, Query query) throws ValidationException {
    }

    protected void internalWrite(Server server, Query query, ImmutableList<Result> immutableList) throws Exception {
        ImmutableList typeNames = getTypeNames();
        UnmodifiableIterator it = immutableList.iterator();
        while (it.hasNext()) {
            Result result = (Result) it.next();
            log.debug("Query result: [{}]", result);
            for (Map.Entry<String, Object> entry : result.getValues().entrySet()) {
                Object value = entry.getValue();
                if (NumberUtils.isNumeric(value)) {
                    String createJsonMessage = createJsonMessage(server, query, typeNames, result, entry, value);
                    for (String str : this.topics) {
                        log.debug("Topic: [{}] ; Kafka Message: [{}]", str, createJsonMessage);
                        this.producer.send(new KeyedMessage(str, createJsonMessage));
                    }
                } else {
                    log.warn("Unable to submit non-numeric value to Kafka: [{}] from result [{}]", value, result);
                }
            }
        }
    }

    private String createJsonMessage(Server server, Query query, List<String> list, Result result, Map.Entry<String, Object> entry, Object obj) throws IOException {
        String replaceAll = KeyUtils.getKeyString(server, query, result, entry, list, this.rootPrefix).replaceAll("[()]", "_");
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Throwable th = null;
        try {
            JsonGenerator createGenerator = this.jsonFactory.createGenerator(byteArrayOutputStream, JsonEncoding.UTF8);
            Throwable th2 = null;
            try {
                createGenerator.writeStartObject();
                createGenerator.writeStringField("keyspace", replaceAll);
                createGenerator.writeStringField("value", obj.toString());
                createGenerator.writeNumberField("timestamp", result.getEpoch() / 1000);
                createGenerator.writeObjectFieldStart("tags");
                UnmodifiableIterator it = this.tags.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry entry2 = (Map.Entry) it.next();
                    createGenerator.writeStringField((String) entry2.getKey(), (String) entry2.getValue());
                }
                createGenerator.writeEndObject();
                createGenerator.writeEndObject();
                createGenerator.close();
                String byteArrayOutputStream2 = byteArrayOutputStream.toString("UTF-8");
                if (createGenerator != null) {
                    if (0 != 0) {
                        try {
                            createGenerator.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createGenerator.close();
                    }
                }
                return byteArrayOutputStream2;
            } catch (Throwable th4) {
                if (createGenerator != null) {
                    if (0 != 0) {
                        try {
                            createGenerator.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createGenerator.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (byteArrayOutputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayOutputStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    byteArrayOutputStream.close();
                }
            }
        }
    }

    @VisibleForTesting
    void setProducer(Producer<String, String> producer) {
        this.producer = producer;
    }
}
