package org.logstash.beats;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.wavefront.common.Utils;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.MetricName;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLHandshakeException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@ChannelHandler.Sharable
/* loaded from: input_file:BOOT-INF/classes/org/logstash/beats/BeatsHandler.class */
public class BeatsHandler extends SimpleChannelInboundHandler<Batch> {
    /** Log4j logger shared by every instance of this handler. */
    private static final Logger logger = LogManager.getLogger(BeatsHandler.class);

    /** Receiver of the connection and exception callbacks issued by this handler. */
    private final IMessageListener messageListener;

    /**
     * Supplier of the "filebeat-duplicate-batches" counter, obtained through
     * {@code Utils.lazySupplier} (presumably so the metric is only created on first use).
     */
    private final Supplier<Counter> duplicateBatchesIgnored =
            Utils.lazySupplier(
                    () -> Metrics.newCounter(new MetricName("logsharvesting", "", "filebeat-duplicate-batches")));

    /** String-keyed cache of batch identities; an entry expires one hour after its last access. */
    private final Cache<String, BatchIdentity> batchDedupeCache =
            Caffeine.newBuilder()
                    .expireAfterAccess(1, TimeUnit.HOURS)
                    .build();

    /**
     * Creates a handler that reports channel events to the given listener.
     *
     * @param iMessageListener listener notified about new connections, closed connections and
     *     exceptions; must not be {@code null}
     * @throws NullPointerException if {@code iMessageListener} is {@code null}
     */
    public BeatsHandler(IMessageListener iMessageListener) {
        // Fail fast: every callback in this class dereferences the listener, so a null would
        // otherwise only surface as an NPE on the first channel event.
        this.messageListener = Objects.requireNonNull(iMessageListener, "iMessageListener");
    }

    /**
     * Reacts to a channel becoming active: writes an optional trace line, delegates to the
     * superclass implementation and then notifies the listener of the new connection.
     *
     * @param channelHandlerContext context of the channel that became active
     * @throws Exception if the superclass implementation or the listener fails
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        final boolean tracing = logger.isTraceEnabled();
        if (tracing) {
            final String line = format(channelHandlerContext, "Channel Active");
            logger.trace(line);
        }

        super.channelActive(channelHandlerContext);
        messageListener.onNewConnection(channelHandlerContext);
    }

    /**
     * Reacts to a channel becoming inactive: delegates to the superclass implementation first,
     * writes an optional trace line and then notifies the listener that the connection closed.
     *
     * @param channelHandlerContext context of the channel that became inactive
     * @throws Exception if the superclass implementation or the listener fails
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);

        final boolean tracing = logger.isTraceEnabled();
        if (tracing) {
            final String line = format(channelHandlerContext, "Channel Inactive");
            logger.trace(line);
        }

        messageListener.onConnectionClose(channelHandlerContext);
    }

    /* JADX WARN: Code restructure failed: missing block: B:18:0x0072, code lost:
    
        r6.duplicateBatchesIgnored.get().inc();
     */
    /* JADX WARN: Code restructure failed: missing block: B:19:0x0089, code lost:
    
        if (org.logstash.beats.BeatsHandler.logger.isDebugEnabled() == false) goto L19;
     */
    /* JADX WARN: Code restructure failed: missing block: B:20:0x008c, code lost:
    
        org.logstash.beats.BeatsHandler.logger.debug(format(r7, "Duplicate filebeat batch received, ignoring"));
     */
    /* JADX WARN: Code restructure failed: missing block: B:21:0x009b, code lost:
    
        writeAck(r7, r0.getBatch().getProtocol(), r0.getBatch().getHighestSequence());
     */
    /**
     * Entry point for an inbound {@code Batch}.
     *
     * <p><b>WARNING:</b> the decompiler could not reconstruct this method. As written it does no
     * processing at all and unconditionally throws {@link UnsupportedOperationException}.
     *
     * <p>The only recovered fragments are the JADX notes above: an increment of
     * {@code duplicateBatchesIgnored}, a debug log reading "Duplicate filebeat batch received,
     * ignoring", and a {@code writeAck} call using a message's batch protocol and highest
     * sequence.
     *
     * <p>NOTE(review): presumably the lost body performed duplicate-batch detection (likely via
     * {@code batchDedupeCache}) and drove {@code needAck}/{@code ack}, none of which are
     * referenced elsewhere in this file. Restore the body from the original source or bytecode;
     * do not infer it from this stub.
     *
     * @param r7 channel handler context (decompiler-assigned name)
     * @param r8 the received batch (decompiler-assigned name)
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    @Override // io.netty.channel.SimpleChannelInboundHandler
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void channelRead0(io.netty.channel.ChannelHandlerContext r7, org.logstash.beats.Batch r8) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 491
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler in place of the unrecovered body.
        throw new UnsupportedOperationException("Method not decompiled: org.logstash.beats.BeatsHandler.channelRead0(io.netty.channel.ChannelHandlerContext, org.logstash.beats.Batch):void");
    }

    /**
     * Handles an exception raised on the channel.
     *
     * <p>Every exception except {@link SSLHandshakeException} is reported to the listener. The
     * exception is then logged (with stack trace at debug level, message only at info level).
     * Finally, whether or not reporting/logging succeeded, the superclass implementation is
     * invoked and the channel is flushed and closed.
     *
     * @param channelHandlerContext context of the channel on which the exception occurred
     * @param th the exception being handled
     * @throws Exception if the listener, the logging step or the cleanup itself fails
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        try {
            if (!(th instanceof SSLHandshakeException)) {
                this.messageListener.onException(channelHandlerContext, th);
            }
            // Fall back to the class description when the exception carries no message.
            String causeMessage = th.getMessage() == null ? th.getClass().toString() : th.getMessage();
            if (logger.isDebugEnabled()) {
                logger.debug(format(channelHandlerContext, "Handling exception: " + causeMessage), th);
            }
            logger.info(format(channelHandlerContext, "Handling exception: " + causeMessage));
        } finally {
            // A real finally block runs the cleanup exactly once on both the normal and the
            // failing path. The previous catch-and-repeat shape re-ran the whole cleanup when
            // one of these three calls itself threw.
            super.exceptionCaught(channelHandlerContext, th);
            channelHandlerContext.flush();
            channelHandlerContext.close();
        }
    }

    /**
     * Tells whether the given message should be acknowledged.
     *
     * @param message message to inspect
     * @return {@code true} when the message's sequence equals the highest sequence of its batch
     */
    private boolean needAck(Message message) {
        final int sequence = message.getSequence();
        final int highestInBatch = message.getBatch().getHighestSequence();
        return sequence == highestInBatch;
    }

    /**
     * Acknowledges the given message: one ack carrying the message's own sequence number,
     * followed by a second ack carrying sequence {@code 0}.
     *
     * @param channelHandlerContext context used to write the acks
     * @param message message being acknowledged
     */
    private void ack(ChannelHandlerContext channelHandlerContext, Message message) {
        if (logger.isTraceEnabled()) {
            final String line = format(channelHandlerContext, "Acking message number " + message.getSequence());
            logger.trace(line);
        }

        final byte protocol = message.getBatch().getProtocol();
        final int sequence = message.getSequence();

        writeAck(channelHandlerContext, protocol, sequence);
        // Follow-up ack with sequence 0; writeAck skips its completion trace for it.
        writeAck(channelHandlerContext, protocol, 0);
    }

    /**
     * Writes and flushes an {@code Ack} on the channel and trace-logs its successful completion.
     *
     * @param channelHandlerContext context used to write the ack
     * @param b protocol byte handed to the {@code Ack}
     * @param i sequence number handed to the {@code Ack}; completion is only trace-logged when
     *     this is greater than zero
     */
    private void writeAck(ChannelHandlerContext channelHandlerContext, byte b, int i) {
        final GenericFutureListener<Future<? super Void>> onComplete = future -> {
            if (future.isSuccess() && logger.isTraceEnabled() && i > 0) {
                logger.trace(format(channelHandlerContext, "Ack complete for message number " + i));
            }
        };
        // "addListener2" was a decompiler-generated rename; the Netty method is addListener.
        channelHandlerContext.writeAndFlush(new Ack(b, i)).addListener(onComplete);
    }

    /**
     * Prefixes a log message with the channel's local and remote endpoints, in the form
     * {@code "[local: <addr>, remote: <addr>] <message>"}.
     *
     * @param channelHandlerContext context whose channel supplies the two addresses
     * @param str message to append after the address prefix
     * @return the prefixed message
     */
    private String format(ChannelHandlerContext channelHandlerContext, String str) {
        String local = describeAddress(channelHandlerContext.channel().localAddress());
        String remote = describeAddress(channelHandlerContext.channel().remoteAddress());
        return "[local: " + local + ", remote: " + remote + "] " + str;
    }

    /**
     * Renders a socket address for logging without ever throwing.
     *
     * @param address address to render, possibly {@code null}
     * @return {@code "undefined"} for {@code null}; {@code host:port} for an
     *     {@link InetSocketAddress}; otherwise the address's own {@code toString()}
     */
    private static String describeAddress(SocketAddress address) {
        if (address == null) {
            return "undefined";
        }
        if (address instanceof InetSocketAddress) {
            InetSocketAddress inet = (InetSocketAddress) address;
            // getAddress() is null for an unresolved address; fall back to the host string
            // instead of failing with a NullPointerException inside a logging helper.
            String host = inet.getAddress() != null ? inet.getAddress().getHostAddress() : inet.getHostString();
            return host + ":" + inet.getPort();
        }
        // Non-IP transports expose other SocketAddress types; the former blind cast threw
        // ClassCastException for them.
        return address.toString();
    }
}
