package net.pincette.rs.kafka;

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.Flow;
import net.pincette.rs.Mapper;
import net.pincette.rs.streams.Message;
import net.pincette.rs.streams.TopicSource;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/* loaded from: input_file:net/pincette/rs/kafka/KafkaTopicSource.class */
/**
 * A {@link TopicSource} backed by a {@link KafkaPublisher}.
 *
 * <p>Publishers and lifecycle calls are forwarded to the wrapped publisher, and each Kafka
 * {@link ConsumerRecord} is converted to a {@link Message} with the record's key, value and
 * timestamp.
 *
 * @param <K> the record key type.
 * @param <V> the record value type.
 */
public class KafkaTopicSource<K, V> implements TopicSource<K, V, ConsumerRecord<K, V>> {
    /** The wrapped publisher all publisher and lifecycle calls are forwarded to. */
    private final KafkaPublisher<K, V> publisher;

    /**
     * Wraps the given Kafka publisher. The reference is stored as is, without validation.
     *
     * @param publisher the publisher to delegate to.
     */
    public KafkaTopicSource(KafkaPublisher<K, V> publisher) {
        this.publisher = publisher;
    }

    /**
     * Converts one consumer record to a message with the same key and value. The message
     * timestamp is the record timestamp read as milliseconds since the epoch.
     */
    private static <K, V> Message<K, V> toMessage(ConsumerRecord<K, V> consumerRecord) {
        return Message.message(consumerRecord.key(), consumerRecord.value())
                .withTimestamp(Instant.ofEpochMilli(consumerRecord.timestamp()));
    }

    /**
     * Creates a processor that turns consumer records into messages.
     *
     * @param topic not used by this implementation; the same mapping is returned for any value.
     * @return a new mapping processor from {@link ConsumerRecord} to {@link Message}.
     */
    public Flow.Processor<ConsumerRecord<K, V>, Message<K, V>> connect(String topic) {
        return Mapper.map(KafkaTopicSource::toMessage);
    }

    /**
     * Returns the publishers exposed by the wrapped publisher.
     *
     * @return the map returned by {@code KafkaPublisher.publishers()}; the keys are presumably
     *     topic names (confirm against {@code KafkaPublisher}).
     */
    public Map<String, Flow.Publisher<ConsumerRecord<K, V>>> publishers() {
        return publisher.publishers();
    }

    /** Starts the wrapped publisher. */
    public void start() {
        publisher.start();
    }

    /** Stops the wrapped publisher. */
    public void stop() {
        publisher.stop();
    }
}
