package io.confluent.parallelconsumer.reactor;

import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.BatchTestBase;
import io.confluent.parallelconsumer.BatchTestMethods;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.RateLimiter;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/confluent/parallelconsumer/reactor/ReactorBatchTest.class */
public class ReactorBatchTest extends ReactorUnitTestBase implements BatchTestBase {
    private static final Logger log = LoggerFactory.getLogger(ReactorBatchTest.class);
    BatchTestMethods<Mono<String>> batchTestMethods;

    @BeforeEach
    void setup() {
        this.batchTestMethods = new BatchTestMethods<Mono<String>>(this) { // from class: io.confluent.parallelconsumer.reactor.ReactorBatchTest.1
            protected KafkaTestUtils getKtu() {
                return ReactorBatchTest.this.ktu;
            }

            protected Mono<String> averageBatchSizeTestPollStep(PollContext<String, String> pollContext) {
                return Mono.just(StringUtils.msg("Saw batch or records: {}", new Object[]{pollContext.getOffsetsFlattened()})).delayElement(Duration.ofMillis(30L));
            }

            protected void averageBatchSizeTestPoll(AtomicInteger atomicInteger, AtomicInteger atomicInteger2, RateLimiter rateLimiter) {
                ReactorBatchTest.this.reactorPC.react(pollContext -> {
                    return (Publisher) averageBatchSizeTestPollInner(atomicInteger, atomicInteger2, rateLimiter, pollContext);
                });
            }

            protected AbstractParallelEoSStreamProcessor getPC() {
                return ReactorBatchTest.this.reactorPC;
            }

            public void simpleBatchTestPoll(List<PollContext<String, String>> list) {
                ReactorBatchTest.this.reactorPC.react(pollContext -> {
                    String msg = StringUtils.msg("Saw batch or records: {}", new Object[]{pollContext.getOffsetsFlattened()});
                    ReactorBatchTest.log.debug(msg);
                    list.add(pollContext);
                    return Mono.just(msg);
                });
            }

            protected void batchFailPoll(List<PollContext<String, String>> list) {
                ReactorBatchTest.this.reactorPC.react(pollContext -> {
                    batchFailPollInner(pollContext);
                    list.add(pollContext);
                    return Mono.just(StringUtils.msg("Saw batch or records: {}", new Object[]{pollContext.getOffsetsFlattened()}));
                });
            }

            /* renamed from: averageBatchSizeTestPollStep, reason: collision with other method in class */
            protected /* bridge */ /* synthetic */ Object m1averageBatchSizeTestPollStep(PollContext pollContext) {
                return averageBatchSizeTestPollStep((PollContext<String, String>) pollContext);
            }
        };
    }

    @Test
    public void averageBatchSizeTest() {
        this.batchTestMethods.averageBatchSizeTest(10000);
    }

    @EnumSource
    @ParameterizedTest
    public void simpleBatchTest(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        this.batchTestMethods.simpleBatchTest(processingOrder);
    }

    @EnumSource
    @ParameterizedTest
    public void batchFailureTest(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        this.batchTestMethods.batchFailureTest(processingOrder);
    }
}
