package com.metamx.tranquility.kafka;

import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import com.metamx.tranquility.config.TranquilityConfig;
import com.metamx.tranquility.kafka.model.PropertiesBasedKafkaConfig;
import com.metamx.tranquility.kafka.writer.WriterController;
import io.airlift.airline.Command;
import io.airlift.airline.Help;
import io.airlift.airline.HelpOption;
import io.airlift.airline.Option;
import io.airlift.airline.SingleCommand;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;
import javax.inject.Inject;

/**
 * Command-line entry point that reads a Tranquility properties file, derives the Kafka
 * consumer settings from it, and runs a {@link KafkaConsumer} until it terminates.
 *
 * <p>The public fields are populated by the airline command parser invoked from
 * {@link #main(String[])}; they are not meant to be set by hand.
 */
@Command(name = "tranquility-kafka", description = "Kafka consumer which pushes events to Druid through Tranquility")
public class KafkaMain {
    private static final Logger log = new Logger(KafkaMain.class);

    /** Prefix of config-file keys that are forwarded (with the prefix stripped) to the Kafka consumer. */
    private static final String KAFKA_PROPERTY_PREFIX = "kafka.";

    @Inject
    public HelpOption helpOption;

    @Option(name = {"-f", "-configFile"}, description = "Path to configuration property file")
    public String propertiesFile;

    /**
     * Parses the command line and runs the consumer.
     *
     * <p>Only argument-parsing failures are reported as such (followed by the usage text).
     * Failures raised while running are allowed to propagate so they are not mislabelled
     * as parse errors and the process does not end as if nothing went wrong.
     *
     * @param strArr raw command-line arguments
     * @throws Exception if {@link #run()} fails
     */
    public static void main(String[] strArr) throws Exception {
        final KafkaMain kafkaMain;
        try {
            kafkaMain = SingleCommand.singleCommand(KafkaMain.class).parse(strArr);
        } catch (Exception e) {
            log.error(e, "Exception parsing arguments");
            Help.help(SingleCommand.singleCommand(KafkaMain.class).getCommandMetadata());
            return;
        }
        if (kafkaMain.helpOption.showHelpIfRequested()) {
            return;
        }
        kafkaMain.run();
    }

    /**
     * Loads the configuration named by {@link #propertiesFile}, starts the Kafka consumer,
     * registers a JVM shutdown hook that stops it, and blocks until the consumer finishes.
     *
     * <p>If no properties file was supplied, the usage text is shown and the method returns
     * without starting anything. If the consumer fails to start, the JVM exits with status 1.
     *
     * @throws InterruptedException if interrupted while waiting on the consumer
     * @throws IllegalArgumentException if the config file sets
     *         {@code kafka.zookeeper.session.timeout.ms} directly
     * @throws RuntimeException wrapping the {@link IOException} if the config file cannot be read
     */
    // The data-source map is kept raw: its value type is declared outside this file and is
    // not imported here, so it cannot be named without adding a project import.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void run() throws InterruptedException {
        if (this.propertiesFile == null || this.propertiesFile.isEmpty()) {
            // Force the help flag so the usage text is printed even though -h was not passed.
            this.helpOption.help = true;
            this.helpOption.showHelpIfRequested();
            log.warn("Missing required parameters, aborting.");
            return;
        }

        final TranquilityConfig<PropertiesBasedKafkaConfig> config = readConfig();
        final PropertiesBasedKafkaConfig globalConfig = config.globalConfig();

        final HashMap dataSourceConfigs = Maps.newHashMap();
        for (String dataSource : config.getDataSources()) {
            dataSourceConfigs.put(dataSource, config.getDataSource(dataSource));
        }

        final Properties kafkaProperties = buildKafkaProperties(globalConfig);

        final KafkaConsumer kafkaConsumer = new KafkaConsumer(
                globalConfig,
                kafkaProperties,
                dataSourceConfigs,
                new WriterController(dataSourceConfigs));
        try {
            kafkaConsumer.start();
        } catch (Throwable t) {
            log.error(t, "Error while starting up. Exiting.");
            System.exit(1);
        }

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                log.info("Initiating shutdown...");
                kafkaConsumer.stop();
            }
        }));
        kafkaConsumer.join();
    }

    /**
     * Reads the Tranquility configuration from {@link #propertiesFile}.
     *
     * <p>The stream is opened with try-with-resources so it is closed whether or not
     * reading succeeds.
     */
    private TranquilityConfig<PropertiesBasedKafkaConfig> readConfig() {
        try (FileInputStream in = new FileInputStream(this.propertiesFile)) {
            return TranquilityConfig.read(in, PropertiesBasedKafkaConfig.class);
        } catch (IOException e) {
            log.error(e, "Could not read config file: %s, aborting.", this.propertiesFile);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Builds the Kafka consumer properties: every {@code kafka.}-prefixed entry of the global
     * config with the prefix removed, then the group id, ZooKeeper connect string and
     * ZooKeeper session timeout taken from the typed config accessors.
     */
    private static Properties buildKafkaProperties(PropertiesBasedKafkaConfig globalConfig) {
        final Properties source = globalConfig.properties();
        final Properties kafkaProperties = new Properties();
        for (String key : source.stringPropertyNames()) {
            if (key.startsWith(KAFKA_PROPERTY_PREFIX)) {
                kafkaProperties.setProperty(
                        key.substring(KAFKA_PROPERTY_PREFIX.length()),
                        source.getProperty(key));
            }
        }

        // These two unconditionally override any kafka.group.id / kafka.zookeeper.connect entry.
        kafkaProperties.setProperty("group.id", globalConfig.getKafkaGroupId());
        kafkaProperties.setProperty("zookeeper.connect", globalConfig.getKafkaZookeeperConnect());

        // setProperty returns the previous value; a non-null result means the file also set
        // kafka.zookeeper.session.timeout.ms, which is rejected in favour of zookeeper.timeout.
        final String sessionTimeoutMillis =
                Long.toString(globalConfig.zookeeperTimeout().toStandardDuration().getMillis());
        if (kafkaProperties.setProperty("zookeeper.session.timeout.ms", sessionTimeoutMillis) != null) {
            throw new IllegalArgumentException("Set zookeeper.timeout instead of setting kafka.zookeeper.session.timeout.ms");
        }
        return kafkaProperties;
    }
}
