package com.ververica.cdc.connectors.shaded.org.apache.kafka.trogdor.workload;

import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.annotation.JsonCreator;
import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.JsonNode;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.TopicPartition;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.internals.KafkaFutureImpl;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.Deserializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.ThreadUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.Time;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.Utils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.trogdor.common.JsonUtil;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.trogdor.common.Platform;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.trogdor.common.WorkerUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.trogdor.task.TaskWorker;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.trogdor.task.WorkerStatusTracker;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.trogdor.workload.Histogram;
import io.netty.handler.codec.http2.Http2CodecUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.class */
/**
 * Task worker that runs a consume benchmark described by a {@link ConsumeBenchSpec}.
 *
 * <p>{@code start} creates a scheduled executor and submits a {@code Prepare} task, which builds
 * the consumers and submits one {@code ConsumeMessages} task per consumer. {@code stop} shuts the
 * executor down and closes the consumer held in {@link #consumer}.
 */
public class ConsumeBenchWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class);
    // Throttle period in milliseconds, judging by the name — NOTE(review): confirm against Throttle.
    private static final int THROTTLE_PERIOD_MS = 100;
    // Identifier given at construction; appears in log messages and in generated client ids.
    private final String id;
    // Benchmark configuration (topics, thread count, message limits, client configs).
    private final ConsumeBenchSpec spec;
    // Flipped by start()/stop() via compareAndSet so each can only run in the matching state.
    private final AtomicBoolean running = new AtomicBoolean(false);
    // Created in start(), shut down and cleared in stop().
    private ScheduledExecutorService executor;
    // Tracker handed to start(); receives the aggregated per-consumer statuses.
    private WorkerStatusTracker workerStatus;
    // Aggregates per-consumer statuses and pushes them to workerStatus.
    private StatusUpdater statusUpdater;
    // Handle of the periodic statusUpdater schedule; cancelled by CloseStatusUpdater.
    private Future<?> statusUpdaterFuture;
    // Future handed to start(); completed with "" in stop() and when a consume task finishes,
    // and passed to WorkerUtils.abort when a task fails.
    private KafkaFutureImpl<String> doneFuture;
    // First consumer created by Prepare; this is the only consumer stop() closes.
    private ThreadSafeConsumer consumer;

    /* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/trogdor/workload/ConsumeBenchWorker$CloseStatusUpdater.class */
    /**
     * Waits until every consume task has finished (or this thread is interrupted), then cancels
     * the periodic status schedule and publishes one last aggregated status.
     */
    public class CloseStatusUpdater implements Runnable {
        private final List<Future<Void>> consumeTasks;

        /**
         * @param list futures of the submitted consume tasks to wait for
         */
        CloseStatusUpdater(List<Future<Void>> list) {
            this.consumeTasks = list;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean interrupted = false;
            while (!this.consumeTasks.stream().allMatch(Future::isDone)) {
                try {
                    Thread.sleep(60000L);
                } catch (InterruptedException e) {
                    ConsumeBenchWorker.log.debug("{} was interrupted. Closing...", getClass().getName());
                    // Stop waiting: an interrupt means the executor is shutting down. Without this
                    // break the loop would go back to sleep and never honour the shutdown request.
                    interrupted = true;
                    break;
                }
            }
            try {
                ConsumeBenchWorker.this.statusUpdaterFuture.cancel(false);
                ConsumeBenchWorker.this.statusUpdater.update();
            } finally {
                if (interrupted) {
                    // Restore the flag only after the final update so that update is not disturbed.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/trogdor/workload/ConsumeBenchWorker$ConsumeMessages.class */
    public class ConsumeMessages implements Callable<Void> {
        private final Histogram latencyHistogram;
        private final Histogram messageSizeHistogram;
        private final Future<?> statusUpdaterFuture;
        private final Throttle throttle;
        private final String clientId;
        private final ThreadSafeConsumer consumer;

        private ConsumeMessages(ThreadSafeConsumer threadSafeConsumer) {
            this.latencyHistogram = new Histogram(Http2CodecUtil.DEFAULT_MAX_QUEUED_CONTROL_FRAMES);
            this.messageSizeHistogram = new Histogram(2097152);
            this.clientId = threadSafeConsumer.clientId();
            this.statusUpdaterFuture = ConsumeBenchWorker.this.executor.scheduleAtFixedRate(new ConsumeStatusUpdater(this.latencyHistogram, this.messageSizeHistogram, threadSafeConsumer), 1L, 1L, TimeUnit.MINUTES);
            this.throttle = new Throttle(ConsumeBenchWorker.this.spec.targetMessagesPerSec() <= 0 ? Integer.MAX_VALUE : WorkerUtils.perSecToPerPeriod(ConsumeBenchWorker.this.spec.targetMessagesPerSec(), 100L), 100);
            this.consumer = threadSafeConsumer;
        }

        ConsumeMessages(ConsumeBenchWorker consumeBenchWorker, ThreadSafeConsumer threadSafeConsumer, Set<String> set) {
            this(threadSafeConsumer);
            ConsumeBenchWorker.log.info("Will consume from topics {} via dynamic group assignment.", set);
            this.consumer.subscribe(set);
        }

        ConsumeMessages(ConsumeBenchWorker consumeBenchWorker, ThreadSafeConsumer threadSafeConsumer, List<TopicPartition> list) {
            this(threadSafeConsumer);
            ConsumeBenchWorker.log.info("Will consume from topic partitions {} via manual assignment.", list);
            this.consumer.assign(list);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            long j = 0;
            long j2 = 0;
            long milliseconds = Time.SYSTEM.milliseconds();
            long j3 = milliseconds;
            long maxMessages = ConsumeBenchWorker.this.spec.maxMessages();
            while (j < maxMessages) {
                try {
                    try {
                        ConsumerRecords<byte[], byte[]> poll = this.consumer.poll();
                        if (!poll.isEmpty()) {
                            long milliseconds2 = Time.SYSTEM.milliseconds() - j3;
                            Iterator<ConsumerRecord<byte[], byte[]>> it = poll.iterator();
                            while (it.hasNext()) {
                                ConsumerRecord<byte[], byte[]> next = it.next();
                                j++;
                                long j4 = 0;
                                if (next.key() != null) {
                                    j4 = 0 + next.serializedKeySize();
                                }
                                if (next.value() != null) {
                                    j4 += next.serializedValueSize();
                                }
                                this.latencyHistogram.add(milliseconds2);
                                this.messageSizeHistogram.add(j4);
                                j2 += j4;
                                if (j >= maxMessages) {
                                    break;
                                }
                                this.throttle.increment();
                            }
                            j3 = Time.SYSTEM.milliseconds();
                        }
                    } catch (Exception e) {
                        WorkerUtils.abort(ConsumeBenchWorker.log, "ConsumeRecords", e, ConsumeBenchWorker.this.doneFuture);
                        this.statusUpdaterFuture.cancel(false);
                        ConsumeBenchWorker.log.info("{} Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{this.clientId, Long.valueOf(j), Long.valueOf(j2), Long.valueOf(Time.SYSTEM.milliseconds() - milliseconds), new ConsumeStatusUpdater(this.latencyHistogram, this.messageSizeHistogram, this.consumer).update()});
                    }
                } catch (Throwable th) {
                    this.statusUpdaterFuture.cancel(false);
                    ConsumeBenchWorker.log.info("{} Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{this.clientId, Long.valueOf(j), Long.valueOf(j2), Long.valueOf(Time.SYSTEM.milliseconds() - milliseconds), new ConsumeStatusUpdater(this.latencyHistogram, this.messageSizeHistogram, this.consumer).update()});
                    throw th;
                }
            }
            this.statusUpdaterFuture.cancel(false);
            ConsumeBenchWorker.log.info("{} Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{this.clientId, Long.valueOf(j), Long.valueOf(j2), Long.valueOf(Time.SYSTEM.milliseconds() - milliseconds), new ConsumeStatusUpdater(this.latencyHistogram, this.messageSizeHistogram, this.consumer).update()});
            ConsumeBenchWorker.this.doneFuture.complete("");
            this.consumer.close();
            return null;
        }
    }

    /* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/trogdor/workload/ConsumeBenchWorker$ConsumeStatusUpdater.class */
    /**
     * Summarises one consumer's latency and message-size histograms into a {@link StatusData}
     * and records it in the worker's {@link StatusUpdater} under the consumer's client id.
     */
    public class ConsumeStatusUpdater implements Runnable {
        private final Histogram latencyHistogram;
        private final Histogram messageSizeHistogram;
        private final ThreadSafeConsumer consumer;

        /**
         * @param histogram latency histogram to summarise
         * @param histogram2 message-size histogram to summarise
         * @param threadSafeConsumer consumer whose client id and assignment are reported
         */
        ConsumeStatusUpdater(Histogram histogram, Histogram histogram2, ThreadSafeConsumer threadSafeConsumer) {
            this.latencyHistogram = histogram;
            this.messageSizeHistogram = histogram2;
            this.consumer = threadSafeConsumer;
        }

        /** Publishes a status; any exception aborts the worker's done-future instead of propagating. */
        @Override // java.lang.Runnable
        public void run() {
            try {
                update();
            } catch (Exception e) {
                WorkerUtils.abort(ConsumeBenchWorker.log, "ConsumeStatusUpdater", e, ConsumeBenchWorker.this.doneFuture);
            }
        }

        /**
         * Builds the current status, stores it in the shared status map and logs it as JSON.
         *
         * @return the status that was just recorded
         */
        StatusData update() {
            Histogram.Summary latencySummary = this.latencyHistogram.summarize(StatusData.PERCENTILES);
            Histogram.Summary sizeSummary = this.messageSizeHistogram.summarize(StatusData.PERCENTILES);
            // Total bytes are reconstructed as sampleCount * averageSize. The explicit (long)
            // casts are required because StatusData takes long values for both byte figures.
            long totalBytes = (long) (((float) sizeSummary.numSamples()) * sizeSummary.average());
            long averageSizeBytes = (long) sizeSummary.average();
            // Percentile indices follow StatusData.PERCENTILES: 0 -> p50, 1 -> p95, 2 -> p99.
            StatusData statusData = new StatusData(
                    this.consumer.assignedPartitions(),
                    latencySummary.numSamples(),
                    totalBytes,
                    averageSizeBytes,
                    latencySummary.average(),
                    latencySummary.percentiles().get(0).value(),
                    latencySummary.percentiles().get(1).value(),
                    latencySummary.percentiles().get(2).value());
            ConsumeBenchWorker.this.statusUpdater.updateConsumeStatus(this.consumer.clientId(), statusData);
            ConsumeBenchWorker.log.info("Status={}", JsonUtil.toJsonString(statusData));
            return statusData;
        }
    }

    /* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/trogdor/workload/ConsumeBenchWorker$Prepare.class */
    /**
     * Start-up task: creates the consumers, submits one {@link ConsumeMessages} task per consumer
     * to the executor and then submits a {@link CloseStatusUpdater} that waits for them.
     */
    public class Prepare implements Runnable {
        public Prepare() {
        }

        /** Builds and submits the consume tasks; any failure aborts the worker's done-future. */
        @Override // java.lang.Runnable
        public void run() {
            try {
                List<Future<Void>> consumeFutures = new ArrayList<>();
                for (ConsumeMessages task : consumeTasks()) {
                    consumeFutures.add(ConsumeBenchWorker.this.executor.submit(task));
                }
                ConsumeBenchWorker.this.executor.submit(new CloseStatusUpdater(consumeFutures));
            } catch (Throwable th) {
                WorkerUtils.abort(ConsumeBenchWorker.log, "Prepare", th, ConsumeBenchWorker.this.doneFuture);
            }
        }

        /**
         * Creates {@code spec.threadsPerWorker()} consume tasks, each with its own consumer.
         *
         * <p>If no topic in the spec lists explicit partitions the consumers subscribe to the
         * topics; otherwise every consumer is manually assigned the full partition list.
         *
         * @return the consume tasks, the first of which uses the worker's primary consumer
         * @throws ConfigException if explicit partitions are combined with a fixed consumer group
         *         and more than one consumer
         */
        private List<ConsumeMessages> consumeTasks() {
            List<ConsumeMessages> tasks = new ArrayList<>();
            String primaryGroup = consumerGroup();
            int consumerCount = ConsumeBenchWorker.this.spec.threadsPerWorker();
            Map<String, List<TopicPartition>> partitionsByTopic = ConsumeBenchWorker.this.spec.materializeTopics();
            boolean useGroupAssignment = partitionsByTopic.values().stream().allMatch(List::isEmpty);
            if (!useGroupAssignment && !toUseRandomConsumeGroup() && consumerCount > 1) {
                throw new ConfigException("You may not specify an explicit partition assignment when using multiple consumers in the same group.Please leave the consumer group unset, specify topics instead of partitions or use a single consumer.");
            }
            ConsumeBenchWorker.this.consumer = consumer(primaryGroup, clientId(0));
            if (useGroupAssignment) {
                Set<String> topics = partitionsByTopic.keySet();
                tasks.add(new ConsumeMessages(ConsumeBenchWorker.this, ConsumeBenchWorker.this.consumer, topics));
                // consumerGroup() is evaluated per consumer on purpose: when the spec names no
                // group, each call yields a fresh random group id.
                for (int i = 0; i < consumerCount - 1; i++) {
                    tasks.add(new ConsumeMessages(ConsumeBenchWorker.this, consumer(consumerGroup(), clientId(i + 1)), topics));
                }
            } else {
                List<TopicPartition> partitions = populatePartitionsByTopic(ConsumeBenchWorker.this.consumer.consumer(), partitionsByTopic)
                        .values().stream()
                        .flatMap(List::stream)
                        .collect(Collectors.toList());
                tasks.add(new ConsumeMessages(ConsumeBenchWorker.this, ConsumeBenchWorker.this.consumer, partitions));
                for (int i = 0; i < consumerCount - 1; i++) {
                    tasks.add(new ConsumeMessages(ConsumeBenchWorker.this, consumer(consumerGroup(), clientId(i + 1)), partitions));
                }
            }
            return tasks;
        }

        /** Returns the client id for the consumer with the given index: {@code consumer.<workerId>-<i>}. */
        private String clientId(int i) {
            return String.format("consumer.%s-%d", ConsumeBenchWorker.this.id, Integer.valueOf(i));
        }

        /**
         * Creates a byte-array consumer wrapped in a {@link ThreadSafeConsumer}.
         *
         * @param str consumer group id
         * @param str2 client id
         */
        private ThreadSafeConsumer consumer(String str, String str2) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumeBenchWorker.this.spec.bootstrapServers());
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, str2);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, str);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
            // Applied after the defaults above.
            WorkerUtils.addConfigsToProperties(props, ConsumeBenchWorker.this.spec.commonClientConf(), ConsumeBenchWorker.this.spec.consumerConf());
            KafkaConsumer<byte[], byte[]> kafkaConsumer =
                    new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            return new ThreadSafeConsumer(kafkaConsumer, str2);
        }

        /** Returns the spec's consumer group, or a fresh random {@code consume-bench-<uuid>} group if it is empty. */
        private String consumerGroup() {
            return toUseRandomConsumeGroup() ? "consume-bench-" + UUID.randomUUID().toString() : ConsumeBenchWorker.this.spec.consumerGroup();
        }

        /** Returns whether the spec leaves the consumer group empty. */
        private boolean toUseRandomConsumeGroup() {
            return ConsumeBenchWorker.this.spec.consumerGroup().isEmpty();
        }

        /**
         * Fills in, in place, the partition list of every topic that has none by asking the
         * consumer for that topic's partitions.
         *
         * @return the same map instance that was passed in
         */
        private Map<String, List<TopicPartition>> populatePartitionsByTopic(KafkaConsumer<byte[], byte[]> kafkaConsumer, Map<String, List<TopicPartition>> map) {
            for (Map.Entry<String, List<TopicPartition>> entry : map.entrySet()) {
                String topic = entry.getKey();
                List<TopicPartition> partitions = entry.getValue();
                if (partitions.isEmpty()) {
                    List<TopicPartition> fetched = kafkaConsumer.partitionsFor(topic).stream()
                            .map(info -> new TopicPartition(info.topic(), info.partition()))
                            .collect(Collectors.toList());
                    partitions.addAll(fetched);
                }
                map.put(topic, partitions);
            }
            return map;
        }
    }

    /* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/trogdor/workload/ConsumeBenchWorker$StatusData.class */
    /**
     * Immutable snapshot of one consumer's progress, serialised to JSON through the annotated
     * accessors and rebuilt through the annotated constructor.
     */
    public static class StatusData {
        private final long totalMessagesReceived;
        private final List<String> assignedPartitions;
        private final long totalBytesReceived;
        private final long averageMessageSizeBytes;
        private final float averageLatencyMs;
        private final int p50LatencyMs;
        private final int p95LatencyMs;
        private final int p99LatencyMs;
        /** Percentiles requested from the histograms, in the order p50, p95, p99. */
        static final float[] PERCENTILES = {0.5f, 0.95f, 0.99f};

        /**
         * @param assigned string forms of the partitions assigned to the consumer
         * @param messages total number of messages received
         * @param bytes total number of bytes received
         * @param averageSizeBytes average message size in bytes
         * @param averageLatency average latency in milliseconds
         * @param p50 50th percentile latency in milliseconds
         * @param p95 95th percentile latency in milliseconds
         * @param p99 99th percentile latency in milliseconds
         */
        @JsonCreator
        StatusData(@JsonProperty("assignedPartitions") List<String> assigned,
                   @JsonProperty("totalMessagesReceived") long messages,
                   @JsonProperty("totalBytesReceived") long bytes,
                   @JsonProperty("averageMessageSizeBytes") long averageSizeBytes,
                   @JsonProperty("averageLatencyMs") float averageLatency,
                   @JsonProperty("p50LatencyMs") int p50,
                   @JsonProperty("p95LatencyMs") int p95,
                   @JsonProperty("p99LatencyMs") int p99) {
            assignedPartitions = assigned;
            totalMessagesReceived = messages;
            totalBytesReceived = bytes;
            averageMessageSizeBytes = averageSizeBytes;
            averageLatencyMs = averageLatency;
            p50LatencyMs = p50;
            p95LatencyMs = p95;
            p99LatencyMs = p99;
        }

        /** @return string forms of the assigned partitions */
        @JsonProperty
        public List<String> assignedPartitions() {
            return assignedPartitions;
        }

        /** @return total number of messages received */
        @JsonProperty
        public long totalMessagesReceived() {
            return totalMessagesReceived;
        }

        /** @return total number of bytes received */
        @JsonProperty
        public long totalBytesReceived() {
            return totalBytesReceived;
        }

        /** @return average message size in bytes */
        @JsonProperty
        public long averageMessageSizeBytes() {
            return averageMessageSizeBytes;
        }

        /** @return average latency in milliseconds */
        @JsonProperty
        public float averageLatencyMs() {
            return averageLatencyMs;
        }

        /** @return 50th percentile latency in milliseconds */
        @JsonProperty
        public int p50LatencyMs() {
            return p50LatencyMs;
        }

        /** @return 95th percentile latency in milliseconds */
        @JsonProperty
        public int p95LatencyMs() {
            return p95LatencyMs;
        }

        /** @return 99th percentile latency in milliseconds */
        @JsonProperty
        public int p99LatencyMs() {
            return p99LatencyMs;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/trogdor/workload/ConsumeBenchWorker$StatusUpdater.class */
    /**
     * Collects the latest status of every consumer, keyed by client id, and pushes the whole map
     * to the worker's status tracker. Both mutating and publishing are synchronized on this
     * instance.
     */
    public class StatusUpdater implements Runnable {
        /** Latest status per client id, stored as JSON trees. */
        final Map<String, JsonNode> statuses = new HashMap<>();

        StatusUpdater() {
        }

        /** Publishes the current statuses; any exception aborts the worker's done-future. */
        @Override // java.lang.Runnable
        public void run() {
            try {
                update();
            } catch (Exception e) {
                WorkerUtils.abort(ConsumeBenchWorker.log, "ConsumeStatusUpdater", e, ConsumeBenchWorker.this.doneFuture);
            }
        }

        /** Sends the current status map, converted to a JSON tree, to the worker status tracker. */
        synchronized void update() {
            ConsumeBenchWorker.this.workerStatus.update(JsonUtil.JSON_SERDE.valueToTree(this.statuses));
        }

        /**
         * Replaces the stored status for one consumer.
         *
         * @param str client id of the consumer
         * @param statusData the consumer's latest status
         */
        synchronized void updateConsumeStatus(String str, StatusData statusData) {
            this.statuses.put(str, JsonUtil.JSON_SERDE.valueToTree(statusData));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/trogdor/workload/ConsumeBenchWorker$ThreadSafeConsumer.class */
    /**
     * Wrapper that serialises access to a {@link KafkaConsumer} with a {@link ReentrantLock}, so
     * the consume task and the status updater can share one consumer.
     *
     * <p>{@link #consumer()} hands out the wrapped consumer itself; calls made on it directly
     * bypass the lock.
     */
    public static class ThreadSafeConsumer {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final String clientId;
        /** Guarded by {@link #consumerLock}. */
        private boolean closed = false;
        private final ReentrantLock consumerLock = new ReentrantLock();

        /**
         * @param kafkaConsumer consumer to wrap
         * @param str client id reported by {@link #clientId()}
         */
        ThreadSafeConsumer(KafkaConsumer<byte[], byte[]> kafkaConsumer, String str) {
            this.consumer = kafkaConsumer;
            this.clientId = str;
        }

        /** Polls the wrapped consumer with a 50 ms timeout while holding the lock. */
        ConsumerRecords<byte[], byte[]> poll() {
            this.consumerLock.lock();
            try {
                return this.consumer.poll(Duration.ofMillis(50L));
            } finally {
                this.consumerLock.unlock();
            }
        }

        /**
         * Unsubscribes and closes the wrapped consumer. Further calls are no-ops.
         *
         * <p>The {@code closed} flag is read and written under the lock so two concurrent callers
         * cannot both proceed to close, and the consumer is closed even if unsubscribing throws.
         */
        void close() {
            this.consumerLock.lock();
            try {
                if (this.closed) {
                    return;
                }
                try {
                    this.consumer.unsubscribe();
                } finally {
                    Utils.closeQuietly(this.consumer, ConsumerProtocol.PROTOCOL_TYPE);
                    this.closed = true;
                }
            } finally {
                this.consumerLock.unlock();
            }
        }

        /** Subscribes the wrapped consumer to the given topics while holding the lock. */
        void subscribe(Set<String> set) {
            this.consumerLock.lock();
            try {
                this.consumer.subscribe(set);
            } finally {
                this.consumerLock.unlock();
            }
        }

        /** Manually assigns the given partitions to the wrapped consumer while holding the lock. */
        void assign(Collection<TopicPartition> collection) {
            this.consumerLock.lock();
            try {
                this.consumer.assign(collection);
            } finally {
                this.consumerLock.unlock();
            }
        }

        /** @return the {@code toString()} form of every currently assigned partition */
        List<String> assignedPartitions() {
            this.consumerLock.lock();
            try {
                return this.consumer.assignment().stream()
                        .map(TopicPartition::toString)
                        .collect(Collectors.toList());
            } finally {
                this.consumerLock.unlock();
            }
        }

        /** @return the client id given at construction */
        String clientId() {
            return this.clientId;
        }

        /** @return the wrapped consumer; using it directly is not protected by the lock */
        KafkaConsumer<byte[], byte[]> consumer() {
            return this.consumer;
        }
    }

    /**
     * Creates a worker that is not yet running.
     *
     * @param str identifier of this worker, used in log output and generated client ids
     * @param consumeBenchSpec benchmark specification to execute
     */
    public ConsumeBenchWorker(String str, ConsumeBenchSpec consumeBenchSpec) {
        spec = consumeBenchSpec;
        id = str;
    }

    /**
     * Starts the benchmark: creates the executor, schedules the once-a-minute aggregated status
     * update and submits the {@link Prepare} task that builds and launches the consumers.
     *
     * @param platform not used by this method
     * @param workerStatusTracker tracker that receives the aggregated status
     * @param kafkaFutureImpl future completed when the benchmark finishes or fails
     * @throws IllegalStateException if the worker is already running
     */
    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.trogdor.task.TaskWorker
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) throws Exception {
        boolean claimed = running.compareAndSet(false, true);
        if (!claimed) {
            throw new IllegalStateException("ConsumeBenchWorker is already running.");
        }
        log.info("{}: Activating ConsumeBenchWorker with {}", id, spec);
        statusUpdater = new StatusUpdater();
        // One thread per consumer plus two more.
        int poolSize = spec.threadsPerWorker() + 2;
        executor = Executors.newScheduledThreadPool(
                poolSize,
                ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
        statusUpdaterFuture = executor.scheduleAtFixedRate(statusUpdater, 1L, 1L, TimeUnit.MINUTES);
        workerStatus = workerStatusTracker;
        doneFuture = kafkaFutureImpl;
        executor.submit(new Prepare());
    }

    /**
     * Stops the benchmark: completes the done-future, interrupts and waits for the executor's
     * tasks, closes the primary consumer and clears all per-run state.
     *
     * @param platform not used by this method
     * @throws IllegalStateException if the worker is not running
     */
    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.trogdor.task.TaskWorker
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("ConsumeBenchWorker is not running.");
        }
        log.info("{}: Deactivating ConsumeBenchWorker.", this.id);
        this.doneFuture.complete("");
        this.executor.shutdownNow();
        this.executor.awaitTermination(1L, TimeUnit.DAYS);
        // The consumer is only assigned inside Prepare; if Prepare failed earlier (for example on
        // an invalid spec) it is still null and there is nothing to close.
        if (this.consumer != null) {
            this.consumer.close();
        }
        this.consumer = null;
        this.executor = null;
        this.statusUpdater = null;
        this.statusUpdaterFuture = null;
        this.workerStatus = null;
        this.doneFuture = null;
    }
}
