package io.confluent.parallelconsumer.vertx.integrationTests;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.http.Request;
import com.github.tomakehurst.wiremock.http.RequestListener;
import com.github.tomakehurst.wiremock.http.Response;
import com.google.common.flogger.FluentLogger;
import com.google.common.truth.Truth;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.csid.utils.TrimListRepresentation;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.vertx.VertxParallelEoSStreamProcessor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

@Testcontainers
@Isolated
/* loaded from: input_file:io/confluent/parallelconsumer/vertx/integrationTests/VertxConcurrencyIT.class */
class VertxConcurrencyIT extends BrokerIntegrationTest {
    private static ProgressBar bar;
    static final int expectedMessageCount = 2000;
    static final int expectedConcurrentCount = 100;
    public List<String> consumedKeys = Collections.synchronizedList(new ArrayList());
    public AtomicInteger processedCount = new AtomicInteger(0);
    public AtomicInteger httpResponseReceivedCount = new AtomicInteger(0);
    public static WireMockServer stubServer;
    private static final Logger log = LoggerFactory.getLogger(VertxConcurrencyIT.class);
    private static final FluentLogger flog = FluentLogger.forEnclosingClass();
    public static AtomicInteger numberRequestsProcessing = new AtomicInteger(0);
    public static AtomicInteger highestConcurrency = new AtomicInteger(0);
    static CountDownLatch responseLock = new CountDownLatch(1);
    static Queue<Request> requestsReceivedOnServer = new ConcurrentArrayQueue();

    VertxConcurrencyIT() {
        bar = ProgressBarUtils.getNewMessagesBar(log, expectedMessageCount);
    }

    @BeforeAll
    static void setupWireMock() {
        stubServer = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort().containerThreads(200));
        stubServer.stubFor(WireMock.get(WireMock.urlPathEqualTo("/")).willReturn(WireMock.aResponse()));
        stubServer.addMockServiceRequestListener(new RequestListener() { // from class: io.confluent.parallelconsumer.vertx.integrationTests.VertxConcurrencyIT.1
            public void requestReceived(Request request, Response response) {
                VertxConcurrencyIT.log.debug("req: {}", request);
                VertxConcurrencyIT.numberRequestsProcessing.getAndIncrement();
                VertxConcurrencyIT.requestsReceivedOnServer.add(request);
                VertxConcurrencyIT.bar.stepBy(1L);
                LatchTestUtils.awaitLatch(VertxConcurrencyIT.responseLock, 30);
                VertxConcurrencyIT.log.trace("unlocked");
                VertxConcurrencyIT.highestConcurrency.set(Math.max(VertxConcurrencyIT.highestConcurrency.get(), VertxConcurrencyIT.numberRequestsProcessing.get()));
                ThreadUtils.sleepLog(400);
                VertxConcurrencyIT.numberRequestsProcessing.getAndDecrement();
            }
        });
        stubServer.start();
    }

    @AfterAll
    static void close() {
        stubServer.stop();
    }

    @Test
    void testVertxConcurrency() {
        ParallelConsumerOptions.CommitMode commitMode = ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS;
        ParallelConsumerOptions.ProcessingOrder processingOrder = ParallelConsumerOptions.ProcessingOrder.UNORDERED;
        String str = setupTopic(getClass().getSimpleName() + "-input-" + RandomUtils.nextInt());
        ArrayList arrayList = new ArrayList();
        log.info("Producing {} messages before starting test", Integer.valueOf(expectedMessageCount));
        ArrayList arrayList2 = new ArrayList();
        KafkaProducer createNewProducer = this.kcu.createNewProducer(false);
        for (int i = 0; i < expectedMessageCount; i++) {
            try {
                String str2 = "key-" + i;
                arrayList2.add(createNewProducer.send(new ProducerRecord(str, str2, "value-" + i), (recordMetadata, exc) -> {
                    if (exc != null) {
                        log.error("Error sending, ", exc);
                    }
                }));
                arrayList.add(str2);
            } finally {
            }
        }
        log.debug("Finished sending test data");
        if (createNewProducer != null) {
            createNewProducer.close();
        }
        log.debug("Waiting for broker acks");
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        Assertions.assertThat(arrayList2).hasSize(expectedMessageCount);
        log.debug("Starting test");
        KafkaProducer createNewProducer2 = this.kcu.createNewProducer(commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER));
        KafkaConsumer createNewConsumer = this.kcu.createNewConsumer(true, new Properties());
        VertxParallelEoSStreamProcessor vertxParallelEoSStreamProcessor = new VertxParallelEoSStreamProcessor(ParallelConsumerOptions.builder().ordering(processingOrder).consumer(createNewConsumer).producer(createNewProducer2).commitMode(commitMode).maxConcurrency(expectedConcurrentCount).build());
        vertxParallelEoSStreamProcessor.subscribe(UniLists.of(str));
        TopicPartition topicPartition = new TopicPartition(str, 0);
        Map beginningOffsets = createNewConsumer.beginningOffsets(UniLists.of(topicPartition));
        Assertions.assertThat((Long) createNewConsumer.endOffsets(UniLists.of(topicPartition)).get(topicPartition)).isEqualTo(2000L);
        Assertions.assertThat((Long) beginningOffsets.get(topicPartition)).isEqualTo(0L);
        vertxParallelEoSStreamProcessor.vertxHttpReqInfo(consumerRecord -> {
            this.consumedKeys.add((String) consumerRecord.key());
            return new VertxParallelEoSStreamProcessor.RequestInfo("localhost", stubServer.port(), "/", UniMaps.of());
        }, future -> {
            this.processedCount.incrementAndGet();
        }, asyncResult -> {
            this.httpResponseReceivedCount.incrementAndGet();
            log.trace("Response received complete {}", asyncResult);
        });
        log.info("Waiting for {}/2 requests in parallel on server.", Integer.valueOf(expectedConcurrentCount));
        Assertions.useRepresentation(new TrimListRepresentation());
        String msg = StringUtils.msg("Mock server receives {} requests in parallel from vertx engine", new Object[]{1000});
        try {
            Awaitility.waitAtMost(Duration.ofSeconds(20L)).pollInterval(Duration.ofMillis(200L)).alias(msg).untilAsserted(() -> {
                log.info("got {}/{}", Integer.valueOf(requestsReceivedOnServer.size()), 50);
                Assertions.assertThat(requestsReceivedOnServer.size()).isGreaterThanOrEqualTo(50);
            });
        } catch (ConditionTimeoutException e) {
            Assertions.fail(msg + "\n" + e.getMessage());
        }
        log.info("{} requests received in parallel by server, releasing server response lock.", Integer.valueOf(requestsReceivedOnServer.size()));
        LatchTestUtils.release(responseLock);
        log.info("Waiting for {} responses from server, while checking concurrent requests never exceed max concurrency.", Integer.valueOf(expectedMessageCount));
        Awaitility.waitAtMost(Duration.ofSeconds(120L)).alias(msg).failFast(StringUtils.msg("max concurrency exceeded {}", new Object[]{Integer.valueOf(expectedConcurrentCount)}), () -> {
            int i2 = numberRequestsProcessing.get();
            if (i2 <= expectedConcurrentCount) {
                return false;
            }
            log.error("Concurrency too high {}", Integer.valueOf(i2));
            return true;
        }).untilAsserted(() -> {
            flog.atInfo().atMostEvery(1, TimeUnit.SECONDS).log("Concurrency level: %s", numberRequestsProcessing.get());
            Assertions.assertThat(this.httpResponseReceivedCount).hasValue(expectedMessageCount);
        });
        log.info("All {} responses received.", Integer.valueOf(expectedMessageCount));
        bar.close();
        vertxParallelEoSStreamProcessor.closeDrainFirst();
        int i2 = highestConcurrency.get();
        log.info("Highest concurrency was {}", Integer.valueOf(i2));
        Truth.assertWithMessage("Should at some point reach max concurrency").that(Integer.valueOf(i2)).isAtLeast(Integer.valueOf(expectedConcurrentCount));
        Assertions.assertThat(expectedMessageCount).isEqualTo(this.processedCount.get());
        Assertions.assertThat(responseLock.getCount()).isZero();
    }

    private void assertNumberOfThreads() {
        Set<Thread> keySet = Thread.getAllStackTraces().keySet();
        String str = "pc-";
        String str2 = "qtp";
        long count = keySet.stream().filter(thread -> {
            return thread.getName().startsWith(str);
        }).count();
        long count2 = keySet.stream().filter(thread2 -> {
            return thread2.getName().startsWith(str2);
        }).count();
        if (count > 0) {
            log.info("Checking there are only {} PC threads running", 3);
            Assertions.assertThat(count).as("Number of Parallel Consumer threads outside expected estimates", new Object[0]).isEqualTo(3);
        }
        log.info("Checking there are about ~{} wire mock threads running to process requests in parallel from vert.x", 50);
        Assertions.assertThat(count2).as("Number of wiremock threads outside expected estimates", new Object[0]).isCloseTo(50L, Percentage.withPercentage(60.0d));
    }
}
