/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import kafka.utils.Exit$;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Utils;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.DoubleRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0$mcB$sp;
import scala.runtime.java8.JFunction1$mcVI$sp;
import scala.util.Random;

/**
 * Singleton (compiled Scala {@code object}) that measures produce-to-consume
 * round-trip latency against a Kafka topic.
 *
 * <p>For every message it sends one random payload synchronously, polls the
 * consumer once, verifies that exactly that payload came back, and records the
 * elapsed time. At the end it prints the average latency and the 50th, 99th and
 * 99.9th percentiles.
 */
public final class EndToEndLatency$ {
    /** The single instance; assigned by the private constructor during static initialisation. */
    public static EndToEndLatency$ MODULE$;

    /**
     * Timeout handed to {@code consumer.poll} for each round trip (60000;
     * milliseconds per the {@code KafkaConsumer.poll(long)} contract).
     */
    private final long timeout;

    static {
        // The constructor publishes the instance through MODULE$.
        new EndToEndLatency$();
    }

    private long timeout() {
        return this.timeout;
    }

    /**
     * Runs the latency test.
     *
     * @param args {@code broker_list topic num_messages producer_acks message_size_bytes [properties_file]};
     *             an empty {@code properties_file} argument is treated as absent
     * @throws IllegalArgumentException if {@code producer_acks} is neither {@code "1"} nor {@code "all"},
     *                                  or if {@code num_messages} is not positive
     * @throws RuntimeException if a poll returns nothing, returns a payload different from the one sent,
     *                          or returns more than one record
     */
    public void main(String[] args) {
        if (args.length != 5 && args.length != 6) {
            System.err.println("USAGE: java " + this.getClass().getName() + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
            throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
        }
        String brokerList = args[0];
        String topic = args[1];
        int numMessages = new StringOps(Predef$.MODULE$.augmentString(args[2])).toInt();
        String producerAcks = args[3];
        int messageLen = new StringOps(Predef$.MODULE$.augmentString(args[4])).toInt();
        // Optional sixth argument: a properties file shared by the consumer and the producer.
        Option propsFile = args.length > 5 ? new Some<String>(args[5]).filter((Function1<String, Object> & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean(EndToEndLatency$.$anonfun$main$1(x$1))) : None$.MODULE$;
        if (!Arrays.asList("1", "all").contains(producerAcks)) {
            throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
        }
        // Reject empty/negative runs before any client is opened: the statistics
        // below index into the latency array and divide by the message count.
        if (numMessages <= 0) {
            throw new IllegalArgumentException("num_messages must be a positive integer, but was [" + numMessages + "]");
        }

        Properties consumerProps = EndToEndLatency$.loadProps$1(propsFile);
        consumerProps.put("bootstrap.servers", brokerList);
        consumerProps.put("group.id", "test-group-" + System.currentTimeMillis());
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("auto.offset.reset", "latest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("fetch.max.wait.ms", "0");
        KafkaConsumer consumer = new KafkaConsumer(consumerProps);
        consumer.subscribe(Collections.singletonList(topic));

        Properties producerProps = EndToEndLatency$.loadProps$1(propsFile);
        producerProps.put("bootstrap.servers", brokerList);
        producerProps.put("linger.ms", "0");
        producerProps.put("max.block.ms", Long.toString(Long.MAX_VALUE));
        producerProps.put("acks", producerAcks);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer producer = new KafkaProducer(producerProps);

        // Position the consumer at the end of the topic before the first send.
        consumer.seekToEnd(Collections.emptyList());
        consumer.poll(0L);

        // totalTime accumulates nanoseconds; latencies holds whole milliseconds per message.
        DoubleRef totalTime = DoubleRef.create(0.0);
        long[] latencies = new long[numMessages];
        Random random = new Random(0);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numMessages).foreach$mVc$sp((JFunction1$mcVI$sp & scala.Serializable)i -> {
            byte[] message = MODULE$.randomBytesOfLen(random, messageLen);
            long begin = System.nanoTime();
            // Synchronous send: block until the broker acknowledges the record.
            producer.send(new ProducerRecord(topic, message)).get();
            Iterator recordIter = consumer.poll(MODULE$.timeout()).iterator();
            long elapsed = System.nanoTime() - begin;
            if (!recordIter.hasNext()) {
                EndToEndLatency$.finalise$1(consumer, producer);
                throw new RuntimeException("poll() timed out before finding a result (timeout:[" + MODULE$.timeout() + "])");
            }
            String sent = new String(message, StandardCharsets.UTF_8);
            String read = new String((byte[])recordIter.next().value(), StandardCharsets.UTF_8);
            if (!read.equals(sent)) {
                EndToEndLatency$.finalise$1(consumer, producer);
                throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
            }
            if (recordIter.hasNext()) {
                // One record was already consumed above; count it plus whatever is left.
                int count = 1 + ((TraversableOnce)JavaConverters$.MODULE$.asScalaIteratorConverter(recordIter).asScala()).size();
                // Release the clients here as well, like the other failure paths do.
                EndToEndLatency$.finalise$1(consumer, producer);
                throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
            }
            // Progress line every 1000 messages: index and this round trip in milliseconds.
            if (i % 1000 == 0) {
                Predef$.MODULE$.println(i + "\t" + (double)elapsed / 1000.0 / 1000.0);
            }
            totalTime.elem += (double)elapsed;
            latencies[i] = elapsed / 1000L / 1000L;
        });

        Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Avg latency: %.4f ms\n")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToDouble(totalTime.elem / (double)numMessages / 1000.0 / 1000.0)})));
        // Percentiles are read straight out of the sorted per-message latencies.
        Arrays.sort(latencies);
        long p50 = latencies[(int)((double)latencies.length * 0.5)];
        long p99 = latencies[(int)((double)latencies.length * 0.99)];
        long p999 = latencies[(int)((double)latencies.length * 0.999)];
        Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Percentiles: 50th = %d, 99th = %d, 99.9th = %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(p50), BoxesRunTime.boxToLong(p99), BoxesRunTime.boxToLong(p999)})));
        EndToEndLatency$.finalise$1(consumer, producer);
    }

    /**
     * Builds a payload of {@code len} bytes, each drawn from {@code random} in
     * the range 65..90 (ASCII {@code 'A'}..{@code 'Z'}).
     *
     * @param random source of randomness
     * @param len    number of bytes to generate
     * @return the generated byte array
     */
    public byte[] randomBytesOfLen(Random random, int len) {
        return (byte[])Array$.MODULE$.fill(len, (JFunction0$mcB$sp & scala.Serializable)() -> (byte)(random.nextInt(26) + 65), ClassTag$.MODULE$.Byte());
    }

    /** Filter predicate for the properties-file argument: {@code true} when the string is non-empty. */
    public static final /* synthetic */ boolean $anonfun$main$1(String x$1) {
        return new StringOps(Predef$.MODULE$.augmentString(x$1)).nonEmpty();
    }

    /**
     * Loads the properties file named by {@code propsFile$1} via {@code Utils.loadProps},
     * or returns a fresh empty {@link Properties} when the option is empty.
     */
    private static final Properties loadProps$1(Option propsFile$1) {
        return (Properties)propsFile$1.map((Function1<String, Properties> & Serializable & scala.Serializable)x$1 -> Utils.loadProps(x$1)).getOrElse((Function0<Properties> & Serializable & scala.Serializable)() -> new Properties());
    }

    /** Commits the consumer's offsets synchronously, then closes the producer and the consumer. */
    private static final void finalise$1(KafkaConsumer consumer$1, KafkaProducer producer$1) {
        consumer$1.commitSync();
        producer$1.close();
        consumer$1.close();
    }

    private EndToEndLatency$() {
        MODULE$ = this;
        this.timeout = 60000L;
    }
}

