/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.system.assertions;

import io.debezium.testing.system.assertions.KafkaAssertions;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;

/**
 * {@link KafkaAssertions} implementation for topics whose keys and values are plain strings.
 *
 * <p>Key and value deserializers are always set to {@link StringDeserializer}; every other
 * consumer setting is taken from the properties supplied at construction time.
 */
public class PlainKafkaAssertions
implements KafkaAssertions<String, String> {
    private final Properties kafkaConsumerProps;

    /**
     * Creates assertions backed by consumers built from the given configuration.
     *
     * <p>The passed-in {@code Properties} instance is stored by reference and is modified in
     * place: its {@code key.deserializer} and {@code value.deserializer} entries are overwritten
     * with {@link StringDeserializer}.
     *
     * @param kafkaConsumerProps Kafka consumer configuration; must not be {@code null}
     */
    public PlainKafkaAssertions(Properties kafkaConsumerProps) {
        this.kafkaConsumerProps = kafkaConsumerProps;
        kafkaConsumerProps.put("key.deserializer", StringDeserializer.class);
        kafkaConsumerProps.put("value.deserializer", StringDeserializer.class);
    }

    /**
     * Creates a new consumer from the stored configuration.
     *
     * @return a fresh consumer; the caller is responsible for closing it
     */
    @Override
    public Consumer<String, String> getConsumer() {
        return new KafkaConsumer<>(this.kafkaConsumerProps);
    }

    /**
     * Asserts that at least one record returned by a single poll (waiting up to 10 seconds) of
     * {@code topic} has a value containing {@code content}.
     *
     * <p>Records with a {@code null} value are ignored rather than causing a
     * {@link NullPointerException}.
     *
     * @param topic   name of the topic to subscribe to
     * @param content substring expected in the value of at least one record
     */
    @Override
    public void assertRecordsContain(String topic, String content) {
        try (Consumer<String, String> consumer = this.getConsumer()) {
            consumer.subscribe(Collections.singleton(topic));
            // NOTE(review): partitions are presumably not assigned until the first poll, so the
            // assignment passed here is likely empty and this seek may have no effect; the
            // starting offset would then be governed by the consumer configuration. Confirm.
            consumer.seekToBeginning(consumer.assignment());
            ConsumerRecords<String, String> records = consumer.poll(Duration.of(10L, ChronoUnit.SECONDS));
            long matchingCount = StreamSupport.stream(records.records(topic).spliterator(), false)
                    // Skip null-valued records instead of dereferencing them.
                    .filter(r -> r.value() != null && r.value().contains(content))
                    .count();
            Assertions.assertThat(matchingCount)
                    .withFailMessage("Topic '%s' doesn't have message containing <%s>.", topic, content)
                    .isGreaterThan(0L);
        }
    }
}

