/*
 * Decompiled with CFR 0.152.
 */
package io.etrace.agent.io;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorTwoArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.etrace.agent.config.AgentConfiguration;
import io.etrace.agent.config.CollectorRegistry;
import io.etrace.agent.io.Client;
import io.etrace.agent.io.MessageSender;
import io.etrace.agent.io.SocketClientFactory;
import io.etrace.agent.message.event.DataEvent;
import io.etrace.agent.stat.TCPStats;
import io.etrace.common.modal.MessageHeader;
import io.etrace.common.util.JSONUtil;
import io.etrace.common.util.NetworkInterfaceHelper;
import io.etrace.common.util.ThreadUtil;
import java.util.concurrent.TimeUnit;

/**
 * {@link MessageSender} that hands message chunks to a Disruptor ring buffer and
 * ships them to the collector from a single daemon consumer thread.
 *
 * <p>{@link #send(byte[], int)} never blocks on a full buffer: when the ring buffer
 * has no free slot the chunk is dropped and recorded as lost in {@link TCPStats}.
 * The consumer ({@link DataConsumer}) opens the connection on demand, retrying with a
 * capped exponential backoff, and records every chunk it could not deliver as lost.
 *
 * <p>NOTE(review): the ring buffer is created with {@code ProducerType.SINGLE};
 * confirm that {@code send} is only ever invoked from one thread per instance.
 */
public class TcpMessageSender
implements MessageSender {
    public static final String TRACE = "Trace";
    public static final String Metric = "Metric";
    public static final String METRIC_TCP_MESSAGE_SENDER = "metricTCPMessageSender";
    /** Number of slots in the ring buffer. */
    private static final int RING_BUFFER_SIZE = 256;
    /** Instances younger than this (in milliseconds) pause before shutting down. */
    private static final long MIN_UPTIME_BEFORE_SHUTDOWN = 2000L;
    /** How long {@link #shutdown()} waits for queued events to drain, in seconds. */
    private static final long SHUTDOWN_DRAIN_SECONDS = 2L;
    /** Cleared by {@link #shutdown()}; makes the consumer give up on reconnect attempts. */
    private volatile boolean active = true;
    private final long startTime = System.currentTimeMillis();
    private final TCPStats stats;
    private final MessageHeader messageHeader;
    private final Disruptor<DataEvent> disruptor;
    private final RingBuffer<DataEvent> ringBuffer;
    private final DataProducer messageProducer;

    /**
     * Creates the sender, starts its consumer thread and registers a JVM shutdown hook
     * that calls {@link #shutdown()}.
     *
     * @param messageType value written into the message header; also used as the
     *                    prefix of the consumer thread name ({@code <type>-TCPSender})
     * @param stats       counters updated for every sent, polled and lost chunk
     */
    public TcpMessageSender(String messageType, TCPStats stats) {
        this.stats = stats;
        this.messageHeader = new MessageHeader();
        this.messageHeader.setHostIp(NetworkInterfaceHelper.INSTANCE.getLocalHostAddress());
        this.messageHeader.setHostName(NetworkInterfaceHelper.INSTANCE.getLocalHostName());
        this.messageHeader.setMessageType(messageType);
        this.messageHeader.setVersion("1.0-SNAPSHOT");
        this.messageProducer = new DataProducer();
        this.disruptor = new Disruptor<DataEvent>(new DataEvent.DataEventFactory(), RING_BUFFER_SIZE, r -> {
            Thread t = new Thread(r);
            t.setName(messageType + "-TCPSender");
            // Daemon so the sender never keeps the host JVM alive.
            t.setDaemon(true);
            return t;
        }, ProducerType.SINGLE, new LiteBlockingWaitStrategy());
        this.disruptor.handleEventsWith(new DataConsumer());
        this.disruptor.setDefaultExceptionHandler(new ExceptionHandler<DataEvent>(){

            // All three callbacks intentionally do nothing: consumer failures are
            // dropped so they cannot stop the event processor. No logger is
            // available in this class, so nothing is reported.
            @Override
            public void handleEventException(Throwable ex, long sequence, DataEvent event) {
            }

            @Override
            public void handleOnStartException(Throwable ex) {
            }

            @Override
            public void handleOnShutdownException(Throwable ex) {
            }
        });
        this.ringBuffer = this.disruptor.getRingBuffer();
        this.disruptor.start();
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
    }

    /**
     * Stops the sender. Queued events get up to {@value #SHUTDOWN_DRAIN_SECONDS}
     * seconds to drain; if they do not, the event processor is halted and whatever
     * is still queued is dropped.
     */
    @Override
    public void shutdown() {
        if (System.currentTimeMillis() - this.startTime < MIN_UPTIME_BEFORE_SHUTDOWN) {
            // NOTE(review): presumably gives a very short-lived process time to get its
            // first messages out before the sender is stopped - confirm intent.
            ThreadUtil.sleepForSecond(2L);
        }
        this.active = false;
        try {
            this.disruptor.shutdown(SHUTDOWN_DRAIN_SECONDS, TimeUnit.SECONDS);
        }
        catch (TimeoutException ignored) {
            // The backlog did not drain in time. Halt explicitly so the processor
            // does not keep running after shutdown() has returned.
            this.disruptor.halt();
        }
    }

    /**
     * @return the number of ring-buffer slots currently occupied, computed as the
     *         buffer size minus its remaining capacity
     */
    @Override
    public int getQueueSize() {
        return this.ringBuffer.getBufferSize() - (int)this.ringBuffer.remainingCapacity();
    }

    /**
     * Queues a chunk for sending without blocking. If the ring buffer is full the chunk
     * is dropped and counted as lost.
     *
     * @param chunk        payload bytes; must not be {@code null}
     * @param messageCount number of messages contained in {@code chunk}
     */
    @Override
    public void send(byte[] chunk, int messageCount) {
        this.stats.incTotalSize(chunk.length);
        boolean published = this.ringBuffer.tryPublishEvent(this.messageProducer, chunk, Integer.valueOf(messageCount));
        if (!published) {
            this.stats.incTcpLoss(messageCount);
            this.stats.incLossSize(chunk.length);
            ThreadUtil.sleep(0L);
        }
    }

    /**
     * Ring-buffer consumer: delivers each published chunk over the socket client and
     * keeps the loss counters up to date.
     */
    class DataConsumer
    implements EventHandler<DataEvent> {
        /** Upper bound for a single reconnect delay. */
        private static final int MAX_SLEEP_TIME = 60000;
        /** Time without a connection after which the collector is flagged unavailable. */
        private static final int INVALID_TIME = 30000;
        /** First reconnect delay before doubling. */
        private static final int INITIAL_SLEEP_TIME = 10;
        private final Client socketClient = SocketClientFactory.getClient();

        DataConsumer() {
        }

        /**
         * Sends the chunk carried by {@code event}, or records it as lost when no
         * connection can be opened. Events with no buffer or a non-positive count are
         * skipped. The event is always cleared afterwards.
         */
        @Override
        public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception {
            try {
                byte[] chunk = event.getBuffer();
                int count = event.getCount();
                if (chunk != null && count > 0) {
                    TcpMessageSender.this.stats.incTcpPollCount(count);
                    if (this.openThriftConnection(sequence)) {
                        this.internalSend(chunk, count);
                    } else {
                        this.recordNetLoss(chunk.length, count);
                    }
                }
                if (endOfBatch) {
                    this.socketClient.tryCloseConnWhenLongTime();
                }
            }
            finally {
                // Release the payload even when processing throws, so the slot does
                // not keep the chunk reachable until it is overwritten.
                event.clear();
            }
        }

        /**
         * Retries {@code openConnection()} until it succeeds or the sender is shut down.
         * The delay between attempts doubles each time and is capped at
         * {@link #MAX_SLEEP_TIME}. Once attempts have been failing for
         * {@link #INVALID_TIME}, the collector is flagged unavailable and events queued
         * behind {@code sequence} are discarded as lost.
         *
         * @param sequence sequence of the event currently being processed
         * @return {@code true} once connected; {@code false} if the sender became inactive
         */
        private boolean openThriftConnection(long sequence) {
            int sleepTime = INITIAL_SLEEP_TIME;
            boolean failedBefore = false;
            long firstFailureAt = 0L;
            long nextSequence = sequence;
            while (!this.socketClient.openConnection()) {
                if (!TcpMessageSender.this.active) {
                    return false;
                }
                if (!failedBefore) {
                    failedBefore = true;
                    firstFailureAt = System.currentTimeMillis();
                }
                long elapsed = System.currentTimeMillis() - firstFailureAt;
                if (elapsed >= (long)INVALID_TIME) {
                    CollectorRegistry.getInstance().setIsAvailable(false);
                    if (nextSequence < sequence + (long)TcpMessageSender.this.getQueueSize() - 1L) {
                        nextSequence = this.clearEvent(nextSequence, sequence);
                    }
                }
                // Clamp right after doubling so no single delay exceeds the maximum.
                sleepTime = Math.min(sleepTime * 2, MAX_SLEEP_TIME);
                ThreadUtil.sleep((long)sleepTime);
            }
            if (failedBefore) {
                // We only touch the availability flag if this call saw a failure.
                CollectorRegistry.getInstance().setIsAvailable(true);
            }
            return true;
        }

        /**
         * Discards queued events following {@code nextSequence}, counting each as lost,
         * until it reaches the end of the currently queued range or an event that
         * carries no payload.
         *
         * @param nextSequence last sequence already handled
         * @param sequence     sequence of the event currently being processed
         * @return the last sequence that was inspected
         */
        private long clearEvent(long nextSequence, long sequence) {
            while (nextSequence < sequence + (long)TcpMessageSender.this.getQueueSize() - 1L) {
                DataEvent dataEvent = TcpMessageSender.this.ringBuffer.get(++nextSequence);
                if (dataEvent == null) {
                    return nextSequence;
                }
                byte[] chunk = dataEvent.getBuffer();
                int count = dataEvent.getCount();
                if (chunk == null || count <= 0) {
                    // Empty slot: nothing further to discard.
                    return nextSequence;
                }
                dataEvent.clear();
                this.recordNetLoss(chunk.length, count);
            }
            return nextSequence;
        }

        /**
         * Refreshes the per-send header fields and writes header plus payload to the
         * socket client; a {@code false} result or any exception counts as loss.
         */
        private void internalSend(byte[] data, int count) {
            int len = data.length;
            try {
                MessageHeader header = TcpMessageSender.this.messageHeader;
                header.setAppId(AgentConfiguration.getAppId());
                header.setAst(System.currentTimeMillis());
                header.setInstance(AgentConfiguration.getInstance());
                boolean success = this.socketClient.send(JSONUtil.toBytes(header), data);
                if (success) {
                    TcpMessageSender.this.stats.incSuccessCount(count);
                } else {
                    this.recordNetLoss(len, count);
                }
            }
            catch (Exception e) {
                this.recordNetLoss(len, count);
            }
        }

        /** Records {@code bytes} bytes and {@code count} messages as lost in the network. */
        private void recordNetLoss(int bytes, int count) {
            TcpMessageSender.this.stats.incLossSize(bytes);
            TcpMessageSender.this.stats.incLossInNet(count);
        }
    }

    /** Copies a chunk and its message count into the claimed ring-buffer slot. */
    class DataProducer
    implements EventTranslatorTwoArg<DataEvent, byte[], Integer> {
        DataProducer() {
        }

        @Override
        public void translateTo(DataEvent event, long sequence, byte[] data, Integer totalCount) {
            event.reset(data, totalCount);
        }
    }
}

