/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.radio.buffers.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.radio.Configuration;
import org.graylog2.radio.transports.RadioTransport;
import org.graylog2.shared.buffers.processors.ProcessBufferProcessor;
import org.graylog2.shared.stats.ThroughputStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RadioProcessBufferProcessor
extends ProcessBufferProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(RadioProcessBufferProcessor.class);
    private static final AtomicInteger errorCount = new AtomicInteger(0);
    private final ThroughputStats throughputStats;
    private final RadioTransport radioTransport;
    private final ServerStatus serverStatus;
    private final int radioTransportMaxErrors;
    private final Meter erroredMessages;

    @Inject
    public RadioProcessBufferProcessor(MetricRegistry metricRegistry, ThroughputStats throughputStats, RadioTransport radioTransport, ServerStatus serverStatus, Configuration configuration) {
        super(metricRegistry);
        this.throughputStats = throughputStats;
        this.radioTransport = radioTransport;
        this.serverStatus = serverStatus;
        this.radioTransportMaxErrors = configuration.getRadioTransportMaxErrors();
        this.erroredMessages = metricRegistry.meter(MetricRegistry.name(RadioProcessBufferProcessor.class, (String[])new String[]{"erroredMessages"}));
    }

    protected void handleMessage(Message msg) {
        try {
            this.radioTransport.send(msg);
            this.throughputStats.getThroughputCounter().add(1L);
            errorCount.set(0);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Message <{}> written to RadioTransport.", (Object)msg.getId());
            }
        }
        catch (Exception e) {
            int errors = errorCount.addAndGet(1);
            if (this.radioTransportMaxErrors > 0 && errors >= this.radioTransportMaxErrors) {
                this.serverStatus.pauseMessageProcessing();
                this.serverStatus.overrideLoadBalancerDead();
                LOG.error("Number of Radio transport errors exceeded threshold ({}), switching to lb:dead.", (Object)this.radioTransportMaxErrors);
            }
            this.erroredMessages.mark();
            LOG.error("[Error #{}] Caught exception while sending message to Radio transport: ", (Object)errors, (Object)e);
        }
    }
}

