package io.confluent.parallelconsumer.reactor;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.truth.LongPollingMockConsumerSubject;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/confluent/parallelconsumer/reactor/ReactorPCTest.class */
class ReactorPCTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ReactorPCTest.class);
    public static final int MAX_CONCURRENCY = 1000;
    ReactorProcessor<String, String> rp;

    ReactorPCTest() {
    }

    protected AbstractParallelEoSStreamProcessor initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) {
        this.rp = new ReactorProcessor<>(parallelConsumerOptions.toBuilder().commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC).maxConcurrency(MAX_CONCURRENCY).build());
        return this.rp;
    }

    @BeforeEach
    public void setupData() {
        super.primeFirstRecord();
    }

    @Test
    void kickTires() {
        primeFirstRecord();
        primeFirstRecord();
        primeFirstRecord();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        this.rp.react(consumerRecord -> {
            log.info("Reactor user poll function: {}", consumerRecord);
            concurrentLinkedQueue.add(consumerRecord);
            return Mono.just(StringUtils.msg("result: {}:{}", new Object[]{Long.valueOf(consumerRecord.offset()), consumerRecord.value()}));
        });
        Awaitility.await().atMost(Duration.ofSeconds(1L)).untilAsserted(() -> {
            Truth.assertWithMessage("Processed records collection so far").that(Integer.valueOf(concurrentLinkedQueue.size())).isEqualTo(4);
            LongPollingMockConsumerSubject.assertThat(this.consumerSpy).hasCommittedToPartition(this.topicPartition).atLeastOffset(4);
        });
    }

    private static Flux<String> fromPath(Path path) {
        return Flux.using(() -> {
            return Files.lines(path);
        }, Flux::fromStream, (v0) -> {
            v0.close();
        });
    }

    @Test
    void concurrencyTest() {
        int i = 100000;
        this.ktu.send(this.consumerSpy, this.ktu.generateRecords(100000 - 1));
        log.info("Finished priming records");
        ProgressBar newMessagesBar = ProgressBarUtils.getNewMessagesBar(log, 100000);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        int i2 = MAX_CONCURRENCY;
        this.rp.react(consumerRecord -> {
            return Mono.just(StringUtils.msg("result: {}:{}", new Object[]{Long.valueOf(consumerRecord.offset()), consumerRecord.value()})).doOnNext(str -> {
                log.trace("Reactor user function executing: {}", consumerRecord);
                concurrentLinkedQueue.add(consumerRecord);
                if (concurrentLinkedQueue.size() > i2) {
                    log.error("More records submitted for processing than max concurrency settings ({} vs {})", Integer.valueOf(concurrentLinkedQueue.size()), Integer.valueOf(i2));
                    countDownLatch.countDown();
                }
            }).delayElement(Duration.ofMillis((int) (100.0d * Math.random()))).doOnNext(str2 -> {
                log.trace("User function after delay. Records pending: {}, removing from out for processing: {}", Integer.valueOf(concurrentLinkedQueue.size()), consumerRecord);
                atomicInteger2.set(Math.max(concurrentLinkedQueue.size(), atomicInteger2.get()));
                Truth.assertWithMessage("record was present and removed").that(Boolean.valueOf(concurrentLinkedQueue.remove(consumerRecord))).isTrue();
                if (atomicInteger.incrementAndGet() > i - 1) {
                    countDownLatch.countDown();
                }
                newMessagesBar.step();
            });
        });
        LatchTestUtils.awaitLatch(countDownLatch, 450);
        Truth.assertWithMessage("Max concurrency should never be exceeded").that(Integer.valueOf(atomicInteger2.get())).isLessThan(Integer.valueOf(MAX_CONCURRENCY));
        log.info("Max concurrency was {}", Integer.valueOf(atomicInteger2.get()));
        Awaitility.await().atMost(Duration.ofSeconds(2L)).failFast("Max concurrency exceeded", () -> {
            return Boolean.valueOf(concurrentLinkedQueue.size() > i2);
        }).untilAsserted(() -> {
            Truth.assertWithMessage("Number of completed messages").that(Integer.valueOf(atomicInteger.get())).isEqualTo(Integer.valueOf(i));
            LongPollingMockConsumerSubject.assertThat(this.consumerSpy).hasCommittedToPartition(this.topicPartition).offset(i);
        });
        newMessagesBar.close();
    }
}
