package io.confluent.parallelconsumer.vertx;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.vertx.JStreamVertxParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.vertx.VertxParallelEoSStreamProcessor;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

@ExtendWith({VertxExtension.class})
/* loaded from: input_file:io/confluent/parallelconsumer/vertx/VertxTest.class */
public class VertxTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(VertxTest.class);
    JStreamVertxParallelEoSStreamProcessor<String, String> vertxAsync;
    protected static WireMockServer stubServer;
    protected static final String stubResponse = "Good times.";

    VertxParallelEoSStreamProcessor.RequestInfo getGoodHost() {
        return new VertxParallelEoSStreamProcessor.RequestInfo("localhost", stubServer.port(), "/", UniMaps.of());
    }

    VertxParallelEoSStreamProcessor.RequestInfo getBadHost() {
        return new VertxParallelEoSStreamProcessor.RequestInfo("localhost", 1, "", UniMaps.of());
    }

    @BeforeAll
    public static void setupWireMock() {
        stubServer = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort());
        stubServer.start();
        stubServer.stubFor(WireMock.get(WireMock.urlPathEqualTo("/")).willReturn(WireMock.aResponse().withBody(stubResponse)));
    }

    protected ParallelEoSStreamProcessor initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) {
        Vertx vertx = Vertx.vertx(new VertxOptions());
        this.vertxAsync = new JStreamVertxParallelEoSStreamProcessor<>(this.consumerSpy, this.producerSpy, vertx, WebClient.create(vertx), ParallelConsumerOptions.builder().build());
        return this.vertxAsync;
    }

    @BeforeEach
    public void setupData() {
        super.primeFirstRecord();
    }

    @Test
    public void sanityTest(Vertx vertx, VertxTestContext vertxTestContext) {
        WebClient.create(vertx).get(getGoodHost().getPort(), getGoodHost().getHost(), "").send(vertxTestContext.succeeding(httpResponse -> {
            vertxTestContext.verify(() -> {
                log.debug("callback {}", httpResponse.bodyAsString());
                vertxTestContext.completeNow();
            });
        }));
    }

    @Test
    public void failingHttpCall() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        JStreamVertxParallelEoSStreamProcessor<String, String> jStreamVertxParallelEoSStreamProcessor = this.vertxAsync;
        Objects.requireNonNull(countDownLatch);
        jStreamVertxParallelEoSStreamProcessor.addVertxOnCompleteHook(countDownLatch::countDown);
        Stream<JStreamVertxParallelEoSStreamProcessor.VertxCPResult<String, String>> vertxHttpReqInfoStream = this.vertxAsync.vertxHttpReqInfoStream(consumerRecord -> {
            return getBadHost();
        });
        awaitLatch(countDownLatch);
        assertCommits(UniLists.of(0));
        List<AsyncResult<HttpResponse<Buffer>>> results = getResults(vertxHttpReqInfoStream);
        Assertions.assertThat(results).doesNotContainNull();
        Assertions.assertThat(results).extracting((v0) -> {
            return v0.failed();
        }).containsOnly(new Boolean[]{true});
        Assertions.assertThat(results).flatExtracting(asyncResult -> {
            return Arrays.asList(asyncResult.cause().getMessage().split(" "));
        }).contains(new String[]{"Connection", "refused:"});
    }

    @Test
    public void testVertxFunctionFail(Vertx vertx, VertxTestContext vertxTestContext) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        JStreamVertxParallelEoSStreamProcessor<String, String> jStreamVertxParallelEoSStreamProcessor = this.vertxAsync;
        Objects.requireNonNull(countDownLatch);
        jStreamVertxParallelEoSStreamProcessor.addVertxOnCompleteHook(countDownLatch::countDown);
        Stream vertxHttpReqInfoStream = this.vertxAsync.vertxHttpReqInfoStream(consumerRecord -> {
            log.debug("Inner user function");
            return getBadHost();
        });
        awaitLatch(countDownLatch);
        List list = (List) vertxHttpReqInfoStream.map((v0) -> {
            return v0.getAsr();
        }).collect(Collectors.toList());
        Assertions.assertThat(list).hasSize(1);
        Future onComplete = ((Future) list.get(0)).onComplete(asyncResult -> {
        });
        ConditionFactory await = Awaitility.await();
        Objects.requireNonNull(onComplete);
        await.until(onComplete::isComplete);
        Assertions.assertThat(onComplete).isNotNull();
        onComplete.onComplete(vertxTestContext.failing(th -> {
            vertxTestContext.verify(() -> {
                Assertions.assertThat(th).hasMessageContainingAll(new CharSequence[]{"Connection", "refused"});
                vertxTestContext.completeNow();
            });
        }));
        Assertions.assertThat(this.vertxAsync.workRemaining()).isEqualTo(1);
    }

    @Test
    public void testHttpMinimal() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        JStreamVertxParallelEoSStreamProcessor<String, String> jStreamVertxParallelEoSStreamProcessor = this.vertxAsync;
        Objects.requireNonNull(countDownLatch);
        jStreamVertxParallelEoSStreamProcessor.addVertxOnCompleteHook(countDownLatch::countDown);
        Stream<JStreamVertxParallelEoSStreamProcessor.VertxCPResult<String, String>> vertxHttpReqInfoStream = this.vertxAsync.vertxHttpReqInfoStream(consumerRecord -> {
            log.debug("Inner user function");
            VertxParallelEoSStreamProcessor.RequestInfo goodHost = getGoodHost();
            goodHost.setParams(UniMaps.of("randomParam", (String) consumerRecord.value()));
            return goodHost;
        });
        awaitLatch(countDownLatch);
        waitForOneLoopCycle();
        List<AsyncResult<HttpResponse<Buffer>>> results = getResults(vertxHttpReqInfoStream);
        KafkaTestUtils.assertCommits(this.producerSpy, UniLists.of(0, 1));
        Assertions.assertThat(results).extracting(asyncResult -> {
            return Integer.valueOf(((HttpResponse) asyncResult.result()).statusCode());
        }).containsOnly(new Integer[]{200});
        Assertions.assertThat(results).extracting(asyncResult2 -> {
            return ((HttpResponse) asyncResult2.result()).bodyAsString();
        }).contains(new String[]{stubResponse});
    }

    @Test
    public void testHttp() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        JStreamVertxParallelEoSStreamProcessor<String, String> jStreamVertxParallelEoSStreamProcessor = this.vertxAsync;
        Objects.requireNonNull(countDownLatch);
        jStreamVertxParallelEoSStreamProcessor.addVertxOnCompleteHook(countDownLatch::countDown);
        Stream<JStreamVertxParallelEoSStreamProcessor.VertxCPResult<String, String>> vertxHttpRequestStream = this.vertxAsync.vertxHttpRequestStream((webClient, consumerRecord) -> {
            log.debug("Inner user function");
            String str = (String) consumerRecord.value();
            VertxParallelEoSStreamProcessor.RequestInfo goodHost = getGoodHost();
            return webClient.get(goodHost.getPort(), goodHost.getHost(), goodHost.getContextPath()).addQueryParam("randomParam", str);
        });
        awaitLatch(countDownLatch);
        List<AsyncResult<HttpResponse<Buffer>>> results = getResults(vertxHttpRequestStream);
        Assertions.assertThat(results).hasSize(1).doesNotContainNull();
        Assertions.assertThat(results).extracting((v0) -> {
            return v0.cause();
        }).containsOnlyNulls();
        Assertions.assertThat(results).extracting(asyncResult -> {
            return Integer.valueOf(((HttpResponse) asyncResult.result()).statusCode());
        }).containsOnly(new Integer[]{200});
        Assertions.assertThat(results).extracting(asyncResult2 -> {
            return ((HttpResponse) asyncResult2.result()).bodyAsString();
        }).contains(new String[]{stubResponse});
    }

    private List<AsyncResult<HttpResponse<Buffer>>> getResults(Stream<JStreamVertxParallelEoSStreamProcessor.VertxCPResult<String, String>> stream) {
        return blockingGetResults((List) stream.map((v0) -> {
            return v0.getAsr();
        }).collect(Collectors.toList()));
    }

    @Disabled
    @Test
    public void handleHttpResponseCodes() {
        Assertions.assertThat(true).isFalse();
    }

    private <T> List<AsyncResult<T>> blockingGetResults(List<Future<T>> list) {
        ArrayList arrayList = new ArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        Iterator<Future<T>> it = list.iterator();
        while (it.hasNext()) {
            it.next().onComplete(asyncResult -> {
                arrayList.add(asyncResult);
                countDownLatch.countDown();
            });
        }
        if (countDownLatch.await(defaultTimeoutSeconds, TimeUnit.SECONDS)) {
            return arrayList;
        }
        throw new AssertionError("Timeout reached");
    }
}
