/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.buffers.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.atomic.AtomicInteger;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.buffers.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ProcessBufferProcessor
implements EventHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBufferProcessor.class);
    protected AtomicInteger processBufferWatermark;
    private final Meter incomingMessages;
    private final Timer processTime;
    private final Meter outgoingMessages;
    protected final MetricRegistry metricRegistry;
    private final long ordinal;
    private final long numberOfConsumers;

    public ProcessBufferProcessor(MetricRegistry metricRegistry, AtomicInteger processBufferWatermark, long ordinal, long numberOfConsumers) {
        this.metricRegistry = metricRegistry;
        this.ordinal = ordinal;
        this.numberOfConsumers = numberOfConsumers;
        this.processBufferWatermark = processBufferWatermark;
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name(ProcessBufferProcessor.class, (String[])new String[]{"incomingMessages"}));
        this.outgoingMessages = metricRegistry.meter(MetricRegistry.name(ProcessBufferProcessor.class, (String[])new String[]{"outgoingMessages"}));
        this.processTime = metricRegistry.timer(MetricRegistry.name(ProcessBufferProcessor.class, (String[])new String[]{"processTime"}));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (sequence % this.numberOfConsumers != this.ordinal) {
            return;
        }
        this.processBufferWatermark.decrementAndGet();
        this.incomingMessages.mark();
        Timer.Context tcx = this.processTime.time();
        Message msg = event.getMessage();
        LOG.debug("Starting to process message <{}>.", (Object)msg.getId());
        try {
            LOG.debug("Finished processing message <{}>. Writing to output buffer.", (Object)msg.getId());
            this.handleMessage(msg);
        }
        catch (Exception e) {
            LOG.warn("Unable to process message <{}>: {}", (Object)msg.getId(), (Object)e);
        }
        finally {
            this.outgoingMessages.mark();
            tcx.stop();
        }
    }

    protected abstract void handleMessage(Message var1);
}

