package io.smartcat.cassandra.diagnostics.reporter;

import com.aphyr.riemann.Proto;
import com.aphyr.riemann.client.EventDSL;
import com.aphyr.riemann.client.IRiemannClient;
import com.aphyr.riemann.client.RiemannClient;
import io.smartcat.cassandra.diagnostics.Measurement;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smartcat/cassandra/diagnostics/reporter/RiemannReporter.class */
/**
 * Reporter that forwards each {@link Measurement} to a Riemann server as a single event.
 *
 * <p>The Riemann client lives in a static field and is created lazily, on the first call to
 * {@link #report(Measurement)}, from the reporter options {@code riemannHost} (required) and
 * {@code riemannPort} (optional, defaults to {@code 5555}). When the client cannot be created the
 * reporter logs a warning and drops the measurement instead of throwing.
 */
public class RiemannReporter extends Reporter {
    private static final String HOST_PROP = "riemannHost";
    private static final String PORT_PROP = "riemannPort";
    private static final String DEFAULT_PORT = "5555";

    /** TTL set on every outgoing event (Riemann expresses TTL in seconds). */
    private static final float EVENT_TTL = 30.0f;

    /** How long to wait for the server's acknowledgement of a sent event. */
    private static final long SEND_TIMEOUT_SECONDS = 1L;

    private static final Logger logger = LoggerFactory.getLogger(RiemannReporter.class);

    /**
     * Client shared by all reporter instances. Declared {@code volatile} because
     * {@link #riemannClient()} checks it for {@code null} without holding a lock.
     */
    private static volatile IRiemannClient riemann;

    /**
     * Creates the reporter; the Riemann connection itself is opened on first use.
     *
     * @param reporterConfiguration reporter configuration, passed through to the base class
     */
    public RiemannReporter(ReporterConfiguration reporterConfiguration) {
        super(reporterConfiguration);
    }

    /**
     * Sends the measurement to Riemann, retrying once if the first attempt fails.
     *
     * <p>If no client is available the measurement is dropped with a warning.
     *
     * @param measurement measurement to report
     */
    public void report(Measurement measurement) {
        if (riemannClient() == null) {
            logger.warn("Cannot report riemann event without initialized client.");
            return;
        }
        logger.debug("Sending Measurement: time={}", measurement.time());
        try {
            sendEvent(measurement);
        } catch (Exception e) {
            logger.debug("Sending Query failed, trying one more time: execTime={}, exception: {}",
                    measurement.time(), e.getMessage());
            retry(measurement);
        }
    }

    /**
     * Second (and last) delivery attempt; an I/O failure here is logged and the measurement dropped.
     *
     * @param measurement measurement to resend
     */
    private void retry(Measurement measurement) {
        // NOTE(review): only IOException is swallowed here, whereas report() catches Exception on
        // the first attempt - a runtime failure on the retry still propagates to the caller.
        try {
            sendEvent(measurement);
        } catch (IOException e) {
            logger.debug("Sending Query failed, ignoring message: execTime={}, exception: {}",
                    measurement.time(), e.getMessage());
        }
    }

    /**
     * Builds a Riemann event from the measurement, sends it and waits for the server's reply.
     *
     * <p>The measurement name becomes the service, the value the metric, every tag key a tag and
     * every field an attribute; the state is always {@code "ok"}.
     *
     * @param measurement measurement to convert and send
     * @return the server's reply
     * @throws IOException if no reply arrives within the timeout, if the reply carries an error
     *     (the server's error text is used as the exception message), or if sending fails
     */
    private synchronized Proto.Msg sendEvent(Measurement measurement) throws IOException {
        EventDSL event = riemann.event();
        event.service(measurement.name());
        event.state("ok");
        event.metric(measurement.value());
        event.ttl(EVENT_TTL);
        // Only the tag keys are forwarded; tag values are not part of the Riemann event.
        for (Object tag : measurement.tags().keySet()) {
            event.tag((String) tag);
        }
        for (Object item : measurement.fields().entrySet()) {
            Map.Entry<?, ?> field = (Map.Entry<?, ?>) item;
            event.attribute((String) field.getKey(), (String) field.getValue());
        }
        Proto.Msg msg = (Proto.Msg) event.send().deref(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        // A null reply means the wait timed out; a reply with an error is a distinct failure and
        // must surface the server's own message rather than being reported as a timeout.
        if (msg == null) {
            throw new IOException("Message timed out.");
        }
        if (msg.hasError()) {
            throw new IOException(msg.getError());
        }
        return msg;
    }

    /**
     * Returns the shared client, attempting to create it first if it does not exist yet.
     *
     * @return the client, or {@code null} if it could not be created
     */
    private IRiemannClient riemannClient() {
        if (riemann == null) {
            initRiemannClient(this.configuration);
        }
        return riemann;
    }

    /**
     * Creates and connects the shared client from the given configuration.
     *
     * <p>Static and synchronized so the lock is the class itself: the guarded field is static, and
     * an instance-level lock would let two reporter instances initialize concurrently.
     *
     * @param reporterConfiguration configuration whose options hold the Riemann host and port
     */
    private static synchronized void initRiemannClient(ReporterConfiguration reporterConfiguration) {
        if (riemann != null) {
            logger.warn("Riemann client already initialized");
            return;
        }
        logger.debug("Initializing riemann client with config: {}", reporterConfiguration);
        if (!reporterConfiguration.options.containsKey(HOST_PROP)) {
            logger.warn("Tried to init Riemann client. Not properly configured. Aborting initialization.");
            return;
        }
        String portValue = reporterConfiguration.getDefaultOption(PORT_PROP, DEFAULT_PORT);
        int port;
        try {
            port = Integer.parseInt(portValue);
        } catch (NumberFormatException e) {
            // A malformed port is a configuration error; treat it like a missing host instead of
            // letting the exception escape from report().
            logger.warn("Tried to init Riemann client. Invalid {} value '{}'. Aborting initialization.",
                    PORT_PROP, portValue);
            return;
        }
        try {
            // NOTE(review): the client is stored before connect() succeeds, so after a failed
            // connect the field stays non-null and initialization is not attempted again.
            riemann = RiemannClient.tcp((String) reporterConfiguration.options.get(HOST_PROP), port);
            riemann.connect();
        } catch (IOException e) {
            logger.warn("Riemann client cannot be initialized", e);
        }
    }
}
