package io.confluent.parallelconsumer.vertx;

import com.github.tomakehurst.wiremock.WireMockServer;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.WireMockUtils;
import io.confluent.parallelconsumer.vertx.JStreamVertxParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.vertx.VertxParallelEoSStreamProcessor;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Isolated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/**
 * Tests for the Vert.x based parallel stream processor ({@code vertxAsync}, inherited from
 * {@link VertxBaseUnitTest}), run against a local WireMock stub server.
 *
 * <p>Successful requests are expected to answer with HTTP 200 and a body of {@code "Good times."};
 * failing requests target an unresolvable host name.
 */
@ExtendWith(VertxExtension.class)
@Isolated
class VertxTest extends VertxBaseUnitTest {
    private static final Logger log = LoggerFactory.getLogger(VertxTest.class);

    /** Stub HTTP server, created before and stopped after every test. */
    WireMockServer stubServer;

    /**
     * @return request info addressing the root path of the local stub server, without query parameters
     */
    VertxParallelEoSStreamProcessor.RequestInfo getGoodHost() {
        return new VertxParallelEoSStreamProcessor.RequestInfo("localhost", stubServer.port(), "/", UniMaps.of());
    }

    /**
     * @return request info for the host name {@code "xxxxxxxxx"} on port 1; the tests expect requests
     *         built from it to fail with a "failed to resolve" style error
     */
    VertxParallelEoSStreamProcessor.RequestInfo getBadRequest() {
        return new VertxParallelEoSStreamProcessor.RequestInfo("xxxxxxxxx", 1, "/", UniMaps.of());
    }

    @BeforeEach
    void setupWireMock() {
        stubServer = new WireMockUtils().setupWireMock();
    }

    @AfterEach
    void closeWireMock() {
        // Guard against a failed setup, so the original failure is not masked by a NullPointerException here.
        if (stubServer != null) {
            stubServer.stop();
        }
    }

    /** Checks that a plain Vert.x web client can reach the stub server at all. */
    @Test
    void sanityTest(Vertx vertx, VertxTestContext vertxTestContext) {
        VertxParallelEoSStreamProcessor.RequestInfo goodHost = getGoodHost();
        WebClient.create(vertx)
                .get(goodHost.getPort(), goodHost.getHost(), "")
                .send(vertxTestContext.succeeding(httpResponse -> {
                    vertxTestContext.verify(() -> {
                        log.debug("callback {}", httpResponse.bodyAsString());
                        vertxTestContext.completeNow();
                    });
                }));
    }

    /** A request to an unresolvable host must yield a failed result and no commits. */
    @Test
    void failingHttpCall() {
        CountDownLatch requestCompleted = new CountDownLatch(1);
        vertxAsync.addVertxOnCompleteHook(requestCompleted::countDown);

        Stream<JStreamVertxParallelEoSStreamProcessor.VertxCPResult<String, String>> resultStream =
                vertxAsync.vertxHttpReqInfoStream(pollContext -> getBadRequest());

        LatchTestUtils.awaitLatch(requestCompleted);
        assertCommits(UniLists.of());

        List<AsyncResult<HttpResponse<Buffer>>> results = getResults(resultStream);
        Assertions.assertThat(results).doesNotContainNull();
        Assertions.assertThat(results).extracting(AsyncResult::failed).containsOnly(true);
        // Locale.ROOT keeps the lower-casing independent of the machine's default locale.
        Assertions.assertThat(results)
                .flatExtracting(result -> Arrays.asList(
                        result.cause().getMessage().toLowerCase(Locale.ROOT).split(" ")))
                .contains("failed", "resolve");
    }

    /**
     * The future returned for a request to an unresolvable host must fail with a resolution error,
     * and the work must still be counted as remaining.
     */
    @Test
    void testVertxFunctionFail(Vertx vertx, VertxTestContext vertxTestContext) {
        CountDownLatch requestCompleted = new CountDownLatch(1);
        vertxAsync.addVertxOnCompleteHook(requestCompleted::countDown);

        Stream<JStreamVertxParallelEoSStreamProcessor.VertxCPResult<String, String>> resultStream =
                vertxAsync.vertxHttpReqInfoStream(pollContext -> {
                    log.debug("Inner user function");
                    return getBadRequest();
                });

        LatchTestUtils.awaitLatch(requestCompleted);

        List<Future<HttpResponse<Buffer>>> futures = resultStream
                .map(JStreamVertxParallelEoSStreamProcessor.VertxCPResult::getAsr)
                .collect(Collectors.toList());
        Assertions.assertThat(futures).hasSize(1);

        Future<HttpResponse<Buffer>> future = futures.get(0);
        // Check for null before the future is dereferenced below.
        Assertions.assertThat(future).isNotNull();
        Awaitility.await().until(future::isComplete);

        future.onComplete(vertxTestContext.failing(thrown -> {
            vertxTestContext.verify(() -> {
                Assertions.assertThat(thrown).hasMessageContainingAll("Failed", "resolve");
                vertxTestContext.completeNow();
            });
        }));

        Assertions.assertThat(vertxAsync.workRemaining()).isEqualTo(1L);
    }

    /** A request built from {@code RequestInfo} must succeed and be committed. */
    @Test
    void testHttpMinimal() {
        vertxAsync.setTimeBetweenCommits(Duration.ofSeconds(1L));

        Stream<JStreamVertxParallelEoSStreamProcessor.VertxCPResult<String, String>> resultStream =
                vertxAsync.vertxHttpReqInfoStream(pollContext -> {
                    log.debug("Inner user function");
                    VertxParallelEoSStreamProcessor.RequestInfo goodHost = getGoodHost();
                    goodHost.setParams(UniMaps.of("randomParam", pollContext.value()));
                    return goodHost;
                });

        awaitForCommitExact(1);

        List<AsyncResult<HttpResponse<Buffer>>> results = getResults(resultStream);
        Assertions.assertThat(results)
                .extracting(result -> result.result().statusCode())
                .containsOnly(200);
        Assertions.assertThat(results)
                .extracting(result -> result.result().bodyAsString())
                .contains("Good times.");
    }

    /** A request built directly on the supplied {@code WebClient} must succeed without a failure cause. */
    @Test
    void testHttp() {
        CountDownLatch requestCompleted = new CountDownLatch(1);
        vertxAsync.addVertxOnCompleteHook(requestCompleted::countDown);

        Stream<JStreamVertxParallelEoSStreamProcessor.VertxCPResult<String, String>> resultStream =
                vertxAsync.vertxHttpRequestStream((webClient, pollContext) -> {
                    log.debug("Inner user function");
                    String value = pollContext.value();
                    VertxParallelEoSStreamProcessor.RequestInfo goodHost = getGoodHost();
                    return webClient
                            .get(goodHost.getPort(), goodHost.getHost(), goodHost.getContextPath())
                            .addQueryParam("randomParam", value);
                });

        LatchTestUtils.awaitLatch(requestCompleted);

        List<AsyncResult<HttpResponse<Buffer>>> results = getResults(resultStream);
        Assertions.assertThat(results).hasSize(1).doesNotContainNull();
        Assertions.assertThat(results).extracting(AsyncResult::cause).containsOnlyNulls();
        Assertions.assertThat(results)
                .extracting(result -> result.result().statusCode())
                .containsOnly(200);
        Assertions.assertThat(results)
                .extracting(result -> result.result().bodyAsString())
                .contains("Good times.");
    }

    /**
     * Collects the futures carried by the given result stream and blocks until all of them completed.
     *
     * @param stream processor results; consumed by this call
     * @return the completed results, see {@link #blockingGetResults(List)}
     */
    private List<AsyncResult<HttpResponse<Buffer>>> getResults(
            Stream<JStreamVertxParallelEoSStreamProcessor.VertxCPResult<String, String>> stream) {
        List<Future<HttpResponse<Buffer>>> futures = stream
                .map(JStreamVertxParallelEoSStreamProcessor.VertxCPResult::getAsr)
                .collect(Collectors.toList());
        return blockingGetResults(futures);
    }

    // Placeholder: the body always fails, so the test stays disabled until it is written.
    @Disabled
    @Test
    void handleHttpResponseCodes() {
        Assertions.assertThat(true).isFalse();
    }

    /**
     * Waits for every future in {@code list} to complete and returns their results.
     *
     * @param list futures to wait for
     * @param <T>  result type of the futures
     * @return the results in the order in which the futures completed
     * @throws AssertionError if not all futures completed within {@code defaultTimeoutSeconds},
     *                        or if the waiting thread was interrupted
     */
    private <T> List<AsyncResult<T>> blockingGetResults(List<Future<T>> list) {
        // Completion handlers run on threads other than the test thread, so the collector must be thread-safe.
        List<AsyncResult<T>> completed = Collections.synchronizedList(new ArrayList<>(list.size()));
        CountDownLatch pending = new CountDownLatch(list.size());
        for (Future<T> future : list) {
            future.onComplete(result -> {
                completed.add(result);
                pending.countDown();
            });
        }
        try {
            if (!pending.await(defaultTimeoutSeconds, TimeUnit.SECONDS)) {
                throw new AssertionError("Timeout reached");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for results", e);
        }
        return new ArrayList<>(completed);
    }

    /**
     * Runs blocking user code through {@code vertxFuture} and checks that it only finishes after the
     * test releases it.
     *
     * @throws InterruptedException if the test thread is interrupted while pausing
     */
    @Test
    void genericVertxFuture(Vertx vertx, VertxTestContext vertxTestContext) throws InterruptedException {
        // NOTE(review): presumably each call queues one more record, which would match the
        // three expected checkpoint flags below - confirm against VertxBaseUnitTest.
        primeFirstRecord();
        primeFirstRecord();

        CountDownLatch requestCompleted = new CountDownLatch(1);
        vertxAsync.addVertxOnCompleteHook(requestCompleted::countDown);

        CountDownLatch release = new CountDownLatch(1);
        Checkpoint checkpoint = vertxTestContext.checkpoint(3);

        vertxAsync.vertxFuture(pollContext -> {
            return vertx.executeBlocking(promise -> {
                log.debug("Inner user function {}", pollContext);
                try {
                    log.info("Waiting");
                    release.await();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the worker's owner instead of swallowing it.
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while waiting to be released", e);
                }
                checkpoint.flag();
                log.info("Finished waiting");
                promise.complete();
            });
        });

        log.info("Pausing");
        // Give the blocking user functions time to start and park on the latch.
        Thread.sleep(1000L);
        release.countDown();
        log.info("Counted down");
        LatchTestUtils.awaitLatch(requestCompleted);
        log.info("Latch gotten");
    }
}
