package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.ProgressTracker;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.TrimListRepresentation;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.internal.StandardComparisonStrategy;
import org.assertj.core.util.IterableUtil;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.class */
public class MultiInstanceRebalanceTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(MultiInstanceRebalanceTest.class);
    static final int DEFAULT_MAX_POLL = 500;
    List<String> consumedKeys = Collections.synchronizedList(new ArrayList());
    AtomicInteger count = new AtomicInteger();
    int instanceId = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest$ParallelConsumerRunnable.class */
    public class ParallelConsumerRunnable implements Runnable {
        private final int maxPoll;
        private final ParallelConsumerOptions.CommitMode commitMode;
        private final ParallelConsumerOptions.ProcessingOrder order;
        private final String inputTopic;
        private final ProgressBar bar;
        private ParallelEoSStreamProcessor<String, String> parallelConsumer;

        @Override // java.lang.Runnable
        public void run() {
            MultiInstanceRebalanceTest.log.info("Running consumer!");
            Properties properties = new Properties();
            properties.put("max.poll.records", Integer.valueOf(this.maxPoll));
            this.parallelConsumer = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.builder().ordering(this.order).consumer(MultiInstanceRebalanceTest.this.kcu.createNewConsumer(false, properties)).commitMode(this.commitMode).maxConcurrency(1).build());
            this.parallelConsumer.setMyId(Optional.of("id: " + MultiInstanceRebalanceTest.this.instanceId));
            MultiInstanceRebalanceTest.this.instanceId++;
            this.parallelConsumer.subscribe(UniLists.of(this.inputTopic));
            this.parallelConsumer.poll(consumerRecord -> {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                }
                MultiInstanceRebalanceTest.this.count.incrementAndGet();
                this.bar.stepBy(1L);
                MultiInstanceRebalanceTest.this.consumedKeys.add((String) consumerRecord.key());
            });
        }

        public int getMaxPoll() {
            return this.maxPoll;
        }

        public ParallelConsumerOptions.CommitMode getCommitMode() {
            return this.commitMode;
        }

        public ParallelConsumerOptions.ProcessingOrder getOrder() {
            return this.order;
        }

        public String getInputTopic() {
            return this.inputTopic;
        }

        public ProgressBar getBar() {
            return this.bar;
        }

        public ParallelEoSStreamProcessor<String, String> getParallelConsumer() {
            return this.parallelConsumer;
        }

        public ParallelConsumerRunnable(int i, ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder processingOrder, String str, ProgressBar progressBar) {
            this.maxPoll = i;
            this.commitMode = commitMode;
            this.order = processingOrder;
            this.inputTopic = str;
            this.bar = progressBar;
        }
    }

    @EnumSource(ParallelConsumerOptions.ProcessingOrder.class)
    @ParameterizedTest
    void consumeWithMultipleInstancesPeriodicConsumerSync(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        runTest(DEFAULT_MAX_POLL, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC, processingOrder);
    }

    @EnumSource(ParallelConsumerOptions.ProcessingOrder.class)
    @ParameterizedTest
    void consumeWithMultipleInstancesPeriodicConsumerAsync(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        runTest(DEFAULT_MAX_POLL, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, processingOrder);
    }

    private void runTest(int i, ParallelConsumerOptions.CommitMode commitMode, ParallelConsumerOptions.ProcessingOrder processingOrder) {
        this.numPartitions = 2;
        String str = setupTopic(getClass().getSimpleName() + "-input-" + RandomUtils.nextInt());
        ArrayList arrayList = new ArrayList();
        log.info("Producing {} messages before starting test", 100);
        ArrayList arrayList2 = new ArrayList();
        KafkaProducer createNewProducer = this.kcu.createNewProducer(false);
        for (int i2 = 0; i2 < 100; i2++) {
            try {
                String str2 = "key-" + i2;
                arrayList2.add(createNewProducer.send(new ProducerRecord(str, str2, "value-" + i2), (recordMetadata, exc) -> {
                    if (exc != null) {
                        log.error("Error sending, ", exc);
                    }
                }));
                arrayList.add(str2);
            } finally {
            }
        }
        log.debug("Finished sending test data");
        if (createNewProducer != null) {
            createNewProducer.close();
        }
        log.debug("Waiting for broker acks");
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        Assertions.assertThat(arrayList2).hasSize(100);
        log.debug("Starting test");
        ProgressBar newMessagesBar = ProgressBarUtils.getNewMessagesBar(log, 100);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        log.info("Running first instance of pc");
        ParallelConsumerRunnable parallelConsumerRunnable = new ParallelConsumerRunnable(i, commitMode, processingOrder, str, newMessagesBar);
        newFixedThreadPool.submit(parallelConsumerRunnable);
        Awaitility.waitAtMost(Duration.ofSeconds(10L)).until(() -> {
            return Boolean.valueOf(this.consumedKeys.size() > 10);
        });
        log.info("Running second instance of pc");
        ParallelConsumerRunnable parallelConsumerRunnable2 = new ParallelConsumerRunnable(i, commitMode, processingOrder, str, newMessagesBar);
        newFixedThreadPool.submit(parallelConsumerRunnable2);
        Assertions.useRepresentation(new TrimListRepresentation());
        String msg = StringUtils.msg("All keys sent to input-topic should be processed, within time (expected: {} commit: {} order: {} max poll: {})", new Object[]{100, commitMode, processingOrder, Integer.valueOf(i)});
        ProgressTracker progressTracker = new ProgressTracker(this.count);
        try {
            Awaitility.waitAtMost(Duration.ofSeconds(30L)).alias(msg).pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> {
                log.trace("Processed-count: {}", Integer.valueOf(this.consumedKeys.size()));
                if (progressTracker.hasProgressNotBeenMade()) {
                    arrayList.removeAll(this.consumedKeys);
                    throw progressTracker.constructError(StringUtils.msg("No progress, missing keys: {}.", new Object[]{arrayList}));
                }
                SoftAssertions softAssertions = new SoftAssertions();
                softAssertions.assertThat(new ArrayList(this.consumedKeys)).as("all expected are consumed", new Object[0]).containsAll(arrayList);
                softAssertions.assertThat(new ArrayList(this.consumedKeys)).as("all expected are consumed only once", new Object[0]).hasSizeGreaterThanOrEqualTo(arrayList.size());
                softAssertions.assertAll();
            });
        } catch (ConditionTimeoutException e) {
            Assertions.fail(msg + "\n" + e.getMessage());
        }
        newMessagesBar.close();
        parallelConsumerRunnable.getParallelConsumer().closeDrainFirst();
        parallelConsumerRunnable2.getParallelConsumer().closeDrainFirst();
        newFixedThreadPool.shutdown();
        log.info("Duplicate consumed keys (at least one is expected due to the rebalance): {}", IterableUtil.toCollection(StandardComparisonStrategy.instance().duplicatesFrom(this.consumedKeys)));
    }
}
