package net.pincette.rs.kafka;

import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import net.pincette.rs.Commit;
import net.pincette.rs.FlattenList;
import net.pincette.rs.Mapper;
import net.pincette.rs.Pipe;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/* loaded from: input_file:net/pincette/rs/kafka/TopicPublisher.class */
public class TopicPublisher<K, V> implements Flow.Publisher<ConsumerRecord<K, V>> {
    private final Consumer<String> cancel;
    private final Consumer<ConsumerRecord<K, V>> commit;
    private final String topic;
    private boolean completed;
    private boolean more;
    private final Deque<List<ConsumerRecord<K, V>>> batches = new ConcurrentLinkedDeque();
    private final Flow.Processor<List<ConsumerRecord<K, V>>, ConsumerRecord<K, V>> preprocessor = Pipe.pipe(FlattenList.flattenList()).then(Mapper.map((v0) -> {
        return Util.trace(v0);
    })).then(Commit.commit(this::commitRecords));

    /* loaded from: input_file:net/pincette/rs/kafka/TopicPublisher$Backpressure.class */
    private class Backpressure implements Flow.Subscription {
        private Backpressure() {
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void cancel() {
            TopicPublisher.this.cancel.accept(TopicPublisher.this.topic);
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void request(long j) {
            Optional ofNullable = Optional.ofNullable(TopicPublisher.this.batches.pollLast());
            TopicPublisher topicPublisher = TopicPublisher.this;
            ofNullable.ifPresentOrElse(list -> {
                topicPublisher.emit(list);
            }, () -> {
                TopicPublisher.this.more = true;
            });
            if (TopicPublisher.this.completed) {
                TopicPublisher.this.sendComplete();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TopicPublisher(String str, Consumer<ConsumerRecord<K, V>> consumer, Consumer<String> consumer2) {
        this.topic = str;
        this.commit = consumer;
        this.cancel = consumer2;
    }

    private CompletionStage<Boolean> commitRecords(List<ConsumerRecord<K, V>> list) {
        Util.LOGGER.finest(() -> {
            return "Publisher for topic " + this.topic + " receives " + list.size() + " to commit";
        });
        list.forEach(this.commit);
        return CompletableFuture.completedFuture(true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void complete() {
        Util.LOGGER.finest(() -> {
            return "Completing publisher for topic " + this.topic;
        });
        this.completed = true;
        if (this.more) {
            sendComplete();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void emit(List<ConsumerRecord<K, V>> list) {
        this.more = false;
        Util.LOGGER.finest(() -> {
            return "Emit batch of size " + list.size() + " from topic " + this.topic;
        });
        this.preprocessor.onNext(list);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean more() {
        return this.more;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void next(List<ConsumerRecord<K, V>> list) {
        if (list.isEmpty()) {
            return;
        }
        if (more()) {
            emit(list);
        } else {
            Util.LOGGER.finest(() -> {
                return "Buffer batch of size " + list.size() + " from topic " + this.topic;
            });
            this.batches.addFirst(list);
        }
    }

    private void sendComplete() {
        Util.LOGGER.finest(() -> {
            return "Publisher for topic " + this.topic + " sends onComplete";
        });
        this.preprocessor.onComplete();
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super ConsumerRecord<K, V>> subscriber) {
        this.preprocessor.subscribe(subscriber);
        this.preprocessor.onSubscribe(new Backpressure());
    }
}
