package io.shulie.jmeter.tool.amdb.log.data.pusher;

import io.shulie.jmeter.tool.amdb.GlobalVariables;
import io.shulie.jmeter.tool.amdb.log.data.pusher.callback.LogCallback;
import io.shulie.jmeter.tool.amdb.log.data.pusher.push.ServerOptions;
import io.shulie.jmeter.tool.amdb.log.data.pusher.push.tcp.TcpDataPusher;
import io.shulie.jmeter.tool.amdb.log.data.pusher.server.ServerProviderOptions;
import io.shulie.jmeter.tool.amdb.log.data.pusher.server.impl.DefaultServerAddrProvider;
import io.shulie.jmeter.tool.amdb.zookeeper.ZkClientSpec;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/shulie/jmeter/tool/amdb/log/data/pusher/LogPusher.class */
public class LogPusher implements Runnable {
    public static final Logger logger = LoggerFactory.getLogger(LogPusher.class);
    private String reportId;
    private int threadIndex;
    private String threadName = "uploadThread_";
    private AtomicLong logCount = new AtomicLong(0);
    private Queue<String> queue;

    public LogPusher(Queue<String> queue, int i, String str) {
        this.queue = queue;
        this.threadIndex = i;
        this.reportId = str;
    }

    public void start() {
        long id = Thread.currentThread().getId();
        this.threadName += this.reportId + "_" + this.threadIndex;
        Thread.currentThread().setName(this.threadName);
        logger.info("启动第{}个日志上传线程,线程ID:{},启动时间:{}", new Object[]{Integer.valueOf(this.threadIndex), Long.valueOf(id), Long.valueOf(System.currentTimeMillis())});
        String property = System.getProperty("zkServers");
        ZkClientSpec zkClientSpec = new ZkClientSpec();
        zkClientSpec.setZkServers(property);
        zkClientSpec.setThreadName("cf_" + this.threadName);
        ServerProviderOptions serverProviderOptions = new ServerProviderOptions();
        serverProviderOptions.setServerZkPath(GlobalVariables.AMDB_SERVER_ZK_PATH);
        serverProviderOptions.setSpec(zkClientSpec);
        DefaultServerAddrProvider defaultServerAddrProvider = new DefaultServerAddrProvider(serverProviderOptions);
        TcpDataPusher tcpDataPusher = new TcpDataPusher();
        tcpDataPusher.setServerAddrProvider(defaultServerAddrProvider);
        if (!tcpDataPusher.init(new ServerOptions() { // from class: io.shulie.jmeter.tool.amdb.log.data.pusher.LogPusher.1
            {
                setTimeout(3000);
            }
        })) {
            logger.error("初始化DataPusher异常");
            return;
        }
        tcpDataPusher.start();
        LogCallback logCallback = tcpDataPusher.getLogCallback();
        logger.info("日志上传开始--线程ID:{},线程名称:{},开始时间：{},报告ID:{}", new Object[]{Long.valueOf(id), this.threadName, Long.valueOf(System.currentTimeMillis()), this.reportId});
        while (true) {
            if (GlobalVariables.stopFlag.get() && this.queue.isEmpty()) {
                logger.info("日志上传完成--线程ID:{},线程名称:{},结束时间：{},报告ID:{}，上传数量:{}", new Object[]{Long.valueOf(id), this.threadName, Long.valueOf(System.currentTimeMillis()), this.reportId, Long.valueOf(this.logCount.get())});
                tcpDataPusher.stop();
                return;
            }
            String pollLogData = pollLogData();
            if (StringUtils.isNotBlank(pollLogData)) {
                int i = 3;
                for (boolean call = logCallback.call(pollLogData.getBytes(), (byte) 1, GlobalVariables.VERSION); !call && i > 0; call = logCallback.call(pollLogData.getBytes(), (byte) 1, GlobalVariables.VERSION)) {
                    i--;
                }
            }
        }
    }

    private String pollLogData() {
        long j = 0;
        StringBuilder sb = new StringBuilder();
        while (j < GlobalVariables.UPLOAD_SIZE.longValue() && !this.queue.isEmpty()) {
            String poll = this.queue.poll();
            if (Objects.nonNull(poll)) {
                GlobalVariables.uploadCount.getAndIncrement();
                this.logCount.getAndIncrement();
                sb.append(poll.toString()).append("\r\n");
                j += poll.toString().getBytes().length;
            } else {
                try {
                    TimeUnit.MILLISECONDS.sleep(10L);
                } catch (InterruptedException e) {
                    logger.error("日志上传异常--异常信息：{}", e.toString());
                }
            }
        }
        return sb.toString();
    }

    @Override // java.lang.Runnable
    public void run() {
        start();
    }
}
