package io.axual.platform.test.core;

import io.axual.common.exception.ClientException;
import io.axual.common.resolver.TopicPatternResolver;
import io.axual.common.resolver.TopicResolver;
import io.axual.common.tools.ExecutorUtil;
import io.axual.common.tools.KafkaUtil;
import io.axual.common.tools.SleepUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/axual-platform-test-core-6.0.0.jar:io/axual/platform/test/core/DistributionUnit.class */
public class DistributionUnit {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DistributionUnit.class);
    private static final String COPY_FLAG_HEADER = "DistributionUnitCopyFlag";
    private final List<ClusterUnit> clusters;
    private final Map<ClusterUnit, Producer<byte[], byte[]>> producers = new HashMap();
    private final List<Future<?>> futures = new ArrayList();
    private final Map<String, Object> context = new HashMap();
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private boolean stop = false;
    private boolean paused = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/axual-platform-test-core-6.0.0.jar:io/axual/platform/test/core/DistributionUnit$DistributionTask.class */
    public class DistributionTask implements Runnable {
        private final ClusterUnit sourceCluster;
        private Set<ClusterUnit> targetClusters;
        private final String topicRegex;
        private final Map<ClusterUnit, TopicResolver> targetResolvers = new HashMap();
        private final TopicResolver sourceResolver = new TopicPatternResolver();

        DistributionTask(ClusterUnit clusterUnit, Set<ClusterUnit> set) {
            this.sourceCluster = clusterUnit;
            this.targetClusters = set;
            HashMap hashMap = new HashMap(DistributionUnit.this.context);
            hashMap.put(TopicPatternResolver.TOPIC_PATTERN_CONFIG, clusterUnit.getTopicPattern());
            hashMap.put("environment", JmxReporter.DEFAULT_INCLUDE);
            this.sourceResolver.configure(hashMap);
            this.topicRegex = this.sourceResolver.resolveTopic(JmxReporter.DEFAULT_INCLUDE);
            hashMap.remove("environment");
            this.sourceResolver.configure(hashMap);
            Iterator<ClusterUnit> it = set.iterator();
            while (it.hasNext()) {
                this.targetResolvers.put(it.next(), new TopicPatternResolver());
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Map<String, Object> kafkaConfigs = KafkaUtil.getKafkaConfigs(SslUnit.getDefaultSslConfig());
                kafkaConfigs.put("bootstrap.servers", this.sourceCluster.getBootstrapServer());
                kafkaConfigs.put("group.id", "distributor-unit-" + UUID.randomUUID().toString());
                kafkaConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
                kafkaConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
                kafkaConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                KafkaConsumer kafkaConsumer = new KafkaConsumer(kafkaConfigs);
                Throwable th = null;
                try {
                    try {
                        kafkaConsumer.subscribe(Pattern.compile(this.topicRegex), new ConsumerRebalanceListener() { // from class: io.axual.platform.test.core.DistributionUnit.DistributionTask.1
                            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                                DistributionUnit.LOG.info("Partitions revoked on Cluster {}:", DistributionTask.this.sourceCluster.getName());
                                for (TopicPartition topicPartition : collection) {
                                    DistributionUnit.LOG.info("  {} / {} ", topicPartition.topic(), Integer.valueOf(topicPartition.partition()));
                                }
                            }

                            @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                                DistributionUnit.LOG.info("Partitions assigned on Cluster {}:", DistributionTask.this.sourceCluster.getName());
                                for (TopicPartition topicPartition : collection) {
                                    DistributionUnit.LOG.info("  {} / {} ", topicPartition.topic(), Integer.valueOf(topicPartition.partition()));
                                }
                            }
                        });
                        while (!DistributionUnit.this.stop) {
                            if (DistributionUnit.this.paused) {
                                SleepUtil.sleepInterruptibly(Duration.ofMillis(100L), () -> {
                                    return DistributionUnit.this.stop;
                                });
                            } else {
                                pollOnce(kafkaConsumer);
                            }
                        }
                        if (kafkaConsumer != null) {
                            if (0 != 0) {
                                try {
                                    kafkaConsumer.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                kafkaConsumer.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (kafkaConsumer != null) {
                        if (th != null) {
                            try {
                                kafkaConsumer.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            kafkaConsumer.close();
                        }
                    }
                    throw th4;
                }
            } catch (InterruptException e) {
                DistributionUnit.LOG.info("Distribution interrupted");
            } catch (Exception e2) {
                throw new ClientException("Distribution task exited with exception", e2);
            }
        }

        private void pollOnce(Consumer<byte[], byte[]> consumer) {
            Iterator<ConsumerRecord<byte[], byte[]>> it = consumer.poll(Duration.ofMillis(100L)).iterator();
            while (it.hasNext()) {
                ConsumerRecord<byte[], byte[]> next = it.next();
                if (next.headers() != null) {
                    if (next.headers().lastHeader(DistributionUnit.COPY_FLAG_HEADER) == null) {
                        distributeMessage(next);
                    } else if (DistributionUnit.LOG.isDebugEnabled()) {
                        DistributionUnit.LOG.debug("Skipping message from {} (partition {}, offset {}) on {}", next.topic(), Integer.valueOf(next.partition()), Long.valueOf(next.offset()), this.sourceCluster.getName());
                    }
                }
            }
        }

        private void distributeMessage(ConsumerRecord<byte[], byte[]> consumerRecord) {
            HashMap hashMap = new HashMap(this.sourceResolver.unresolveContext(consumerRecord.topic()));
            String unresolveTopic = this.sourceResolver.unresolveTopic(consumerRecord.topic());
            for (ClusterUnit clusterUnit : this.targetClusters) {
                consumerRecord.headers().add(new RecordHeader(DistributionUnit.COPY_FLAG_HEADER, new byte[0]));
                TopicResolver topicResolver = this.targetResolvers.get(clusterUnit);
                hashMap.put(TopicPatternResolver.TOPIC_PATTERN_CONFIG, clusterUnit.getTopicPattern());
                topicResolver.configure(hashMap);
                ((Producer) DistributionUnit.this.producers.get(clusterUnit)).send(new ProducerRecord(topicResolver.resolveTopic(unresolveTopic), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.timestamp()), consumerRecord.key(), consumerRecord.value(), consumerRecord.headers()));
                if (DistributionUnit.LOG.isInfoEnabled()) {
                    DistributionUnit.LOG.info("Copied message from {} (partition {}, offset {}) on {} to {}", unresolveTopic, Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), this.sourceCluster.getName(), clusterUnit.getName());
                }
            }
        }
    }

    public DistributionUnit(List<ClusterUnit> list, Map<String, Object> map) {
        this.clusters = list;
        this.context.putAll(map);
    }

    public void start() {
        for (ClusterUnit clusterUnit : this.clusters) {
            Map<String, Object> kafkaConfigs = KafkaUtil.getKafkaConfigs(SslUnit.getDefaultSslConfig());
            kafkaConfigs.put("bootstrap.servers", clusterUnit.getBootstrapServer());
            kafkaConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            kafkaConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            this.producers.put(clusterUnit, new KafkaProducer(kafkaConfigs));
            HashSet hashSet = new HashSet(this.clusters);
            hashSet.remove(clusterUnit);
            this.futures.add(this.executor.submit(new DistributionTask(clusterUnit, hashSet)));
        }
    }

    public void stop() {
        this.stop = true;
        Iterator<Map.Entry<ClusterUnit, Producer<byte[], byte[]>>> it = this.producers.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().close();
        }
        this.producers.clear();
        for (Future<?> future : this.futures) {
            while (!future.isDone()) {
                future.cancel(false);
            }
        }
        ExecutorUtil.terminateExecutor(this.executor, Duration.ofSeconds(10L));
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
    }
}
