/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.inputs.gelf.gelf;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.graylog2.inputs.gelf.gelf.GELFMessage;
import org.graylog2.inputs.gelf.gelf.GELFMessageChunk;
import org.graylog2.inputs.gelf.gelf.GELFProcessor;
import org.graylog2.plugin.buffers.Buffer;
import org.graylog2.plugin.inputs.MessageInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GELFChunkManager
extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(GELFChunkManager.class);
    public static final long MILLIS_VALID = 5000L;
    private Map<String, Map<Integer, GELFMessageChunk>> chunks = Maps.newConcurrentMap();
    private GELFProcessor processor;
    private final Meter outdatedMessagesDropped;

    public GELFChunkManager(MetricRegistry metricRegistry, Buffer processBuffer) {
        this.processor = new GELFProcessor(metricRegistry, processBuffer);
        this.outdatedMessagesDropped = metricRegistry.meter(MetricRegistry.name(GELFChunkManager.class, "outdatedMessagesDropped"));
    }

    @Override
    public void run() {
        while (true) {
            try {
                if (!this.chunks.isEmpty() && LOG.isDebugEnabled()) {
                    LOG.debug("Dumping GELF chunk map [{}]:\n{}", (Object)this.chunks.size(), (Object)this.humanReadableChunkMap());
                }
                for (Map.Entry<String, Map<Integer, GELFMessageChunk>> message : this.chunks.entrySet()) {
                    String messageId = message.getKey();
                    if (this.isOutdated(messageId)) {
                        this.outdatedMessagesDropped.mark();
                        LOG.debug("Not all chunks of <{}> arrived in time. Dropping. [{}ms]", (Object)messageId, (Object)5000L);
                        this.dropMessage(messageId);
                        continue;
                    }
                    if (!this.isComplete(messageId)) continue;
                    LOG.debug("Message <{}> seems to be complete. Handling now.", (Object)messageId);
                    this.processor.messageReceived(new GELFMessage(this.chunksToByteArray(messageId)), this.getSourceInput(messageId));
                    LOG.debug("Message <{}> is now being processed. Dropping from chunk map.", (Object)messageId);
                    this.dropMessage(messageId);
                }
            }
            catch (Exception e) {
                LOG.error("Error in GELFChunkManager", e);
            }
            Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
        }
    }

    public boolean isComplete(String messageId) {
        if (!this.chunks.containsKey(messageId)) {
            LOG.debug("Message <{}> not in chunk map. Not checking if complete.", (Object)messageId);
            return false;
        }
        if (!this.chunks.get(messageId).containsKey(0)) {
            LOG.debug("Message <{}> does not even contain first chunk. Not complete!", (Object)messageId);
            return false;
        }
        int claimedSequenceCount = this.chunks.get(messageId).get(0).getSequenceCount();
        return claimedSequenceCount == this.chunks.get(messageId).size();
    }

    public boolean isOutdated(String messageId) {
        if (!this.chunks.containsKey(messageId)) {
            LOG.debug("Message <{}> not in chunk map. Not checking if outdated.", (Object)messageId);
            return false;
        }
        long limit = System.currentTimeMillis() - 5000L;
        for (Map.Entry<Integer, GELFMessageChunk> chunk : this.chunks.get(messageId).entrySet()) {
            if (chunk.getValue().getArrival() >= limit) continue;
            return true;
        }
        return false;
    }

    public void dropMessage(String messageId) {
        if (this.chunks.containsKey(messageId)) {
            this.chunks.remove(messageId);
        } else {
            LOG.debug("Message <{}> not in chunk map. Not dropping.", (Object)messageId);
        }
    }

    public byte[] chunksToByteArray(String messageId) throws Exception {
        if (!this.chunks.containsKey(messageId)) {
            throw new Exception("Message <" + messageId + "> not in chunk map. Cannot re-assemble.");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Map.Entry<Integer, GELFMessageChunk> chunk : this.chunks.get(messageId).entrySet()) {
            out.write(chunk.getValue().getData(), 0, chunk.getValue().getData().length);
        }
        return out.toByteArray();
    }

    private MessageInput getSourceInput(String messageId) {
        try {
            return this.chunks.get(messageId).get(0).getSourceInput();
        }
        catch (Exception e) {
            LOG.error("Could not get source input ID from chunked GELF message.", e);
            return null;
        }
    }

    public boolean hasMessage(String messageId) {
        return this.chunks.containsKey(messageId);
    }

    public void insert(GELFMessage msg, MessageInput sourceInput) {
        this.insert(new GELFMessageChunk(msg, sourceInput));
    }

    public void insert(GELFMessageChunk chunk) {
        LOG.debug("Handling GELF chunk: {}", (Object)chunk);
        if (this.chunks.containsKey(chunk.getId())) {
            this.chunks.get(chunk.getId()).put(chunk.getSequenceNumber(), chunk);
        } else {
            TreeMap<Integer, GELFMessageChunk> c = Maps.newTreeMap();
            c.put(chunk.getSequenceNumber(), chunk);
            this.chunks.put(chunk.getId(), c);
        }
    }

    public String humanReadableChunkMap() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Map<Integer, GELFMessageChunk>> entry2 : this.chunks.entrySet()) {
            sb.append("Message <").append(entry2.getKey()).append("> ");
            sb.append("\tChunks:\n");
            for (Map.Entry<Integer, GELFMessageChunk> chunk : entry2.getValue().entrySet()) {
                sb.append("\t\t").append(chunk.getValue()).append("\n");
            }
        }
        return sb.toString();
    }
}

