package org.phoebus.applications.alarm.aggregate;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.streams.KafkaStreams;
import org.phoebus.applications.alarm.client.KafkaHelper;
import org.phoebus.applications.alarm.server.CreateTopics;

/* loaded from: input_file:org/phoebus/applications/alarm/aggregate/AggregateTopics.class */
public class AggregateTopics {
    private final String longTerm;
    private final List<String> topics;
    private final KafkaStreams aggregateStream;
    private Logger logger = Logger.getLogger(getClass().getPackageName());
    private String kafka_servers = "localhost:9092";
    private String config = "Accelerator";
    private boolean createTopic = false;

    private AggregateTopics(String[] strArr) {
        String readLine;
        parseArgs(strArr);
        this.topics = createTopicList();
        this.longTerm = this.config + "LongTerm";
        if (this.createTopic) {
            this.logger.info("Discovering and creating topics in " + this.topics.toString());
            CreateTopics.discoverAndCreateTopics(this.kafka_servers, false, List.of(this.longTerm));
        }
        this.logger.info("server:\"" + this.kafka_servers + "\", config: \"" + this.config + "\"");
        this.logger.info("topics: " + this.topics.toString());
        this.aggregateStream = createStream();
        this.logger.info("Starting stream aggregation.");
        this.aggregateStream.start();
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            try {
                System.out.println("Type \"exit\" to stop.");
                do {
                    System.out.print("> ");
                    readLine = bufferedReader.readLine();
                    System.out.print("\n");
                } while (!readLine.equalsIgnoreCase("exit"));
                bufferedReader.close();
            } finally {
            }
        } catch (IOException e) {
            this.logger.log(Level.WARNING, "Reading input from stdin failed.", (Throwable) e);
        }
        System.exit(0);
    }

    private void help() {
        System.out.println("AggregateTopics usage. \n\nThis program serves to aggregate Config, State, Command, and Talk compacted topics into a non compacted long term topic.\n");
        System.out.println("\t-help : Prints this message.");
        System.out.println("\t-server server_name: Allows specification of server address.\n\t\tDefault is \"localhost:9092\".");
        System.out.println("\t-confg config_name: Allows specification of config name.\n\t\tDefault is \"Accelerator\".");
        System.out.println("\t-create : Discovers if the config + \"LongTerm\" topic already exists. If it does not, it creates it.");
        System.exit(0);
    }

    private void parseArgs(String[] strArr) {
        Iterator it = Arrays.asList(strArr).iterator();
        while (it.hasNext()) {
            try {
                String str = (String) it.next();
                if (str.startsWith("-h")) {
                    help();
                } else {
                    if (str.equals("-server")) {
                        if (it.hasNext()) {
                            String str2 = (String) it.next();
                            if (!str2.startsWith("-")) {
                                this.kafka_servers = str2;
                            }
                        }
                        throw new Exception("'-server' must be followed by a server name.");
                    }
                    if (!str.equals("-create")) {
                        if (!str.equals("-config")) {
                            throw new Exception("Unknown argument '" + str + "'.");
                        }
                        if (it.hasNext()) {
                            String str3 = (String) it.next();
                            if (!str3.startsWith("-")) {
                                this.config = str3;
                            }
                        }
                        throw new Exception("'-config' must be followed by a config name.");
                    }
                    this.createTopic = true;
                }
            } catch (Exception e) {
                this.logger.log(Level.WARNING, "Argument Error", (Throwable) e);
                help();
                return;
            }
        }
    }

    private List<String> createTopicList() {
        return List.of(this.config, this.config + "Command", this.config + "Talk");
    }

    private KafkaStreams createStream() {
        KafkaStreams aggregateTopics = KafkaHelper.aggregateTopics(this.kafka_servers, this.topics, this.longTerm);
        aggregateTopics.setUncaughtExceptionHandler((thread, th) -> {
            this.logger.log(Level.WARNING, "Kafka Streams Error.", th);
        });
        Runtime runtime = Runtime.getRuntime();
        Objects.requireNonNull(aggregateTopics);
        runtime.addShutdownHook(new Thread(aggregateTopics::close));
        return aggregateTopics;
    }

    public static void main(String[] strArr) {
        new AggregateTopics(strArr);
    }
}
