/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.testclient;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.TimeAverageBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.testclient.PerfClientUtils;
import org.apache.pulsar.testclient.utils.FixedColumnLengthTableMaker;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerMonitor {
    private static final Logger log = LoggerFactory.getLogger(BrokerMonitor.class);
    private static final String BROKER_ROOT = "/loadbalance/brokers";
    private static final int ZOOKEEPER_TIMEOUT_MILLIS = 30000;
    private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000;
    private final ZooKeeper zkClient;
    private static final Gson gson = new Gson();
    private static final List<Object> MESSAGE_FIELDS = Arrays.asList("MSG/S IN", "MSG/S OUT", "TOTAL", "KB/S IN", "KB/S OUT", "TOTAL");
    private static final List<Object> SYSTEM_FIELDS = Arrays.asList("CPU %", "MEMORY %", "DIRECT %", "BW IN %", "BW OUT %", "MAX %");
    private static final Object[] SYSTEM_ROW = BrokerMonitor.makeSystemRow("SYSTEM");
    private static final Object[] COUNT_ROW = new Object[]{"COUNT", "TOPIC", "BUNDLE", "PRODUCER", "CONSUMER", "BUNDLE +", "BUNDLE -"};
    private static final Object[] LATEST_ROW = BrokerMonitor.makeMessageRow("LATEST");
    private static final Object[] SHORT_ROW = BrokerMonitor.makeMessageRow("SHORT");
    private static final Object[] LONG_ROW = BrokerMonitor.makeMessageRow("LONG");
    private static final Object[] RAW_SYSTEM_ROW = BrokerMonitor.makeSystemRow("RAW SYSTEM");
    private static final Object[] ALLOC_SYSTEM_ROW = BrokerMonitor.makeSystemRow("ALLOC SYSTEM");
    private static final Object[] RAW_MESSAGE_ROW = BrokerMonitor.makeMessageRow("RAW MSG");
    private static final Object[] ALLOC_MESSAGE_ROW = BrokerMonitor.makeMessageRow("ALLOC MSG");
    private static final Object[] GLOBAL_HEADER = new Object[]{"BROKER", "BUNDLE", "MSG/S", "LONG/S", "KB/S", "MAX %"};
    private final Map<String, Object> loadData = new ConcurrentHashMap<String, Object>();
    private static final FixedColumnLengthTableMaker localTableMaker = new FixedColumnLengthTableMaker();
    private static final FixedColumnLengthTableMaker globalTableMaker;

    private static Object[] makeMessageRow(String firstElement) {
        ArrayList<Object> result = new ArrayList<Object>();
        result.add(firstElement);
        result.addAll(MESSAGE_FIELDS);
        return result.toArray();
    }

    private static Object[] makeSystemRow(String firstElement) {
        ArrayList<Object> result = new ArrayList<Object>();
        result.add(firstElement);
        result.addAll(SYSTEM_FIELDS);
        return result.toArray();
    }

    private static void initRow(Object[] row, Object ... elements) {
        System.arraycopy(elements, 0, row, 1, elements.length);
    }

    private static void initMessageRow(Object[] row, double messageRateIn, double messageRateOut, double messageThroughputIn, double messageThroughputOut) {
        BrokerMonitor.initRow(row, messageRateIn, messageRateOut, messageRateIn + messageRateOut, messageThroughputIn / 1024.0, messageThroughputOut / 1024.0, (messageThroughputIn + messageThroughputOut) / 1024.0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void printGlobalData() {
        Map<String, Object> map = this.loadData;
        synchronized (map) {
            Object[][] rows = new Object[this.loadData.size() + 2][];
            rows[0] = GLOBAL_HEADER;
            int totalBundles = 0;
            double totalThroughput = 0.0;
            double totalMessageRate = 0.0;
            double totalLongTermMessageRate = 0.0;
            double maxMaxUsage = 0.0;
            int i = 1;
            for (Map.Entry<String, Object> entry : this.loadData.entrySet()) {
                double maxUsage;
                double messageThroughput;
                double longTermMessageRate;
                double messageRate;
                int numBundles;
                String broker = entry.getKey();
                Object data = entry.getValue();
                rows[i] = new Object[GLOBAL_HEADER.length];
                rows[i][0] = broker;
                if (data instanceof LoadReport) {
                    LoadReport loadReport = (LoadReport)data;
                    numBundles = loadReport.getNumBundles();
                    messageRate = loadReport.getMsgRateIn() + loadReport.getMsgRateOut();
                    longTermMessageRate = loadReport.getAllocatedMsgRateIn() + loadReport.getAllocatedMsgRateOut();
                    messageThroughput = (loadReport.getAllocatedBandwidthIn() + loadReport.getAllocatedBandwidthOut()) / 1024.0;
                    SystemResourceUsage systemResourceUsage = loadReport.getSystemResourceUsage();
                    maxUsage = Math.max(Math.max(Math.max(systemResourceUsage.getCpu().percentUsage(), systemResourceUsage.getMemory().percentUsage()), Math.max(systemResourceUsage.getDirectMemory().percentUsage(), systemResourceUsage.getBandwidthIn().percentUsage())), systemResourceUsage.getBandwidthOut().percentUsage());
                } else if (data instanceof LocalBrokerData) {
                    LocalBrokerData localData = (LocalBrokerData)data;
                    numBundles = localData.getNumBundles();
                    messageRate = localData.getMsgRateIn() + localData.getMsgRateOut();
                    String timeAveragePath = "/loadbalance/broker-time-average/" + broker;
                    try {
                        TimeAverageBrokerData timeAverageData = (TimeAverageBrokerData)gson.fromJson(new String(this.zkClient.getData(timeAveragePath, false, null)), TimeAverageBrokerData.class);
                        longTermMessageRate = timeAverageData.getLongTermMsgRateIn() + timeAverageData.getLongTermMsgRateOut();
                    }
                    catch (Exception x) {
                        throw new RuntimeException(x);
                    }
                    messageThroughput = (localData.getMsgThroughputIn() + localData.getMsgThroughputOut()) / 1024.0;
                    maxUsage = localData.getMaxResourceUsage();
                } else {
                    throw new AssertionError((Object)"Unreachable code");
                }
                rows[i][1] = numBundles;
                rows[i][2] = messageRate;
                rows[i][3] = messageThroughput;
                rows[i][4] = longTermMessageRate;
                rows[i][5] = maxUsage;
                totalBundles += numBundles;
                totalMessageRate += messageRate;
                totalLongTermMessageRate += longTermMessageRate;
                totalThroughput += messageThroughput;
                maxMaxUsage = Math.max(maxUsage, maxMaxUsage);
                ++i;
            }
            int finalRow = this.loadData.size() + 1;
            rows[finalRow] = new Object[GLOBAL_HEADER.length];
            rows[finalRow][0] = "TOTAL";
            rows[finalRow][1] = totalBundles;
            rows[finalRow][2] = totalMessageRate;
            rows[finalRow][3] = totalLongTermMessageRate;
            rows[finalRow][4] = totalThroughput;
            rows[finalRow][5] = maxMaxUsage;
            String table = globalTableMaker.make(rows);
            log.info("Overall Broker Data:\n{}", (Object)table);
        }
    }

    public BrokerMonitor(ZooKeeper zkClient) {
        this.zkClient = zkClient;
    }

    public void start() {
        try {
            BrokerWatcher brokerWatcher = new BrokerWatcher(this.zkClient);
            brokerWatcher.updateBrokers(BROKER_ROOT);
            while (true) {
                Thread.sleep(60000L);
                this.printGlobalData();
            }
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    public static void main(String[] args) throws Exception {
        Arguments arguments = new Arguments();
        JCommander jc = new JCommander((Object)arguments);
        jc.setProgramName("pulsar-perf monitor-brokers");
        try {
            jc.parse(args);
        }
        catch (ParameterException e) {
            System.out.println(e.getMessage());
            jc.usage();
            PerfClientUtils.exit(-1);
        }
        ZooKeeper zkClient = new ZooKeeper(arguments.connectString, 30000, null);
        BrokerMonitor monitor = new BrokerMonitor(zkClient);
        monitor.start();
    }

    static {
        BrokerMonitor.localTableMaker.elementLength = 14;
        BrokerMonitor.localTableMaker.decimalFormatter = "%.2f";
        globalTableMaker = new FixedColumnLengthTableMaker();
        BrokerMonitor.globalTableMaker.decimalFormatter = "%.2f";
        BrokerMonitor.globalTableMaker.topBorder = (char)42;
        BrokerMonitor.globalTableMaker.bottomBorder = (char)42;
        BrokerMonitor.globalTableMaker.lengthFunction = column -> column == 0 ? 60 : 12;
    }

    @Parameters(commandDescription="Monitors brokers and prints to the console information about their system resource usages, \ntheir topic and bundle counts, their message rates, and other metrics.")
    private static class Arguments {
        @Parameter(names={"-h", "--help"}, description="Help message", help=true)
        boolean help;
        @Parameter(names={"--connect-string"}, description="Zookeeper connect string", required=true)
        public String connectString = null;

        private Arguments() {
        }
    }

    private class BrokerDataWatcher
    implements Watcher {
        private final ZooKeeper zkClient;

        private BrokerDataWatcher(ZooKeeper zkClient) {
            this.zkClient = zkClient;
        }

        private String brokerNameFromPath(String path) {
            return path.substring(path.lastIndexOf(47) + 1);
        }

        public synchronized void process(WatchedEvent event) {
            try {
                if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                    this.printData(event.getPath());
                }
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }

        private double percentUsage(double usage, double limit) {
            return limit > 0.0 && usage >= 0.0 ? 100.0 * Math.min(1.0, usage / limit) : 0.0;
        }

        private synchronized void printData(String path) {
            String jsonString;
            String broker = this.brokerNameFromPath(path);
            try {
                jsonString = new String(this.zkClient.getData(path, (Watcher)this, null));
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            if (jsonString.contains("allocated")) {
                this.printLoadReport(broker, (LoadReport)gson.fromJson(jsonString, LoadReport.class));
            } else {
                LocalBrokerData localBrokerData = (LocalBrokerData)gson.fromJson(jsonString, LocalBrokerData.class);
                String timeAveragePath = "/loadbalance/broker-time-average/" + broker;
                try {
                    TimeAverageBrokerData timeAverageData = (TimeAverageBrokerData)gson.fromJson(new String(this.zkClient.getData(timeAveragePath, false, null)), TimeAverageBrokerData.class);
                    this.printBrokerData(broker, localBrokerData, timeAverageData);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        private synchronized void printLoadReport(String broker, LoadReport loadReport) {
            BrokerMonitor.this.loadData.put(broker, loadReport);
            Object[][] rows = new Object[10][];
            rows[0] = COUNT_ROW;
            rows[2] = RAW_SYSTEM_ROW;
            rows[4] = ALLOC_SYSTEM_ROW;
            rows[6] = RAW_MESSAGE_ROW;
            rows[8] = ALLOC_MESSAGE_ROW;
            rows[1] = new Object[COUNT_ROW.length];
            BrokerMonitor.initRow(rows[1], new Object[]{loadReport.getNumTopics(), loadReport.getNumBundles(), loadReport.getNumProducers(), loadReport.getNumConsumers(), loadReport.getBundleGains().size(), loadReport.getBundleLosses().size()});
            SystemResourceUsage systemResourceUsage = loadReport.getSystemResourceUsage();
            ResourceUsage cpu = systemResourceUsage.getCpu();
            ResourceUsage memory = systemResourceUsage.getMemory();
            ResourceUsage directMemory = systemResourceUsage.getDirectMemory();
            ResourceUsage bandwidthIn = systemResourceUsage.getBandwidthIn();
            ResourceUsage bandwidthOut = systemResourceUsage.getBandwidthOut();
            double maxUsage = Math.max(Math.max(Math.max(cpu.percentUsage(), memory.percentUsage()), Math.max(directMemory.percentUsage(), bandwidthIn.percentUsage())), bandwidthOut.percentUsage());
            rows[3] = new Object[RAW_SYSTEM_ROW.length];
            BrokerMonitor.initRow(rows[3], new Object[]{Float.valueOf(cpu.percentUsage()), Float.valueOf(memory.percentUsage()), Float.valueOf(directMemory.percentUsage()), Float.valueOf(bandwidthIn.percentUsage()), Float.valueOf(bandwidthOut.percentUsage()), maxUsage});
            rows[5] = new Object[ALLOC_SYSTEM_ROW.length];
            double allocatedCpuUsage = this.percentUsage(loadReport.getAllocatedCPU(), cpu.limit);
            double allocatedMemoryUsage = this.percentUsage(loadReport.getAllocatedMemory(), memory.limit);
            double allocatedBandwidthInUsage = this.percentUsage(loadReport.getAllocatedBandwidthIn(), bandwidthIn.limit);
            double allocatedBandwidthOutUsage = this.percentUsage(loadReport.getAllocatedBandwidthOut(), bandwidthOut.limit);
            double maxAllocatedUsage = Math.max(Math.max(Math.max(allocatedCpuUsage, allocatedMemoryUsage), allocatedBandwidthInUsage), allocatedBandwidthOutUsage);
            BrokerMonitor.initRow(rows[5], new Object[]{allocatedCpuUsage, allocatedMemoryUsage, null, allocatedBandwidthInUsage, allocatedBandwidthOutUsage, maxAllocatedUsage});
            rows[7] = new Object[RAW_MESSAGE_ROW.length];
            BrokerMonitor.initMessageRow(rows[7], loadReport.getMsgRateIn(), loadReport.getMsgRateOut(), bandwidthIn.usage, bandwidthOut.usage);
            rows[9] = new Object[ALLOC_MESSAGE_ROW.length];
            BrokerMonitor.initMessageRow(rows[9], loadReport.getAllocatedMsgRateIn(), loadReport.getAllocatedMsgRateOut(), loadReport.getAllocatedBandwidthIn(), loadReport.getAllocatedBandwidthOut());
            String table = localTableMaker.make(rows);
            log.info("\nLoad Report for {}:\n{}\n", (Object)broker, (Object)table);
        }

        private synchronized void printBrokerData(String broker, LocalBrokerData localBrokerData, TimeAverageBrokerData timeAverageData) {
            BrokerMonitor.this.loadData.put(broker, localBrokerData);
            Object[][] rows = new Object[10][];
            rows[0] = SYSTEM_ROW;
            rows[2] = COUNT_ROW;
            rows[4] = LATEST_ROW;
            rows[6] = SHORT_ROW;
            rows[8] = LONG_ROW;
            rows[1] = new Object[SYSTEM_ROW.length];
            BrokerMonitor.initRow(rows[1], new Object[]{Float.valueOf(localBrokerData.getCpu().percentUsage()), Float.valueOf(localBrokerData.getMemory().percentUsage()), Float.valueOf(localBrokerData.getDirectMemory().percentUsage()), Float.valueOf(localBrokerData.getBandwidthIn().percentUsage()), Float.valueOf(localBrokerData.getBandwidthOut().percentUsage()), localBrokerData.getMaxResourceUsage() * 100.0});
            rows[3] = new Object[COUNT_ROW.length];
            BrokerMonitor.initRow(rows[3], new Object[]{localBrokerData.getNumTopics(), localBrokerData.getNumBundles(), localBrokerData.getNumProducers(), localBrokerData.getNumConsumers(), localBrokerData.getLastBundleGains().size(), localBrokerData.getLastBundleLosses().size()});
            rows[5] = new Object[LATEST_ROW.length];
            BrokerMonitor.initMessageRow(rows[5], localBrokerData.getMsgRateIn(), localBrokerData.getMsgRateOut(), localBrokerData.getMsgThroughputIn(), localBrokerData.getMsgThroughputOut());
            rows[7] = new Object[SHORT_ROW.length];
            BrokerMonitor.initMessageRow(rows[7], timeAverageData.getShortTermMsgRateIn(), timeAverageData.getShortTermMsgRateOut(), timeAverageData.getShortTermMsgThroughputIn(), timeAverageData.getShortTermMsgThroughputOut());
            rows[9] = new Object[LONG_ROW.length];
            BrokerMonitor.initMessageRow(rows[9], timeAverageData.getLongTermMsgRateIn(), timeAverageData.getLongTermMsgRateOut(), timeAverageData.getLongTermMsgThroughputIn(), timeAverageData.getLongTermMsgThroughputOut());
            String table = localTableMaker.make(rows);
            log.info("\nBroker Data for {}:\n{}\n", (Object)broker, (Object)table);
        }
    }

    private class BrokerWatcher
    implements Watcher {
        private final ZooKeeper zkClient;
        private Set<String> brokers;

        private BrokerWatcher(ZooKeeper zkClient) {
            this.zkClient = zkClient;
            this.brokers = Collections.emptySet();
        }

        public synchronized void process(WatchedEvent event) {
            try {
                if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    this.updateBrokers(event.getPath());
                }
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private synchronized void updateBrokers(String path) {
            HashSet<String> newBrokers = new HashSet<String>();
            try {
                newBrokers.addAll(this.zkClient.getChildren(path, (Watcher)this));
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            for (String oldBroker : this.brokers) {
                if (newBrokers.contains(oldBroker)) continue;
                log.info("Lost broker: " + oldBroker);
                Map map = BrokerMonitor.this.loadData;
                synchronized (map) {
                    BrokerMonitor.this.loadData.remove(oldBroker);
                }
            }
            for (String newBroker : newBrokers) {
                if (this.brokers.contains(newBroker)) continue;
                log.info("Gained broker: " + newBroker);
                BrokerDataWatcher brokerDataWatcher = new BrokerDataWatcher(this.zkClient);
                brokerDataWatcher.printData(path + "/" + newBroker);
            }
            this.brokers = newBrokers;
        }
    }
}

