package org.apache.kafka.tools;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.1.5-rc-202105051501.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest.class */
public class ClientCompatibilityTest {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ClientCompatibilityTest.class);
    private final TestConfig testConfig;
    private final byte[] message1;
    private final byte[] message2;

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$ClientCompatibilityTestDeserializer.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.1.5-rc-202105051501.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$ClientCompatibilityTestDeserializer.class */
    public static class ClientCompatibilityTestDeserializer implements Deserializer<byte[]>, ClusterResourceListener {
        private final boolean expectClusterId;

        ClientCompatibilityTestDeserializer(boolean z) {
            this.expectClusterId = z;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.common.serialization.Deserializer
        public byte[] deserialize(String str, byte[] bArr) {
            return bArr;
        }

        @Override // org.apache.kafka.common.ClusterResourceListener
        public void onUpdate(ClusterResource clusterResource) {
            if (this.expectClusterId) {
                if (clusterResource.clusterId() == null) {
                    throw new RuntimeException("Expected cluster id to be supported, but it was null.");
                }
            } else if (clusterResource.clusterId() != null) {
                throw new RuntimeException("Expected cluster id to be null, but it was supported.");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$Invoker.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.1.5-rc-202105051501.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$Invoker.class */
    public interface Invoker {
        void invoke() throws Throwable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$OffsetsForTime.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.1.5-rc-202105051501.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$OffsetsForTime.class */
    public static class OffsetsForTime {
        Map<TopicPartition, OffsetAndTimestamp> result;

        private OffsetsForTime() {
        }

        public String toString() {
            return this.result.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$ResultTester.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.1.5-rc-202105051501.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$ResultTester.class */
    public interface ResultTester {
        void test() throws Throwable;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$TestConfig.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.1.5-rc-202105051501.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/tools/ClientCompatibilityTest$TestConfig.class */
    public static class TestConfig {
        final String bootstrapServer;
        final String topic;
        final boolean offsetsForTimesSupported;
        final boolean expectClusterId;
        final boolean expectRecordTooLargeException;
        final int numClusterNodes;
        final boolean createTopicsSupported;
        final boolean describeAclsSupported;

        TestConfig(Namespace namespace) {
            this.bootstrapServer = namespace.getString("bootstrapServer");
            this.topic = namespace.getString("topic");
            this.offsetsForTimesSupported = namespace.getBoolean("offsetsForTimesSupported").booleanValue();
            this.expectClusterId = namespace.getBoolean("clusterIdSupported").booleanValue();
            this.expectRecordTooLargeException = namespace.getBoolean("expectRecordTooLargeException").booleanValue();
            this.numClusterNodes = namespace.getInt("numClusterNodes").intValue();
            this.createTopicsSupported = namespace.getBoolean("createTopicsSupported").booleanValue();
            this.describeAclsSupported = namespace.getBoolean("describeAclsSupported").booleanValue();
        }
    }

    public static void main(String[] strArr) throws Exception {
        ArgumentParser description = ArgumentParsers.newArgumentParser("client-compatibility-test").defaultHelp(true).description("This tool is used to verify client compatibility guarantees.");
        description.addArgument("--topic").action(Arguments.store()).required(true).type(String.class).dest("topic").metavar("TOPIC").help("the compatibility test will produce messages to this topic");
        description.addArgument("--bootstrap-server").action(Arguments.store()).required(true).type(String.class).dest("bootstrapServer").metavar("BOOTSTRAP_SERVER").help("The server(s) to use for bootstrapping");
        description.addArgument("--offsets-for-times-supported").action(Arguments.store()).required(true).type(Boolean.class).dest("offsetsForTimesSupported").metavar("OFFSETS_FOR_TIMES_SUPPORTED").help("True if KafkaConsumer#offsetsForTimes is supported by the current broker version");
        description.addArgument("--cluster-id-supported").action(Arguments.store()).required(true).type(Boolean.class).dest("clusterIdSupported").metavar("CLUSTER_ID_SUPPORTED").help("True if cluster IDs are supported.  False if cluster ID always appears as null.");
        description.addArgument("--expect-record-too-large-exception").action(Arguments.store()).required(true).type(Boolean.class).dest("expectRecordTooLargeException").metavar("EXPECT_RECORD_TOO_LARGE_EXCEPTION").help("True if we should expect a RecordTooLargeException when trying to read from a topic that contains a message that is bigger than max.partition.fetch.bytes.  This is pre-KIP-74 behavior.");
        description.addArgument("--num-cluster-nodes").action(Arguments.store()).required(true).type(Integer.class).dest("numClusterNodes").metavar("NUM_CLUSTER_NODES").help("The number of cluster nodes we should expect to see from the AdminClient.");
        description.addArgument("--create-topics-supported").action(Arguments.store()).required(true).type(Boolean.class).dest("createTopicsSupported").metavar("CREATE_TOPICS_SUPPORTED").help("Whether we should be able to create topics via the AdminClient.");
        description.addArgument("--describe-acls-supported").action(Arguments.store()).required(true).type(Boolean.class).dest("describeAclsSupported").metavar("DESCRIBE_ACLS_SUPPORTED").help("Whether describeAcls is supported in the AdminClient.");
        Namespace namespace = null;
        try {
            namespace = description.parseArgs(strArr);
        } catch (ArgumentParserException e) {
            if (strArr.length == 0) {
                description.printHelp();
                Exit.exit(0);
            } else {
                description.handleError(e);
                Exit.exit(1);
            }
        }
        try {
            new ClientCompatibilityTest(new TestConfig(namespace)).run();
        } catch (Throwable th) {
            System.out.printf("FAILED: Caught exception %s%n%n", th.getMessage());
            th.printStackTrace();
            Exit.exit(1);
        }
        System.out.println("SUCCESS.");
        Exit.exit(0);
    }

    private static String toHexString(byte[] bArr) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bArr) {
            sb.append(String.format("%02x", Byte.valueOf(b)));
        }
        return sb.toString();
    }

    private static void compareArrays(byte[] bArr, byte[] bArr2) {
        if (!Arrays.equals(bArr, bArr2)) {
            throw new RuntimeException("Arrays did not match: expected " + toHexString(bArr) + ", got " + toHexString(bArr2));
        }
    }

    ClientCompatibilityTest(TestConfig testConfig) {
        this.testConfig = testConfig;
        long milliseconds = Time.SYSTEM.milliseconds();
        ByteBuffer allocate = ByteBuffer.allocate(8);
        allocate.putLong(milliseconds);
        this.message1 = allocate.array();
        ByteBuffer allocate2 = ByteBuffer.allocate(4096);
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= allocate2.capacity()) {
                this.message2 = allocate2.array();
                return;
            } else {
                allocate2.putLong(milliseconds + j2);
                j = j2 + 8;
            }
        }
    }

    void run() throws Throwable {
        long milliseconds = Time.SYSTEM.milliseconds();
        testAdminClient();
        testProduce();
        testConsume(milliseconds);
    }

    public void testProduce() throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.testConfig.bootstrapServer);
        ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
        KafkaProducer kafkaProducer = new KafkaProducer(properties, (Serializer) byteArraySerializer, (Serializer) byteArraySerializer);
        Future<RecordMetadata> send = kafkaProducer.send(new ProducerRecord(this.testConfig.topic, this.message1));
        Future<RecordMetadata> send2 = kafkaProducer.send(new ProducerRecord(this.testConfig.topic, this.message2));
        kafkaProducer.flush();
        send.get();
        send2.get();
        kafkaProducer.close();
    }

    void testAdminClient() throws Throwable {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.testConfig.bootstrapServer);
        AdminClient create = AdminClient.create(properties);
        Throwable th = null;
        while (true) {
            try {
                Collection<Node> collection = create.describeCluster().nodes().get();
                if (collection.size() == this.testConfig.numClusterNodes) {
                    tryFeature("createTopics", this.testConfig.createTopicsSupported, () -> {
                        try {
                            create.createTopics(Collections.singleton(new NewTopic("newtopic", 1, (short) 1))).all().get();
                        } catch (ExecutionException e) {
                            throw e.getCause();
                        }
                    }, () -> {
                        createTopicsResultTest(create, Collections.singleton("newtopic"));
                    });
                    while (true) {
                        Collection<TopicListing> collection2 = create.listTopics().listings().get();
                        if (this.testConfig.createTopicsSupported && !topicExists(collection2, "newtopic")) {
                            Thread.sleep(1L);
                            log.info("Did not see newtopic.  Retrying listTopics...");
                        }
                    }
                    tryFeature("describeAclsSupported", this.testConfig.describeAclsSupported, () -> {
                        try {
                            create.describeAcls(AclBindingFilter.ANY).values().get();
                        } catch (ExecutionException e) {
                            if (!(e.getCause() instanceof SecurityDisabledException)) {
                                throw e.getCause();
                            }
                        }
                    });
                    if (create != null) {
                        if (0 == 0) {
                            create.close();
                            return;
                        }
                        try {
                            create.close();
                            return;
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                            return;
                        }
                    }
                    return;
                }
                if (collection.size() > this.testConfig.numClusterNodes) {
                    throw new KafkaException("Expected to see " + this.testConfig.numClusterNodes + " nodes, but saw " + collection.size());
                }
                Thread.sleep(1L);
                log.info("Saw only {} cluster nodes.  Waiting to see {}.", Integer.valueOf(collection.size()), Integer.valueOf(this.testConfig.numClusterNodes));
            } catch (Throwable th3) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        }
    }

    private void createTopicsResultTest(AdminClient adminClient, Collection<String> collection) throws InterruptedException, ExecutionException {
        do {
            try {
                adminClient.describeTopics(collection).all().get();
                return;
            } catch (ExecutionException e) {
            }
        } while (e.getCause() instanceof UnknownTopicOrPartitionException);
        throw e;
    }

    private boolean topicExists(Collection<TopicListing> collection, String str) {
        boolean z = false;
        for (TopicListing topicListing : collection) {
            if (topicListing.name().equals(str)) {
                if (topicListing.isInternal()) {
                    throw new KafkaException(String.format("Did not expect %s to be an internal topic.", str));
                }
                z = true;
            }
        }
        return z;
    }

    public void testConsume(final long j) throws Throwable {
        byte[] next;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.testConfig.bootstrapServer);
        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512);
        ClientCompatibilityTestDeserializer clientCompatibilityTestDeserializer = new ClientCompatibilityTestDeserializer(this.testConfig.expectClusterId);
        final KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, (Deserializer) clientCompatibilityTestDeserializer, (Deserializer) clientCompatibilityTestDeserializer);
        Throwable th = null;
        try {
            List<PartitionInfo> partitionsFor = kafkaConsumer.partitionsFor(this.testConfig.topic);
            if (partitionsFor.size() < 1) {
                throw new RuntimeException("Expected at least one partition for topic " + this.testConfig.topic);
            }
            HashMap hashMap = new HashMap();
            LinkedList linkedList = new LinkedList();
            for (PartitionInfo partitionInfo : partitionsFor) {
                TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                hashMap.put(topicPartition, Long.valueOf(j));
                linkedList.add(topicPartition);
            }
            OffsetsForTime offsetsForTime = new OffsetsForTime();
            tryFeature("offsetsForTimes", this.testConfig.offsetsForTimesSupported, () -> {
                offsetsForTime.result = kafkaConsumer.offsetsForTimes(hashMap);
            }, () -> {
                log.info("offsetsForTime = {}", offsetsForTime.result);
            });
            kafkaConsumer.beginningOffsets(hashMap.keySet());
            kafkaConsumer.endOffsets(hashMap.keySet());
            kafkaConsumer.assign(linkedList);
            kafkaConsumer.seekToBeginning(linkedList);
            Iterator<byte[]> it = new Iterator<byte[]>() { // from class: org.apache.kafka.tools.ClientCompatibilityTest.1
                private static final int TIMEOUT_MS = 10000;
                private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null;
                private byte[] next = null;

                private byte[] fetchNext() {
                    while (Time.SYSTEM.milliseconds() - j <= 10000) {
                        if (this.recordIter == null) {
                            this.recordIter = kafkaConsumer.poll(Duration.ofMillis(100L)).iterator();
                        }
                        if (this.recordIter.hasNext()) {
                            return this.recordIter.next().value();
                        }
                        this.recordIter = null;
                    }
                    throw new RuntimeException("Timed out after 10000 ms.");
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    if (this.next != null) {
                        return true;
                    }
                    this.next = fetchNext();
                    return this.next != null;
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public byte[] next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    byte[] bArr = this.next;
                    this.next = null;
                    return bArr;
                }

                @Override // java.util.Iterator
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
            try {
                compareArrays(this.message1, it.next());
                log.debug("Found first message...");
                try {
                    next = it.next();
                } catch (RecordTooLargeException e) {
                    log.debug("Got RecordTooLargeException", (Throwable) e);
                    if (!this.testConfig.expectRecordTooLargeException) {
                        throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record bigger than max.partition.fetch.bytes");
                    }
                }
                if (this.testConfig.expectRecordTooLargeException) {
                    throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record bigger than max.partition.fetch.bytes");
                }
                try {
                    compareArrays(this.message2, next);
                } catch (RuntimeException e2) {
                    System.out.println("The second message in this topic was not ours. Please use a new topic when running this program.");
                    Exit.exit(1);
                }
                log.debug("Closing consumer.");
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                log.info("Closed consumer.");
            } catch (RuntimeException e3) {
                throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when running this program.");
            }
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    private void tryFeature(String str, boolean z, Invoker invoker) throws Throwable {
        tryFeature(str, z, invoker, () -> {
        });
    }

    private void tryFeature(String str, boolean z, Invoker invoker, ResultTester resultTester) throws Throwable {
        try {
            invoker.invoke();
            log.info("Successfully used feature {}", str);
            if (!z) {
                throw new RuntimeException("Did not expect " + str + " to be supported, but it was.");
            }
            resultTester.test();
        } catch (UnsupportedVersionException e) {
            log.info("Got UnsupportedVersionException when attempting to use feature {}", str);
            if (z) {
                throw new RuntimeException("Expected " + str + " to be supported, but it wasn't.", e);
            }
        }
    }
}
