package io.confluent.parallelconsumer.reactor;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.truth.LongPollingMockConsumerSubject;
import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/confluent/parallelconsumer/reactor/ReactorPCTest.class */
/**
 * Tests for the Reactor flavour of the parallel consumer, exercised through the
 * {@code reactorPC}, {@code consumerSpy}, {@code ktu} and {@code topicPartition}
 * fixtures inherited from {@link ReactorUnitTestBase}.
 */
class ReactorPCTest extends ReactorUnitTestBase {
    private static final Logger log = LoggerFactory.getLogger(ReactorPCTest.class);

    /**
     * Primes one record before every test by delegating to the base class.
     */
    @BeforeEach
    public void setupData() {
        super.primeFirstRecord();
    }

    /**
     * Smoke test: primes three more records on top of the one from {@link #setupData()},
     * registers a trivial reactive user function and waits until four records have been
     * handed to it and the commit for the partition has reached at least offset 4.
     */
    @Test
    void kickTires() {
        primeFirstRecord();
        primeFirstRecord();
        primeFirstRecord();

        // Element type is the poll context handed to the user function; kept as Object
        // because that type is not imported in this file.
        ConcurrentLinkedQueue<Object> processed = new ConcurrentLinkedQueue<>();

        this.reactorPC.react(pollContext -> {
            log.info("Reactor user poll function: {}", pollContext);
            processed.add(pollContext);
            return Mono.just(StringUtils.msg("result: {}:{}", pollContext.offset(), pollContext.value()));
        });

        Awaitility.await().atMost(defaultTimeout).untilAsserted(() -> {
            Truth.assertWithMessage("Processed records collection so far").that(processed.size()).isEqualTo(4);
            LongPollingMockConsumerSubject.assertThat(this.consumerSpy)
                    .hasCommittedToPartition(this.topicPartition)
                    .atLeastOffset(4L);
        });
    }

    /**
     * Pushes a large batch of records through a user function with a random delay and checks
     * that the number of records in flight stays bounded, that every record completes and
     * that the final offset is committed.
     */
    @Test
    void concurrencyTest() {
        // One record is already primed by setupData(), hence "quantity - 1" below.
        final int quantity = 100_000;
        // NOTE(review): presumably mirrors the max concurrency configured in the base class - confirm.
        final int maxConcurrency = 1000;
        // Allow 10% head-room over the nominal limit before failing.
        final int concurrencyLimit = (int) (maxConcurrency * 1.1d);

        this.ktu.send(this.consumerSpy, this.ktu.generateRecords(quantity - 1));
        log.info("Finished priming records");

        // Records currently between "user function started" and "user function finished".
        ConcurrentLinkedQueue<Object> inFlight = new ConcurrentLinkedQueue<>();
        AtomicInteger completedCount = new AtomicInteger(0);
        AtomicInteger maxInFlightSeen = new AtomicInteger(0);
        // Released either when everything completed or as soon as the limit is breached.
        CountDownLatch finishedOrFailed = new CountDownLatch(1);

        // try-with-resources so the progress bar is closed even when an assertion fails.
        try (ProgressBar progressBar = ProgressBarUtils.getNewMessagesBar(log, quantity)) {
            this.reactorPC.react(pollContext ->
                    Mono.just(StringUtils.msg("result: {}:{}", pollContext.offset(), pollContext.value()))
                            .doOnNext(result -> {
                                log.trace("Reactor user function executing: {}", pollContext);
                                inFlight.add(pollContext);
                                if (inFlight.size() > maxConcurrency) {
                                    log.error("More records submitted for processing than max concurrency settings ({} vs {})", inFlight.size(), maxConcurrency);
                                    finishedOrFailed.countDown();
                                }
                            })
                            .delayElement(Duration.ofMillis((int) (100.0d * Math.random())))
                            .doOnNext(result -> {
                                log.trace("User function after delay. Records pending: {}, removing from out for processing: {}", inFlight.size(), pollContext);
                                // Atomic max: a plain get/set pair could lose a peak under contention.
                                maxInFlightSeen.accumulateAndGet(inFlight.size(), Math::max);
                                Truth.assertWithMessage("record was present and removed").that(inFlight.remove(pollContext)).isTrue();
                                if (completedCount.incrementAndGet() >= quantity) {
                                    finishedOrFailed.countDown();
                                }
                                progressBar.step();
                            }));

            LatchTestUtils.awaitLatch(finishedOrFailed, defaultTimeoutSeconds);

            Truth.assertWithMessage("Max concurrency should never be exceeded")
                    .that(maxInFlightSeen.get())
                    .isLessThan(concurrencyLimit);
            log.info("Max concurrency was {}", maxInFlightSeen.get());

            Awaitility.await()
                    .atMost(defaultTimeout)
                    .failFast("Max concurrency exceeded", () -> inFlight.size() > concurrencyLimit)
                    .untilAsserted(() -> {
                        Truth.assertWithMessage("Number of completed messages").that(completedCount.get()).isEqualTo(quantity);
                        LongPollingMockConsumerSubject.assertThat(this.consumerSpy)
                                .hasCommittedToPartition(this.topicPartition)
                                .offset(quantity);
                    });
        }

        log.info("Max concurrency was {}", maxInFlightSeen.get());
    }
}
