/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.buffers;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.graylog2.inputs.Cache;
import org.graylog2.inputs.InputCache;
import org.graylog2.plugin.BaseConfiguration;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.buffers.Buffer;
import org.graylog2.plugin.buffers.BufferOutOfCapacityException;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.buffers.ProcessingDisabledException;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.shared.buffers.processors.ProcessBufferProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessBuffer
extends Buffer {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBuffer.class);
    public static String SOURCE_INPUT_ATTR_NAME;
    public static String SOURCE_NODE_ATTR_NAME;
    protected ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("processbufferprocessor-%d").build());
    private final BaseConfiguration configuration;
    private final InputCache inputCache;
    private final AtomicInteger processBufferWatermark;
    private final Meter incomingMessages;
    private final Meter rejectedMessages;
    private final Meter cachedMessages;
    private final ServerStatus serverStatus;

    @AssistedInject
    public ProcessBuffer(MetricRegistry metricRegistry, ServerStatus serverStatus, BaseConfiguration configuration, @Assisted InputCache inputCache, @Assisted AtomicInteger processBufferWatermark) {
        this.serverStatus = serverStatus;
        this.configuration = configuration;
        this.inputCache = inputCache;
        this.processBufferWatermark = processBufferWatermark;
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name(ProcessBuffer.class, (String[])new String[]{"incomingMessages"}));
        this.rejectedMessages = metricRegistry.meter(MetricRegistry.name(ProcessBuffer.class, (String[])new String[]{"rejectedMessages"}));
        this.cachedMessages = metricRegistry.meter(MetricRegistry.name(ProcessBuffer.class, (String[])new String[]{"cachedMessages"}));
        if (serverStatus.hasCapability(ServerStatus.Capability.RADIO)) {
            SOURCE_INPUT_ATTR_NAME = "gl2_source_radio_input";
            SOURCE_NODE_ATTR_NAME = "gl2_source_radio";
        } else {
            SOURCE_INPUT_ATTR_NAME = "gl2_source_input";
            SOURCE_NODE_ATTR_NAME = "gl2_source_node";
        }
    }

    public Cache getInputCache() {
        return this.inputCache;
    }

    public void initialize(ProcessBufferProcessor[] processors, int ringBufferSize, WaitStrategy waitStrategy, int processBufferProcessors) {
        Disruptor disruptor = new Disruptor(MessageEvent.EVENT_FACTORY, ringBufferSize, (Executor)this.executor, ProducerType.MULTI, waitStrategy);
        LOG.info("Initialized ProcessBuffer with ring size <{}> and wait strategy <{}>.", (Object)ringBufferSize, (Object)waitStrategy.getClass().getSimpleName());
        disruptor.handleEventsWith((EventHandler[])processors);
        this.ringBuffer = disruptor.start();
    }

    public void insertCached(Message message, MessageInput sourceInput) {
        message.setSourceInput(sourceInput);
        String source_input_name = sourceInput != null ? sourceInput.getId() : "<nonexistent input>";
        message.addField(SOURCE_INPUT_ATTR_NAME, (Object)source_input_name);
        message.addField(SOURCE_NODE_ATTR_NAME, (Object)this.serverStatus.getNodeId());
        if (!this.serverStatus.isProcessing()) {
            LOG.debug("Message processing is paused. Writing to cache.");
            this.cachedMessages.mark();
            this.inputCache.add(message);
            return;
        }
        if (!this.hasCapacity()) {
            if (this.configuration.getInputCacheMaxSize() == 0L || (long)this.inputCache.size() < this.configuration.getInputCacheMaxSize()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Out of capacity. Writing to cache.");
                }
                this.cachedMessages.mark();
                this.inputCache.add(message);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Out of capacity. Input cache limit reached. Dropping message.");
                }
                this.rejectedMessages.mark();
            }
            return;
        }
        this.insert(message);
    }

    public void insertFailFast(Message message, MessageInput sourceInput) throws BufferOutOfCapacityException, ProcessingDisabledException {
        message.setSourceInput(sourceInput);
        String source_input_name = sourceInput != null ? sourceInput.getId() : "<nonexistent input>";
        message.addField(SOURCE_INPUT_ATTR_NAME, (Object)source_input_name);
        message.addField(SOURCE_NODE_ATTR_NAME, (Object)this.serverStatus.getNodeId());
        if (!this.serverStatus.isProcessing()) {
            LOG.debug("Rejecting message, because message processing is paused.");
            throw new ProcessingDisabledException();
        }
        if (!this.hasCapacity()) {
            LOG.debug("Rejecting message, because I am full and caching was disabled by input. Raise my size or add more processors.");
            this.rejectedMessages.mark();
            throw new BufferOutOfCapacityException();
        }
        this.insert(message);
    }

    private void insert(Message message) {
        long sequence = this.ringBuffer.next();
        MessageEvent event = (MessageEvent)this.ringBuffer.get(sequence);
        event.setMessage(message);
        this.ringBuffer.publish(sequence);
        this.processBufferWatermark.incrementAndGet();
        this.incomingMessages.mark();
    }

    public static interface Factory {
        public ProcessBuffer create(InputCache var1, AtomicInteger var2);
    }
}

