package org.apache.kafka.tools;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.pulsar.kafka.shade.org.tukaani.xz.common.Util;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.3.5.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/TransactionalMessageCopier.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/TransactionalMessageCopier.class */
public class TransactionalMessageCopier {
    private static ArgumentParser argParser() {
        ArgumentParser description = ArgumentParsers.newArgumentParser("transactional-message-copier").defaultHelp(true).description("This tool copies messages transactionally from an input partition to an output topic, committing the consumed offsets along with the output messages");
        description.addArgument("--input-topic").action(Arguments.store()).required(true).type(String.class).metavar("INPUT-TOPIC").dest("inputTopic").help("Consume messages from this topic");
        description.addArgument("--input-partition").action(Arguments.store()).required(true).type(Integer.class).metavar("INPUT-PARTITION").dest("inputPartition").help("Consume messages from this partition of the input topic.");
        description.addArgument("--output-topic").action(Arguments.store()).required(true).type(String.class).metavar("OUTPUT-TOPIC").dest("outputTopic").help("Produce messages to this topic");
        description.addArgument("--broker-list").action(Arguments.store()).required(true).type(String.class).metavar("HOST1:PORT1[,HOST2:PORT2[...]]").dest("brokerList").help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
        description.addArgument("--max-messages").action(Arguments.store()).required(false).setDefault((Object) (-1)).type(Integer.class).metavar("MAX-MESSAGES").dest("maxMessages").help("Process these many messages upto the end offset at the time this program was launched. If set to -1 we will just read to the end offset of the input partition (as of the time the program was launched).");
        description.addArgument("--consumer-group").action(Arguments.store()).required(false).setDefault((Object) (-1)).type(String.class).metavar("CONSUMER-GROUP").dest("consumerGroup").help("The consumer group id to use for storing the consumer offsets.");
        description.addArgument("--transaction-size").action(Arguments.store()).required(false).setDefault((Object) 200).type(Integer.class).metavar("TRANSACTION-SIZE").dest("messagesPerTransaction").help("The number of messages to put in each transaction. Default is 200.");
        description.addArgument("--transactional-id").action(Arguments.store()).required(true).type(String.class).metavar("TRANSACTIONAL-ID").dest("transactionalId").help("The transactionalId to assign to the producer");
        description.addArgument("--enable-random-aborts").action(Arguments.storeTrue()).type(Boolean.class).metavar("ENABLE-RANDOM-ABORTS").dest("enableRandomAborts").help("Whether or not to enable random transaction aborts (for system testing)");
        return description;
    }

    private static KafkaProducer<String, String> createProducer(Namespace namespace) {
        String string = namespace.getString("transactionalId");
        String string2 = namespace.getString("brokerList");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", string2);
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, string);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        return new KafkaProducer<>(properties);
    }

    private static KafkaConsumer<String, String> createConsumer(Namespace namespace) {
        String string = namespace.getString("consumerGroup");
        String string2 = namespace.getString("brokerList");
        Integer num = namespace.getInt("messagesPerTransaction");
        Properties properties = new Properties();
        properties.put("group.id", string);
        properties.put("bootstrap.servers", string2);
        properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, num.toString());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put("session.timeout.ms", "10000");
        properties.put("heartbeat.interval.ms", "3000");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }

    private static ProducerRecord<String, String> producerRecordFromConsumerRecord(String str, ConsumerRecord<String, String> consumerRecord) {
        return new ProducerRecord<>(str, consumerRecord.key(), consumerRecord.value());
    }

    private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> kafkaConsumer) {
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : kafkaConsumer.assignment()) {
            hashMap.put(topicPartition, new OffsetAndMetadata(kafkaConsumer.position(topicPartition), null));
        }
        return hashMap;
    }

    private static void resetToLastCommittedPositions(KafkaConsumer<String, String> kafkaConsumer) {
        for (TopicPartition topicPartition : kafkaConsumer.assignment()) {
            OffsetAndMetadata committed = kafkaConsumer.committed(topicPartition);
            if (committed != null) {
                kafkaConsumer.seek(topicPartition, committed.offset());
            } else {
                kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
            }
        }
    }

    private static long messagesRemaining(KafkaConsumer<String, String> kafkaConsumer, TopicPartition topicPartition) {
        long position = kafkaConsumer.position(topicPartition);
        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(Collections.singleton(topicPartition));
        if (endOffsets.containsKey(topicPartition)) {
            return endOffsets.get(topicPartition).longValue() - position;
        }
        return 0L;
    }

    private static String toJsonString(Map<String, Object> map) {
        String str;
        try {
            str = new ObjectMapper().writeValueAsString(map);
        } catch (JsonProcessingException e) {
            str = "Bad data can't be written as json: " + e.getMessage();
        }
        return str;
    }

    private static String statusAsJson(long j, long j2, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("progress", str);
        hashMap.put("consumed", Long.valueOf(j));
        hashMap.put("remaining", Long.valueOf(j2));
        return toJsonString(hashMap);
    }

    private static String shutDownString(long j, long j2, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("remaining", Long.valueOf(j2));
        hashMap.put("consumed", Long.valueOf(j));
        hashMap.put("shutdown_complete", str);
        return toJsonString(hashMap);
    }

    public static void main(String[] strArr) throws IOException {
        Namespace parseArgsOrFail = argParser().parseArgsOrFail(strArr);
        Integer num = parseArgsOrFail.getInt("messagesPerTransaction");
        String string = parseArgsOrFail.getString("transactionalId");
        String string2 = parseArgsOrFail.getString("outputTopic");
        String string3 = parseArgsOrFail.getString("consumerGroup");
        TopicPartition topicPartition = new TopicPartition(parseArgsOrFail.getString("inputTopic"), parseArgsOrFail.getInt("inputPartition").intValue());
        KafkaProducer<String, String> createProducer = createProducer(parseArgsOrFail);
        KafkaConsumer<String, String> createConsumer = createConsumer(parseArgsOrFail);
        createConsumer.assign(Collections.singleton(topicPartition));
        long min = Math.min(messagesRemaining(createConsumer, topicPartition), parseArgsOrFail.getInt("maxMessages").intValue() == -1 ? Util.VLI_MAX : parseArgsOrFail.getInt("maxMessages").intValue());
        boolean booleanValue = parseArgsOrFail.getBoolean("enableRandomAborts").booleanValue();
        createProducer.initTransactions();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicLong atomicLong = new AtomicLong(min);
        AtomicLong atomicLong2 = new AtomicLong(0L);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            atomicBoolean.set(true);
            createProducer.close();
            synchronized (createConsumer) {
                createConsumer.close();
            }
            System.out.println(shutDownString(atomicLong2.get(), atomicLong.get(), string));
        }));
        try {
            Random random = new Random();
            while (0 < atomicLong.get()) {
                System.out.println(statusAsJson(atomicLong2.get(), atomicLong.get(), string));
                if (!atomicBoolean.get()) {
                    int i = 0;
                    long min2 = Math.min(num.intValue(), atomicLong.get());
                    try {
                        try {
                            createProducer.beginTransaction();
                            while (i < min2) {
                                Iterator<ConsumerRecord<String, String>> it = createConsumer.poll(Duration.ofMillis(200L)).iterator();
                                while (it.hasNext()) {
                                    createProducer.send(producerRecordFromConsumerRecord(string2, it.next()));
                                    i++;
                                }
                            }
                            createProducer.sendOffsetsToTransaction(consumerPositions(createConsumer), string3);
                        } catch (OutOfOrderSequenceException | ProducerFencedException e) {
                            throw e;
                        }
                    } catch (KafkaException e2) {
                        createProducer.abortTransaction();
                        resetToLastCommittedPositions(createConsumer);
                    }
                    if (booleanValue && random.nextInt() % 3 == 0) {
                        throw new KafkaException("Aborting transaction");
                        break;
                    } else {
                        createProducer.commitTransaction();
                        atomicLong.set(min - atomicLong2.addAndGet(i));
                    }
                } else {
                    break;
                }
            }
            createProducer.close();
            synchronized (createConsumer) {
                createConsumer.close();
            }
            System.exit(0);
        } catch (Throwable th) {
            createProducer.close();
            synchronized (createConsumer) {
                createConsumer.close();
                throw th;
            }
        }
    }
}
