package io.confluent.parallelconsumer.integrationTests.utils;

import io.confluent.parallelconsumer.ManagedTruth;
import java.time.Duration;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/utils/BrokerCommitAsserter.class */
/**
 * Test helper that subscribes a dedicated consumer to a topic, polls it, and asserts on the
 * offsets found in the polled records.
 *
 * <p>Every assertion subscribes the consumer before polling and always unsubscribes it again
 * afterwards, including when the assertion fails, so a failed check does not leave the shared
 * consumer subscribed for later checks.
 */
public class BrokerCommitAsserter {
    private static final Logger log = LoggerFactory.getLogger(BrokerCommitAsserter.class);

    /** Maximum time a single {@code poll} call blocks while waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1L);

    /** Delay applied before the "at most" assertion is evaluated for the first time. */
    private static final Duration AT_MOST_DELAY = Duration.ofSeconds(5L);

    /** Topic used by the overloads that do not take an explicit topic. */
    @NonNull
    private final String defaultTopic;

    /** Consumer used exclusively for polling records to assert against. */
    @NonNull
    private final KafkaConsumer<?, ?> assertConsumer;

    /**
     * Creates an asserter bound to a default topic and a consumer.
     *
     * @param defaultTopic   topic used when an assertion is called without an explicit topic
     * @param assertConsumer consumer that is subscribed, polled and unsubscribed by each assertion
     * @throws NullPointerException if either argument is {@code null}
     */
    public BrokerCommitAsserter(@NonNull String defaultTopic, @NonNull KafkaConsumer<?, ?> assertConsumer) {
        if (defaultTopic == null) {
            throw new NullPointerException("defaultTopic is marked non-null but is null");
        }
        if (assertConsumer == null) {
            throw new NullPointerException("assertConsumer is marked non-null but is null");
        }
        this.defaultTopic = defaultTopic;
        this.assertConsumer = assertConsumer;
    }

    /**
     * Runs {@link #assertConsumedAtLeastOffset(String, int)} against the default topic.
     *
     * @param target offset passed to the "at least" assertion
     */
    public void assertConsumedAtLeastOffset(int target) {
        assertConsumedAtLeastOffset(getDefaultTopic(), target);
    }

    /**
     * Repeatedly polls {@code topic} until the polled records satisfy
     * {@code hasHeadOffsetAtLeastInAnyTopicPartition(target)}, or Awaitility's default timeout
     * expires.
     *
     * @param topic  topic to subscribe to for the duration of the check
     * @param target offset passed to the "at least" assertion
     */
    public void assertConsumedAtLeastOffset(String topic, int target) {
        setup(topic, target);
        try {
            Awaitility.await().untilAsserted(() -> {
                ConsumerRecords<?, ?> records = this.assertConsumer.poll(POLL_TIMEOUT);
                log.debug("Polled {} records, looking for at least offset {}", records.count(), target);
                ManagedTruth.assertThat(records).hasHeadOffsetAtLeastInAnyTopicPartition(target);
            });
        } finally {
            // Unsubscribe even when the assertion times out, so the consumer is reusable.
            post();
        }
    }

    /**
     * Waits {@link #AT_MOST_DELAY} before polling {@code topic}, then asserts that the polled
     * records satisfy {@code hasHeadOffsetAtMostInAnyTopicPartition(target)}. The overall timeout
     * is one second longer than the initial delay.
     *
     * @param topic  topic to subscribe to for the duration of the check
     * @param target offset passed to the "at most" assertion
     */
    public void assertConsumedAtMostOffset(String topic, int target) {
        setup(topic, target);
        try {
            log.debug("Delaying by {} to check consumption from topic {} by at most {}", AT_MOST_DELAY, topic, target);
            Awaitility.await()
                    .pollDelay(AT_MOST_DELAY)
                    // The timeout is kept longer than the poll delay so the assertion gets a chance to run.
                    .timeout(AT_MOST_DELAY.plusSeconds(1L))
                    .untilAsserted(() -> {
                        ConsumerRecords<?, ?> records = this.assertConsumer.poll(POLL_TIMEOUT);
                        log.debug("Polled {} records, looking for at MOST offset {}", records.count(), target);
                        ManagedTruth.assertThat(records).hasHeadOffsetAtMostInAnyTopicPartition(target);
                    });
        } finally {
            // Unsubscribe even when the assertion fails, so the consumer is reusable.
            post();
        }
    }

    /**
     * Subscribes the consumer to {@code topic} and requests a seek to the beginning.
     *
     * @param topic  topic to subscribe to
     * @param target offset being asserted on; only used for logging here
     */
    private void setup(String topic, int target) {
        log.debug("Asserting against topic: {}, expecting to consume at LEAST offset {}", topic, target);
        this.assertConsumer.subscribe(UniSets.of(topic));
        // NOTE(review): an empty collection presumably targets the currently assigned partitions;
        // directly after subscribe() there may be no assignment yet, so this call may have no
        // effect - confirm against the KafkaConsumer documentation.
        this.assertConsumer.seekToBeginning(UniSets.of());
    }

    /** Releases the subscription taken in {@link #setup(String, int)}. */
    private void post() {
        this.assertConsumer.unsubscribe();
    }

    /**
     * @return the topic used when an assertion is called without an explicit topic
     */
    @NonNull
    private String getDefaultTopic() {
        return this.defaultTopic;
    }
}
