package net.pincette.rs.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.pincette.function.SideEffect;
import net.pincette.util.Collections;
import net.pincette.util.Pair;
import net.pincette.util.State;
import net.pincette.util.StreamUtil;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:net/pincette/rs/kafka/KafkaPublisher.class */
/**
 * Polls a Kafka consumer and hands the records of each subscribed topic to a per-topic publisher.
 *
 * <p>Instances are configured through {@link #publisher(Supplier)} and the {@code with...}
 * methods, each of which returns a new instance. {@link #start()} runs the poll loop on the
 * calling thread until {@link #stop()} is called or every topic has been cancelled.
 *
 * @param <K> the record key type.
 * @param <V> the record value type.
 */
public class KafkaPublisher<K, V> {
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
    private static final Duration RETRY = Duration.ofSeconds(5);
    /** Upper bound, in milliseconds, on the time spent flushing commits during shutdown. */
    private static final int WAIT_TIMEOUT = 3000;
    /** Sleep, in milliseconds, between two commit attempts during shutdown. */
    private static final int WAIT_STEP = 100;

    private final Set<String> cancelled;
    private final Supplier<KafkaConsumer<K, V>> consumerSupplier;
    private final BiConsumer<ConsumerEvent, KafkaConsumer<K, V>> eventHandler;
    private final Map<TopicPartition, OffsetAndMetadata> pendingCommits;
    private final Map<String, TopicPublisher<K, V>> publishers;
    private final Deque<ConsumerRecord<K, V>> recordsToCommit;
    private final Set<String> topics;
    private KafkaConsumer<K, V> consumer;
    private boolean started;
    // Written by stop() and cancelTopic(), read by the loop in start(); volatile makes the write
    // visible to the polling thread.
    private volatile boolean stop;

    /** Emits {@link ConsumerEvent#STARTED} the first time partitions are assigned. */
    public class RebalanceListener implements ConsumerRebalanceListener {
        private RebalanceListener() {
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            if (!started) {
                started = true;
                sendEvent(ConsumerEvent.STARTED);
            }
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            // Nothing to do on revocation.
        }
    }

    /** Creates an unconfigured publisher: no consumer supplier, topics or event handler. */
    public KafkaPublisher() {
        this(null, null, null);
    }

    private KafkaPublisher(Supplier<KafkaConsumer<K, V>> supplier, Set<String> set, BiConsumer<ConsumerEvent, KafkaConsumer<K, V>> biConsumer) {
        this.cancelled = new HashSet<>();
        this.pendingCommits = new ConcurrentHashMap<>();
        this.recordsToCommit = new ConcurrentLinkedDeque<>();
        this.consumerSupplier = supplier;
        this.topics = set;
        this.eventHandler = biConsumer;
        // Must come last: createPublisher() captures recordsToCommit.
        this.publishers = createPublishers();
    }

    /**
     * Creates a publisher with only the consumer supplier set.
     *
     * @param supplier produces the Kafka consumer to poll.
     * @param <K> the record key type.
     * @param <V> the record value type.
     * @return a new publisher without topics or event handler.
     */
    public static <K, V> KafkaPublisher<K, V> publisher(Supplier<KafkaConsumer<K, V>> supplier) {
        return new KafkaPublisher<>(supplier, null, null);
    }

    /** Returns the partitions in {@code collection} that belong to topic {@code str}. */
    private static Set<TopicPartition> partitions(String str, Collection<TopicPartition> collection) {
        return collection.stream()
                .filter(partition -> partition.topic().equals(str))
                .collect(Collectors.toSet());
    }

    private boolean allTopicsCancelled() {
        return cancelled.equals(topics);
    }

    /** Marks a topic as cancelled and requests a stop once every topic is cancelled. */
    private void cancelTopic(String str) {
        cancelled.add(str);
        if (allTopicsCancelled()) {
            stop = true;
        }
    }

    /** Sends {@link ConsumerEvent#STOPPED} and then closes the given consumer. */
    private void close(KafkaConsumer<K, V> kafkaConsumer) {
        sendEvent(ConsumerEvent.STOPPED);
        Util.LOGGER.finest(() -> "Closing consumer " + kafkaConsumer);
        kafkaConsumer.close();
    }

    /**
     * Builds the publisher for one topic. Records it hands back are appended to
     * {@code recordsToCommit}; a cancellation is routed to {@link #cancelTopic(String)}.
     */
    private TopicPublisher<K, V> createPublisher(String str) {
        return new TopicPublisher<>(str, recordsToCommit::addLast, this::cancelTopic);
    }

    /** Creates one publisher per configured topic; the map is empty when no topics are set. */
    private Map<String, TopicPublisher<K, V>> createPublishers() {
        return Optional.ofNullable(topics).stream()
                .flatMap(Set::stream)
                .collect(Collectors.toMap(topic -> topic, this::createPublisher));
    }

    /**
     * Drains {@code recordsToCommit} and synchronously commits the highest offset per partition.
     * Failures are retried through {@link #getForever(Supplier)}.
     */
    private void commit() {
        getForever(() -> {
            Map<TopicPartition, OffsetAndMetadata> toCommit =
                    offsets(Collections.consumeHead(recordsToCommit));

            if (toCommit.isEmpty()) {
                return false;
            }

            getConsumer().ifPresent(kafkaConsumer -> {
                kafkaConsumer.commitSync(Util.trace("Commit", toCommit));
                removePendingCommits(toCommit);
            });

            return true;
        });
    }

    /** Hands the polled records to the publisher of their topic. */
    private void dispatch(ConsumerRecords<K, V> consumerRecords) {
        publishers.forEach((topic, publisher) ->
                dispatch(
                        topic,
                        publisher,
                        StreamUtil.stream(consumerRecords.records(topic).iterator())
                                .collect(Collectors.toList())));
    }

    /**
     * Delivers one topic's batch, registers its offsets as pending and then resumes or pauses
     * the topic's partitions depending on whether the publisher reports it wants more.
     */
    private void dispatch(String str, TopicPublisher<K, V> topicPublisher, List<ConsumerRecord<K, V>> list) {
        if (!list.isEmpty()) {
            pendingCommits.putAll(offsets(list.stream()));
            topicPublisher.next(list);
        }

        if (topicPublisher.more()) {
            resume(str);
        } else {
            pause(str);
        }
    }

    /**
     * Returns the current consumer, creating and subscribing it first when there is none and
     * both a supplier and topics are configured.
     */
    private Optional<KafkaConsumer<K, V>> getConsumer() {
        if (consumer == null && consumerSupplier != null && topics != null) {
            consumer = consumerSupplier.get();
            consumer.subscribe(topics, new RebalanceListener());
        }

        return Optional.ofNullable(consumer);
    }

    /**
     * Evaluates {@code supplier} through {@code tryToGetForever}, with {@link #RETRY} as the
     * retry interval and {@link #panic(Exception)} as the error handler, and blocks for the
     * result.
     */
    private <T> T getForever(Supplier<T> supplier) {
        return net.pincette.util.Util.tryToGetForever(
                        () -> CompletableFuture.completedFuture(supplier.get()), RETRY, this::panic)
                .toCompletableFuture()
                .join();
    }

    /**
     * Maps records to the offset to commit per partition: the record offset plus one, keeping
     * the highest value when a partition occurs more than once.
     */
    private Map<TopicPartition, OffsetAndMetadata> offsets(Stream<ConsumerRecord<K, V>> stream) {
        return stream.collect(
                Collectors.toMap(
                        rec -> new TopicPartition(rec.topic(), rec.partition()),
                        rec -> new OffsetAndMetadata(rec.offset() + 1),
                        (first, second) -> first.offset() > second.offset() ? first : second));
    }

    /** Logs the exception and discards the consumer so that the next access recreates it. */
    private void panic(Exception exc) {
        Util.LOGGER.log(Level.SEVERE, exc.getMessage(), exc);

        if (consumer != null) {
            Util.LOGGER.finest(() -> "Closing consumer " + consumer);
            consumer.close();
            consumer = null;
        }
    }

    /** Pauses the assigned partitions of the topic that are not paused yet. */
    private void pause(String str) {
        Optional.of(Collections.difference(partitions(str, consumer.assignment()), paused(str)))
                .filter(toPause -> !toPause.isEmpty())
                .ifPresent(toPause -> {
                    Util.LOGGER.log(Level.FINE, () -> "Pause " + str);
                    consumer.pause(toPause);
                });
    }

    private Set<TopicPartition> paused(String str) {
        return partitions(str, consumer.paused());
    }

    /** Polls the consumer with {@link #POLL_TIMEOUT}; yields {@code null} without a consumer. */
    private ConsumerRecords<K, V> poll() {
        return getForever(
                () -> getConsumer()
                        .map(kafkaConsumer -> kafkaConsumer.poll(POLL_TIMEOUT))
                        .orElse(null));
    }

    /**
     * Returns the publishers per topic.
     *
     * @return a new map from topic name to the publisher of that topic's records.
     */
    public Map<String, Flow.Publisher<ConsumerRecord<K, V>>> publishers() {
        return publishers.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Removes the pending entry of each committed partition when the pending offset does not
     * exceed the committed one.
     */
    private void removePendingCommits(Map<TopicPartition, OffsetAndMetadata> map) {
        // NOTE(review): the decompiled source used one name for both operands of the comparison.
        // "pending <= committed" is the reading that matches the method's purpose - confirm
        // against the upstream source.
        map.forEach((partition, committed) ->
                Optional.ofNullable(pendingCommits.get(partition))
                        .filter(pending -> pending.offset() <= committed.offset())
                        .ifPresent(pending -> pendingCommits.remove(partition)));
    }

    /** Resumes the paused partitions of the topic, if any. */
    private void resume(String str) {
        Optional.of(paused(str))
                .filter(toResume -> !toResume.isEmpty())
                .ifPresent(toResume -> {
                    Util.LOGGER.log(Level.FINE, () -> "Resume " + str);
                    consumer.resume(toResume);
                });
    }

    /** Passes the event and the current consumer to the event handler when both are available. */
    private void sendEvent(ConsumerEvent consumerEvent) {
        if (eventHandler != null) {
            getConsumer().ifPresent(kafkaConsumer -> eventHandler.accept(consumerEvent, kafkaConsumer));
        }
    }

    /**
     * Runs the commit/poll/dispatch loop on the calling thread until a stop is requested, then
     * completes the publishers, flushes pending commits and closes the consumer.
     *
     * @throws IllegalArgumentException when no consumer supplier or no topics are configured.
     */
    public void start() {
        Util.LOGGER.finest("Starting");

        if (consumerSupplier == null) {
            throw new IllegalArgumentException("Can't run without a consumer.");
        }

        if (topics == null || topics.isEmpty()) {
            throw new IllegalArgumentException("Can't run without topics.");
        }

        while (!stop) {
            commit();
            dispatch(poll());
        }

        Util.LOGGER.finest("Stopped polling");
        getConsumer().ifPresent(kafkaConsumer -> {
            stopPublishers();
            waitForPendingCommits();
            close(kafkaConsumer);
        });
    }

    /** Requests the loop in {@link #start()} to end after its current iteration. */
    public void stop() {
        Util.LOGGER.finest("Stop requested");
        stop = true;
    }

    /** Pauses and completes every publisher whose topic has not been cancelled. */
    private void stopPublishers() {
        Util.LOGGER.finest("Stopping publishers");
        publishers.forEach((topic, publisher) -> {
            if (!cancelled.contains(topic)) {
                pause(topic);
                publisher.complete();
            }
        });
    }

    /**
     * Keeps committing until nothing is pending, every topic is cancelled or the accumulated
     * sleep time reaches {@link #WAIT_TIMEOUT}.
     */
    private void waitForPendingCommits() {
        State<Integer> waited = new State<>();

        waited.set(0);

        while (!allTopicsCancelled() && !pendingCommits.isEmpty() && waited.get() < WAIT_TIMEOUT) {
            net.pincette.util.Util.tryToDo(() -> {
                commit();
                Thread.sleep(WAIT_STEP);
                waited.set(waited.get() + WAIT_STEP);
            });
        }

        if (waited.get() >= WAIT_TIMEOUT) {
            Util.LOGGER.info("Timeout pending commits.");
        }
    }

    /**
     * Returns a copy of this publisher that uses the given consumer supplier.
     *
     * @param supplier produces the Kafka consumer to poll.
     * @return a new publisher with the same topics and event handler.
     */
    public KafkaPublisher<K, V> withConsumer(Supplier<KafkaConsumer<K, V>> supplier) {
        return new KafkaPublisher<>(supplier, topics, eventHandler);
    }

    /**
     * Returns a copy of this publisher that uses the given event handler.
     *
     * @param biConsumer receives each consumer event together with the consumer.
     * @return a new publisher with the same consumer supplier and topics.
     */
    public KafkaPublisher<K, V> withEventHandler(BiConsumer<ConsumerEvent, KafkaConsumer<K, V>> biConsumer) {
        return new KafkaPublisher<>(consumerSupplier, topics, biConsumer);
    }

    /**
     * Returns a copy of this publisher that subscribes to the given topics.
     *
     * @param set the topic names.
     * @return a new publisher with the same consumer supplier and event handler.
     */
    public KafkaPublisher<K, V> withTopics(Set<String> set) {
        return new KafkaPublisher<>(consumerSupplier, set, eventHandler);
    }
}
