package io.confluent.parallelconsumer.examples.vertx;

import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.LongPollingMockConsumer;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.vertx.VertxTest;
import java.time.Duration;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/examples/vertx/VertxAppTest.class */
public class VertxAppTest {
    private static final Logger log = LoggerFactory.getLogger(VertxAppTest.class);
    TopicPartition tp = new TopicPartition(VertxApp.inputTopic, 0);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/parallelconsumer/examples/vertx/VertxAppTest$VertxAppAppUnderTest.class */
    public class VertxAppAppUnderTest extends VertxApp {
        private final int port;
        LongPollingMockConsumer<String, String> mockConsumer = (LongPollingMockConsumer) Mockito.spy(new LongPollingMockConsumer(OffsetResetStrategy.EARLIEST));

        Consumer<String, String> getKafkaConsumer() {
            HashMap hashMap = new HashMap();
            hashMap.put(VertxAppTest.this.tp, 0L);
            this.mockConsumer.updateBeginningOffsets(hashMap);
            Mockito.when(this.mockConsumer.groupMetadata()).thenReturn(ParallelEoSStreamProcessorTestBase.DEFAULT_GROUP_METADATA);
            return this.mockConsumer;
        }

        Producer<String, String> getKafkaProducer() {
            return new MockProducer(true, (Serializer) null, (Serializer) null);
        }

        public void postSetup() {
            this.mockConsumer.subscribeWithRebalanceAndAssignment(UniLists.of(inputTopic), 1);
        }

        protected int getPort() {
            return this.port;
        }

        public VertxAppAppUnderTest(int i) {
            this.port = i;
        }
    }

    @Timeout(20)
    @Test
    public void test() {
        log.info("Test start");
        VertxTest.setupWireMock();
        VertxAppAppUnderTest vertxAppAppUnderTest = new VertxAppAppUnderTest(VertxTest.stubServer.port());
        vertxAppAppUnderTest.run();
        vertxAppAppUnderTest.mockConsumer.addRecord(new ConsumerRecord(VertxApp.inputTopic, 0, 0L, "a key 1", "a value"));
        vertxAppAppUnderTest.mockConsumer.addRecord(new ConsumerRecord(VertxApp.inputTopic, 0, 1L, "a key 2", "a value"));
        vertxAppAppUnderTest.mockConsumer.addRecord(new ConsumerRecord(VertxApp.inputTopic, 0, 2L, "a key 3", "a value"));
        Awaitility.await().pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
            KafkaTestUtils.assertLastCommitIs(vertxAppAppUnderTest.mockConsumer, 3);
        });
        vertxAppAppUnderTest.close();
    }
}
