/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.util.Properties;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.consumer.Blacklist;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.DefaultDecoder$;
import kafka.tools.KafkaMigrationTool;
import kafka.tools.MirrorMaker;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Logging$class;
import kafka.utils.Utils$;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.generic.GenericTraversableTemplate;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ListBuffer;
import scala.runtime.BoxedUnit;
import scala.runtime.IntRef;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
public final class MirrorMaker$
implements Logging {
    public static final MirrorMaker$ MODULE$;
    private Seq<ZookeeperConsumerConnector> connectors;
    private Seq<MirrorMaker.MirrorMakerThread> consumerThreads;
    private ListBuffer<KafkaMigrationTool.ProducerThread> kafka$tools$MirrorMaker$$producerThreads;
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    private volatile boolean bitmap$0;

    static {
        new MirrorMaker$();
    }

    @Override
    public String loggerName() {
        return this.loggerName;
    }

    private Logger logger$lzycompute() {
        MirrorMaker$ mirrorMaker$ = this;
        synchronized (mirrorMaker$) {
            if (!this.bitmap$0) {
                this.logger = Logging$class.logger(this);
                this.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    @Override
    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    @Override
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    @Override
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    @Override
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging$class.trace(this, msg);
    }

    @Override
    public Object trace(Function0<Throwable> e) {
        return Logging$class.trace(this, e);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.trace(this, msg, e);
    }

    @Override
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging$class.swallowTrace(this, action);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging$class.debug(this, msg);
    }

    @Override
    public Object debug(Function0<Throwable> e) {
        return Logging$class.debug(this, e);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.debug(this, msg, e);
    }

    @Override
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging$class.swallowDebug(this, action);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging$class.info(this, msg);
    }

    @Override
    public Object info(Function0<Throwable> e) {
        return Logging$class.info(this, e);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.info(this, msg, e);
    }

    @Override
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging$class.swallowInfo(this, action);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging$class.warn(this, msg);
    }

    @Override
    public Object warn(Function0<Throwable> e) {
        return Logging$class.warn(this, e);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.warn(this, msg, e);
    }

    @Override
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging$class.swallowWarn(this, action);
    }

    @Override
    public void swallow(Function0<BoxedUnit> action) {
        Logging$class.swallow(this, action);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging$class.error(this, msg);
    }

    @Override
    public Object error(Function0<Throwable> e) {
        return Logging$class.error(this, e);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.error(this, msg, e);
    }

    @Override
    public void swallowError(Function0<BoxedUnit> action) {
        Logging$class.swallowError(this, action);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging$class.fatal(this, msg);
    }

    @Override
    public Object fatal(Function0<Throwable> e) {
        return Logging$class.fatal(this, e);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.fatal(this, msg, e);
    }

    private Seq<ZookeeperConsumerConnector> connectors() {
        return this.connectors;
    }

    private void connectors_$eq(Seq<ZookeeperConsumerConnector> x$1) {
        this.connectors = x$1;
    }

    private Seq<MirrorMaker.MirrorMakerThread> consumerThreads() {
        return this.consumerThreads;
    }

    private void consumerThreads_$eq(Seq<MirrorMaker.MirrorMakerThread> x$1) {
        this.consumerThreads = x$1;
    }

    public ListBuffer<KafkaMigrationTool.ProducerThread> kafka$tools$MirrorMaker$$producerThreads() {
        return this.kafka$tools$MirrorMaker$$producerThreads;
    }

    private void kafka$tools$MirrorMaker$$producerThreads_$eq(ListBuffer<KafkaMigrationTool.ProducerThread> x$1) {
        this.kafka$tools$MirrorMaker$$producerThreads = x$1;
    }

    public void main(String[] args) {
        this.info((Function0<String>)((Object)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Starting mirror maker";
            }
        }));
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec<String> consumerConfigOpt = parser.accepts("consumer.config", "Consumer config to consume from a source cluster. You may specify multiple of these.").withRequiredArg().describedAs("config file").ofType(String.class);
        ArgumentAcceptingOptionSpec<String> producerConfigOpt = parser.accepts("producer.config", "Embedded producer config.").withRequiredArg().describedAs("config file").ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> numProducersOpt = parser.accepts("num.producers", "Number of producer instances").withRequiredArg().describedAs("Number of producers").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(1), (Integer[])((Object[])new Integer[0]));
        ArgumentAcceptingOptionSpec<Integer> numStreamsOpt = parser.accepts("num.streams", "Number of consumption streams.").withRequiredArg().describedAs("Number of threads").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(1), (Integer[])((Object[])new Integer[0]));
        ArgumentAcceptingOptionSpec<Integer> bufferSizeOpt = parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer").withRequiredArg().describedAs("Queue size in terms of number of messages").ofType(Integer.class).defaultsTo(Predef$.MODULE$.int2Integer(10000), (Integer[])((Object[])new Integer[0]));
        ArgumentAcceptingOptionSpec<String> whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
        ArgumentAcceptingOptionSpec<String> blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to mirror.").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
        OptionSpecBuilder helpOpt = parser.accepts("help", "Print this message.");
        OptionSet options = parser.parse(args);
        if (options.has(helpOpt)) {
            parser.printHelpOn(System.out);
            System.exit(0);
        }
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{consumerConfigOpt, producerConfigOpt}));
        if (List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray((Object[])new ArgumentAcceptingOptionSpec[]{whitelistOpt, blacklistOpt})).count(new Serializable(options){
            public static final long serialVersionUID = 0L;
            private final OptionSet options$1;

            public final boolean apply(OptionSpec<?> x$1) {
                return this.options$1.has(x$1);
            }
            {
                this.options$1 = options$1;
            }
        }) != 1) {
            Predef$.MODULE$.println("Exactly one of whitelist or blacklist is required.");
            System.exit(1);
        }
        Integer numStreams = options.valueOf(numStreamsOpt);
        int bufferSize = options.valueOf(bufferSizeOpt);
        IndexedSeq<kafka.producer.Producer<byte[], byte[]>> producers = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), options.valueOf(numProducersOpt)).map(new Serializable(producerConfigOpt, options){
            public static final long serialVersionUID = 0L;
            private final ArgumentAcceptingOptionSpec producerConfigOpt$1;
            private final OptionSet options$1;

            public final kafka.producer.Producer<byte[], byte[]> apply(int x$1) {
                String string2;
                block4: {
                    ProducerConfig producerConfig;
                    block3: {
                        Properties props;
                        block2: {
                            props = Utils$.MODULE$.loadProps((String)this.options$1.valueOf(this.producerConfigOpt$1));
                            string2 = props.getProperty("partitioner.class");
                            if (string2 != null) break block2;
                            producerConfig = new ProducerConfig(this, props){
                                private final String partitionerClass;

                                public String partitionerClass() {
                                    return this.partitionerClass;
                                }
                                {
                                    super(props$1);
                                    this.partitionerClass = "kafka.producer.ByteArrayPartitioner";
                                }
                            };
                            break block3;
                        }
                        if (string2 == null) break block4;
                        producerConfig = new ProducerConfig(props);
                    }
                    ProducerConfig config = producerConfig;
                    return new kafka.producer.Producer<byte[], byte[]>(config);
                }
                throw new MatchError((Object)string2);
            }
            {
                this.producerConfigOpt$1 = producerConfigOpt$1;
                this.options$1 = options$1;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom());
        this.connectors_$eq(((TraversableLike)JavaConversions$.MODULE$.asScalaBuffer(options.valuesOf(consumerConfigOpt)).toList().map(new Serializable(){
            public static final long serialVersionUID = 0L;

            public final ConsumerConfig apply(String cfg) {
                return new ConsumerConfig(Utils$.MODULE$.loadProps(cfg.toString()));
            }
        }, List$.MODULE$.canBuildFrom())).map(new Serializable(){
            public static final long serialVersionUID = 0L;

            public final ZookeeperConsumerConnector apply(ConsumerConfig x$2) {
                return new ZookeeperConsumerConnector(x$2);
            }
        }, List$.MODULE$.canBuildFrom()));
        TopicFilter filterSpec = options.has(whitelistOpt) ? new Whitelist(options.valueOf(whitelistOpt)) : new Blacklist(options.valueOf(blacklistOpt));
        Seq<Nothing$> streams2 = Nil$.MODULE$;
        try {
            streams2 = (Seq)((GenericTraversableTemplate)this.connectors().map(new Serializable(numStreams, filterSpec){
                public static final long serialVersionUID = 0L;
                private final Integer numStreams$1;
                private final TopicFilter filterSpec$1;

                public final Seq<KafkaStream<byte[], byte[]>> apply(ZookeeperConsumerConnector x$3) {
                    return x$3.createMessageStreamsByFilter(this.filterSpec$1, this.numStreams$1, new DefaultDecoder(DefaultDecoder$.MODULE$.$lessinit$greater$default$1()), new DefaultDecoder(DefaultDecoder$.MODULE$.$lessinit$greater$default$1()));
                }
                {
                    this.numStreams$1 = numStreams$1;
                    this.filterSpec$1 = filterSpec$1;
                }
            }, Seq$.MODULE$.canBuildFrom())).flatten(Predef$.MODULE$.conforms());
        }
        catch (Throwable throwable) {
            this.fatal((Function0<String>)((Object)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Unable to create stream - shutting down mirror maker.";
                }
            }));
            this.connectors().foreach(new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(ZookeeperConsumerConnector x$4) {
                    x$4.shutdown();
                }
            });
        }
        KafkaMigrationTool.ProducerDataChannel producerDataChannel = new KafkaMigrationTool.ProducerDataChannel(bufferSize);
        this.consumerThreads_$eq(((TraversableLike)streams2.zipWithIndex(Seq$.MODULE$.canBuildFrom())).map(new Serializable(producers, producerDataChannel){
            public static final long serialVersionUID = 0L;
            private final IndexedSeq producers$1;
            private final KafkaMigrationTool.ProducerDataChannel producerDataChannel$1;

            public final MirrorMaker.MirrorMakerThread apply(Tuple2<KafkaStream<byte[], byte[]>, Object> streamAndIndex) {
                return new MirrorMaker.MirrorMakerThread(streamAndIndex._1(), this.producerDataChannel$1, this.producers$1, streamAndIndex._2$mcI$sp());
            }
            {
                this.producers$1 = producers$1;
                this.producerDataChannel$1 = producerDataChannel$1;
            }
        }, Seq$.MODULE$.canBuildFrom()));
        this.kafka$tools$MirrorMaker$$producerThreads_$eq(new ListBuffer<KafkaMigrationTool.ProducerThread>());
        Runtime.getRuntime().addShutdownHook(new Thread(){

            public void run() {
                MirrorMaker$.MODULE$.cleanShutdown();
            }
        });
        IntRef i = new IntRef(1);
        producers.foreach(new Serializable(producerDataChannel, i){
            public static final long serialVersionUID = 0L;
            private final KafkaMigrationTool.ProducerDataChannel producerDataChannel$1;
            private final IntRef i$1;

            public final void apply(kafka.producer.Producer<byte[], byte[]> producer) {
                KafkaMigrationTool.ProducerThread producerThread = new KafkaMigrationTool.ProducerThread(this.producerDataChannel$1, new Producer<byte[], byte[]>(producer), this.i$1.elem);
                MirrorMaker$.MODULE$.kafka$tools$MirrorMaker$$producerThreads().$plus$eq((Object)producerThread);
                ++this.i$1.elem;
            }
            {
                this.producerDataChannel$1 = producerDataChannel$1;
                this.i$1 = i$1;
            }
        });
        this.consumerThreads().foreach(new Serializable(){
            public static final long serialVersionUID = 0L;

            public final void apply(MirrorMaker.MirrorMakerThread x$5) {
                x$5.start();
            }
        });
        this.kafka$tools$MirrorMaker$$producerThreads().foreach(new Serializable(){
            public static final long serialVersionUID = 0L;

            public final void apply(KafkaMigrationTool.ProducerThread x$6) {
                x$6.start();
            }
        });
        this.consumerThreads().foreach(new Serializable(){
            public static final long serialVersionUID = 0L;

            public final void apply(MirrorMaker.MirrorMakerThread x$7) {
                x$7.awaitShutdown();
            }
        });
        this.cleanShutdown();
    }

    public void cleanShutdown() {
        if (this.connectors() != null) {
            this.connectors().foreach(new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(ZookeeperConsumerConnector x$8) {
                    x$8.shutdown();
                }
            });
        }
        if (this.consumerThreads() != null) {
            this.consumerThreads().foreach(new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(MirrorMaker.MirrorMakerThread x$9) {
                    x$9.awaitShutdown();
                }
            });
        }
        if (this.kafka$tools$MirrorMaker$$producerThreads() != null) {
            this.kafka$tools$MirrorMaker$$producerThreads().foreach(new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(KafkaMigrationTool.ProducerThread x$10) {
                    x$10.shutdown();
                }
            });
            this.kafka$tools$MirrorMaker$$producerThreads().foreach(new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(KafkaMigrationTool.ProducerThread x$11) {
                    x$11.awaitShutdown();
                }
            });
        }
        this.info((Function0<String>)((Object)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Kafka mirror maker shutdown successfully";
            }
        }));
    }

    private MirrorMaker$() {
        MODULE$ = this;
        Logging$class.$init$(this);
        this.connectors = null;
        this.consumerThreads = null;
        this.kafka$tools$MirrorMaker$$producerThreads = null;
    }
}

