package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.node.TextNode;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.6.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker.class */
public class ProduceBenchWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ProduceBenchWorker.class);
    private static final int THROTTLE_PERIOD_MS = 100;
    private final String id;
    private final ProduceBenchSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ScheduledExecutorService executor;
    private WorkerStatusTracker status;
    private KafkaFutureImpl<String> doneFuture;

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.6.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$Prepare.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$Prepare.class */
    public class Prepare implements Runnable {
        public Prepare() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                HashMap hashMap = new HashMap();
                HashSet hashSet = new HashSet();
                for (Map.Entry<String, PartitionsSpec> entry : ProduceBenchWorker.this.spec.activeTopics().materialize().entrySet()) {
                    String key = entry.getKey();
                    PartitionsSpec value = entry.getValue();
                    hashMap.put(key, value.newTopic(key));
                    Iterator<Integer> it = value.partitionNumbers().iterator();
                    while (it.hasNext()) {
                        hashSet.add(new TopicPartition(key, it.next().intValue()));
                    }
                }
                if (hashSet.isEmpty()) {
                    throw new RuntimeException("You must specify at least one active topic.");
                }
                for (Map.Entry<String, PartitionsSpec> entry2 : ProduceBenchWorker.this.spec.inactiveTopics().materialize().entrySet()) {
                    String key2 = entry2.getKey();
                    hashMap.put(key2, entry2.getValue().newTopic(key2));
                }
                ProduceBenchWorker.this.status.update(new TextNode("Creating " + hashMap.keySet().size() + " topic(s)"));
                WorkerUtils.createTopics(ProduceBenchWorker.log, ProduceBenchWorker.this.spec.bootstrapServers(), ProduceBenchWorker.this.spec.commonClientConf(), ProduceBenchWorker.this.spec.adminClientConf(), hashMap, false);
                ProduceBenchWorker.this.status.update(new TextNode("Created " + hashMap.keySet().size() + " topic(s)"));
                ProduceBenchWorker.this.executor.submit(new SendRecords(hashSet));
            } catch (Throwable th) {
                WorkerUtils.abort(ProduceBenchWorker.log, "Prepare", th, ProduceBenchWorker.this.doneFuture);
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.6.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$SendRecords.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$SendRecords.class */
    public class SendRecords implements Callable<Void> {
        private final HashSet<TopicPartition> activePartitions;
        private final Future<?> statusUpdaterFuture;
        private final KafkaProducer<byte[], byte[]> producer;
        private final PayloadIterator keys;
        private final PayloadIterator values;
        private final Optional<TransactionGenerator> transactionGenerator;
        private final Throttle throttle;
        private Iterator<TopicPartition> partitionsIterator;
        private Future<RecordMetadata> sendFuture;
        private boolean enableTransactions;
        private final Histogram histogram = new Histogram(5000);
        private AtomicLong transactionsCommitted = new AtomicLong();

        SendRecords(HashSet<TopicPartition> hashSet) {
            this.activePartitions = hashSet;
            this.partitionsIterator = hashSet.iterator();
            this.transactionGenerator = ProduceBenchWorker.this.spec.transactionGenerator();
            this.enableTransactions = this.transactionGenerator.isPresent();
            int perSecToPerPeriod = WorkerUtils.perSecToPerPeriod(ProduceBenchWorker.this.spec.targetMessagesPerSec(), 100L);
            this.statusUpdaterFuture = ProduceBenchWorker.this.executor.scheduleWithFixedDelay(new StatusUpdater(this.histogram, this.transactionsCommitted), 30L, 30L, TimeUnit.SECONDS);
            Properties properties = new Properties();
            properties.put("bootstrap.servers", ProduceBenchWorker.this.spec.bootstrapServers());
            if (this.enableTransactions) {
                properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "produce-bench-transaction-id-" + UUID.randomUUID());
            }
            WorkerUtils.addConfigsToProperties(properties, ProduceBenchWorker.this.spec.commonClientConf(), ProduceBenchWorker.this.spec.producerConf());
            this.producer = new KafkaProducer<>(properties, (Serializer) new ByteArraySerializer(), (Serializer) new ByteArraySerializer());
            this.keys = new PayloadIterator(ProduceBenchWorker.this.spec.keyGenerator());
            this.values = new PayloadIterator(ProduceBenchWorker.this.spec.valueGenerator());
            this.throttle = new SendRecordsThrottle(perSecToPerPeriod, this.producer);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        /* JADX WARN: Finally extract failed */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            long milliseconds = Time.SYSTEM.milliseconds();
            try {
                try {
                    try {
                        try {
                            if (this.enableTransactions) {
                                this.producer.initTransactions();
                            }
                            long j = 0;
                            while (j < ProduceBenchWorker.this.spec.maxMessages()) {
                                if (!this.enableTransactions || !takeTransactionAction()) {
                                    sendMessage();
                                    j++;
                                }
                            }
                            if (this.enableTransactions) {
                                takeTransactionAction();
                            }
                            if (this.sendFuture != null) {
                                this.sendFuture.get();
                            }
                            this.producer.close();
                            this.statusUpdaterFuture.cancel(false);
                            ProduceBenchWorker.log.info("Sent {} total record(s) in {} ms.  status: {}", Long.valueOf(this.histogram.summarize().numSamples()), Long.valueOf(Time.SYSTEM.milliseconds() - milliseconds), new StatusUpdater(this.histogram, this.transactionsCommitted).update());
                        } catch (Throwable th) {
                            if (this.sendFuture != null) {
                                this.sendFuture.get();
                            }
                            this.producer.close();
                            throw th;
                        }
                    } catch (Exception e) {
                        if (this.enableTransactions) {
                            this.producer.abortTransaction();
                        }
                        throw e;
                    }
                } catch (Exception e2) {
                    WorkerUtils.abort(ProduceBenchWorker.log, "SendRecords", e2, ProduceBenchWorker.this.doneFuture);
                    this.statusUpdaterFuture.cancel(false);
                    ProduceBenchWorker.log.info("Sent {} total record(s) in {} ms.  status: {}", Long.valueOf(this.histogram.summarize().numSamples()), Long.valueOf(Time.SYSTEM.milliseconds() - milliseconds), new StatusUpdater(this.histogram, this.transactionsCommitted).update());
                }
                ProduceBenchWorker.this.doneFuture.complete("");
                return null;
            } catch (Throwable th2) {
                this.statusUpdaterFuture.cancel(false);
                ProduceBenchWorker.log.info("Sent {} total record(s) in {} ms.  status: {}", Long.valueOf(this.histogram.summarize().numSamples()), Long.valueOf(Time.SYSTEM.milliseconds() - milliseconds), new StatusUpdater(this.histogram, this.transactionsCommitted).update());
                throw th2;
            }
        }

        private boolean takeTransactionAction() {
            boolean z = true;
            switch (this.transactionGenerator.get().nextAction()) {
                case BEGIN_TRANSACTION:
                    ProduceBenchWorker.log.debug("Beginning transaction.");
                    this.producer.beginTransaction();
                    break;
                case COMMIT_TRANSACTION:
                    ProduceBenchWorker.log.debug("Committing transaction.");
                    this.producer.commitTransaction();
                    this.transactionsCommitted.getAndIncrement();
                    break;
                case ABORT_TRANSACTION:
                    ProduceBenchWorker.log.debug("Aborting transaction.");
                    this.producer.abortTransaction();
                    break;
                case NO_OP:
                    z = false;
                    break;
            }
            return z;
        }

        private void sendMessage() throws InterruptedException {
            if (!this.partitionsIterator.hasNext()) {
                this.partitionsIterator = this.activePartitions.iterator();
            }
            TopicPartition next = this.partitionsIterator.next();
            this.sendFuture = this.producer.send(new ProducerRecord<>(next.topic(), Integer.valueOf(next.partition()), this.keys.next(), this.values.next()), new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
            this.throttle.increment();
        }

        void recordDuration(long j) {
            this.histogram.add(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.6.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$SendRecordsCallback.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$SendRecordsCallback.class */
    public static class SendRecordsCallback implements Callback {
        private final SendRecords sendRecords;
        private final long startMs;

        SendRecordsCallback(SendRecords sendRecords, long j) {
            this.sendRecords = sendRecords;
            this.startMs = j;
        }

        @Override // org.apache.kafka.clients.producer.Callback
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            this.sendRecords.recordDuration(Time.SYSTEM.milliseconds() - this.startMs);
            if (exc != null) {
                ProduceBenchWorker.log.error("SendRecordsCallback: error", (Throwable) exc);
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.6.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$SendRecordsThrottle.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$SendRecordsThrottle.class */
    private static class SendRecordsThrottle extends Throttle {
        private final KafkaProducer<?, ?> producer;

        SendRecordsThrottle(int i, KafkaProducer<?, ?> kafkaProducer) {
            super(i, 100);
            this.producer = kafkaProducer;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.kafka.trogdor.workload.Throttle
        public synchronized void delay(long j) throws InterruptedException {
            long milliseconds = time().milliseconds();
            this.producer.flush();
            super.delay(j - (time().milliseconds() - milliseconds));
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.6.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$StatusData.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$StatusData.class */
    public static class StatusData {
        private final long totalSent;
        private final float averageLatencyMs;
        private final int p50LatencyMs;
        private final int p95LatencyMs;
        private final int p99LatencyMs;
        private final long transactionsCommitted;
        static final float[] PERCENTILES = {0.5f, 0.95f, 0.99f};

        @JsonCreator
        StatusData(@JsonProperty("totalSent") long j, @JsonProperty("averageLatencyMs") float f, @JsonProperty("p50LatencyMs") int i, @JsonProperty("p95LatencyMs") int i2, @JsonProperty("p99LatencyMs") int i3, @JsonProperty("transactionsCommitted") long j2) {
            this.totalSent = j;
            this.averageLatencyMs = f;
            this.p50LatencyMs = i;
            this.p95LatencyMs = i2;
            this.p99LatencyMs = i3;
            this.transactionsCommitted = j2;
        }

        @JsonProperty
        public long totalSent() {
            return this.totalSent;
        }

        @JsonProperty
        public long transactionsCommitted() {
            return this.transactionsCommitted;
        }

        @JsonProperty
        public float averageLatencyMs() {
            return this.averageLatencyMs;
        }

        @JsonProperty
        public int p50LatencyMs() {
            return this.p50LatencyMs;
        }

        @JsonProperty
        public int p95LatencyMs() {
            return this.p95LatencyMs;
        }

        @JsonProperty
        public int p99LatencyMs() {
            return this.p99LatencyMs;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.6.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$StatusUpdater.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ProduceBenchWorker$StatusUpdater.class */
    public class StatusUpdater implements Runnable {
        private final Histogram histogram;
        private final AtomicLong transactionsCommitted;

        StatusUpdater(Histogram histogram, AtomicLong atomicLong) {
            this.histogram = histogram;
            this.transactionsCommitted = atomicLong;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                update();
            } catch (Exception e) {
                WorkerUtils.abort(ProduceBenchWorker.log, "StatusUpdater", e, ProduceBenchWorker.this.doneFuture);
            }
        }

        StatusData update() {
            Histogram.Summary summarize = this.histogram.summarize(StatusData.PERCENTILES);
            StatusData statusData = new StatusData(summarize.numSamples(), summarize.average(), summarize.percentiles().get(0).value(), summarize.percentiles().get(1).value(), summarize.percentiles().get(2).value(), this.transactionsCommitted.get());
            ProduceBenchWorker.this.status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
            return statusData;
        }
    }

    public ProduceBenchWorker(String str, ProduceBenchSpec produceBenchSpec) {
        this.id = str;
        this.spec = produceBenchSpec;
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) throws Exception {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("ProducerBenchWorker is already running.");
        }
        log.info("{}: Activating ProduceBenchWorker with {}", this.id, this.spec);
        this.executor = Executors.newScheduledThreadPool(2, ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false));
        this.status = workerStatusTracker;
        this.doneFuture = kafkaFutureImpl;
        this.executor.submit(new Prepare());
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("ProduceBenchWorker is not running.");
        }
        log.info("{}: Deactivating ProduceBenchWorker.", this.id);
        this.doneFuture.complete("");
        this.executor.shutdownNow();
        this.executor.awaitTermination(1L, TimeUnit.DAYS);
        this.executor = null;
        this.status = null;
        this.doneFuture = null;
    }
}
