package com.metamx.tranquility.kafka.writer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import com.metamx.tranquility.config.DataSourceConfig;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.finagle.FinagleRegistry;
import com.metamx.tranquility.kafka.model.MessageCounters;
import com.metamx.tranquility.kafka.model.PropertiesBasedKafkaConfig;
import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.FutureEventListener;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:com/metamx/tranquility/kafka/writer/TranquilityEventWriter.class */
public class TranquilityEventWriter {
    private static final Logger log = new Logger(TranquilityEventWriter.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final DataSourceConfig<PropertiesBasedKafkaConfig> dataSourceConfig;
    private final Tranquilizer<Map<String, Object>> tranquilizer;
    private final AtomicLong receivedCounter = new AtomicLong();
    private final AtomicLong sentCounter = new AtomicLong();
    private final AtomicLong failedCounter = new AtomicLong();
    private final AtomicLong rejectedLogCounter = new AtomicLong();
    private final AtomicReference<Throwable> exception = new AtomicReference<>();

    public TranquilityEventWriter(String str, DataSourceConfig<PropertiesBasedKafkaConfig> dataSourceConfig, CuratorFramework curatorFramework, FinagleRegistry finagleRegistry) {
        this.dataSourceConfig = dataSourceConfig;
        this.tranquilizer = DruidBeams.fromConfig(dataSourceConfig).location(DruidLocation.create(((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).druidIndexingServiceName(), ((PropertiesBasedKafkaConfig) dataSourceConfig.propertiesBasedConfig()).useTopicAsDataSource().booleanValue() ? str : dataSourceConfig.dataSource())).curator(curatorFramework).finagleRegistry(finagleRegistry).buildTranquilizer(dataSourceConfig.tranquilizerBuilder());
        this.tranquilizer.start();
    }

    public void send(byte[] bArr) throws InterruptedException {
        this.receivedCounter.incrementAndGet();
        try {
            this.tranquilizer.send((Map) MAPPER.readValue(bArr, new TypeReference<HashMap<String, Object>>() { // from class: com.metamx.tranquility.kafka.writer.TranquilityEventWriter.1
            })).addEventListener(new FutureEventListener<BoxedUnit>() { // from class: com.metamx.tranquility.kafka.writer.TranquilityEventWriter.2
                public void onSuccess(BoxedUnit boxedUnit) {
                    TranquilityEventWriter.this.sentCounter.incrementAndGet();
                }

                public void onFailure(Throwable th) {
                    TranquilityEventWriter.this.failedCounter.incrementAndGet();
                    if (((PropertiesBasedKafkaConfig) TranquilityEventWriter.this.dataSourceConfig.propertiesBasedConfig()).reportDropsAsExceptions().booleanValue() || !(th instanceof MessageDroppedException)) {
                        TranquilityEventWriter.this.exception.compareAndSet(null, th);
                    }
                }
            });
            maybeThrow();
        } catch (IOException e) {
            this.failedCounter.incrementAndGet();
            long incrementAndGet = this.rejectedLogCounter.incrementAndGet();
            if (incrementAndGet <= 5 || ((incrementAndGet <= 100 && incrementAndGet % 10 == 0) || incrementAndGet % 100 == 0)) {
                log.debug(e, "%d message(s) failed to parse as JSON and were rejected", new Object[]{Long.valueOf(incrementAndGet)});
            }
            if (((PropertiesBasedKafkaConfig) this.dataSourceConfig.propertiesBasedConfig()).reportDropsAsExceptions().booleanValue()) {
                throw Throwables.propagate(e);
            }
        }
    }

    public void flush() throws InterruptedException {
        this.tranquilizer.flush();
        maybeThrow();
    }

    public void stop() {
        try {
            this.tranquilizer.stop();
        } catch (IllegalStateException e) {
            log.info(e, "Exception while stopping Tranquility", new Object[0]);
        }
    }

    public MessageCounters getMessageCounters() {
        return new MessageCounters(this.receivedCounter.get(), this.sentCounter.get(), this.failedCounter.get());
    }

    private void maybeThrow() {
        if (this.exception.get() != null) {
            throw Throwables.propagate(this.exception.get());
        }
    }
}
