package net.pincette.rs.kafka;

import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import net.pincette.function.SideEffect;
import net.pincette.rs.Per;
import net.pincette.util.State;
import net.pincette.util.Util;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:net/pincette/rs/kafka/KafkaSubscriber.class */
public class KafkaSubscriber<K, V> implements Flow.Subscriber<ProducerRecord<K, V>> {
    private static final int BATCH = 500;
    private final BiConsumer<ProducerEvent, KafkaProducer<K, V>> eventHandler;
    private final List<KafkaSubscriber<K, V>.InternalSubscriber> internalSubscribers;
    private final Supplier<KafkaProducer<K, V>> producerSupplier;
    private Flow.Subscriber<ProducerRecord<K, V>> branch;
    private KafkaProducer<K, V> producer;
    private boolean sending;
    private Flow.Subscription subscription;
    private static final Duration BACKOFF = Duration.ofSeconds(5);
    private static final Duration TIMEOUT = Duration.ofMillis(500);
    private static final Duration WAIT_INTERVAL = Duration.ofMillis(500);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/pincette/rs/kafka/KafkaSubscriber$InternalSubscriber.class */
    public class InternalSubscriber implements Flow.Subscriber<List<ProducerRecord<K, V>>> {
        private final CompletableFuture<Void> future = new CompletableFuture<>();
        private boolean cancelled;
        private boolean completed;
        private Flow.Subscription subscription;

        private InternalSubscriber() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void cancel() {
            this.cancelled = true;
            this.subscription.cancel();
            completeFuture();
        }

        private void completeFuture() {
            if (this.completed || this.cancelled) {
                this.future.complete(null);
            }
        }

        private void end() {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
            KafkaSubscriber.this.stop();
            completeFuture();
        }

        private void more() {
            if (this.completed || this.cancelled) {
                return;
            }
            this.subscription.request(1L);
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onComplete() {
            this.completed = true;
            end();
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onError(Throwable th) {
            Util.LOGGER.log(Level.SEVERE, th.getMessage(), th);
            KafkaSubscriber.this.sendEvent(ProducerEvent.ERROR);
            end();
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onNext(List<ProducerRecord<K, V>> list) {
            if (this.completed) {
                throw new Util.GeneralException("onNext on completed stream");
            }
            Util.LOGGER.finest(() -> {
                return "Sending batch of size " + list.size() + " to Kafka";
            });
            net.pincette.util.Util.tryToGetForever(() -> {
                return send(list);
            }, KafkaSubscriber.BACKOFF, this::panic).toCompletableFuture().join();
            more();
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription != null) {
                this.subscription.cancel();
            }
            this.subscription = subscription;
            KafkaSubscriber.this.sendEvent(ProducerEvent.STARTED);
            more();
        }

        private void panic(Exception exc) {
            Util.LOGGER.log(Level.SEVERE, exc.getMessage(), (Throwable) exc);
            KafkaSubscriber.this.close(true);
        }

        private CompletionStage<Boolean> send(List<ProducerRecord<K, V>> list) {
            return (CompletionStage) KafkaSubscriber.this.getProducer().map(kafkaProducer -> {
                KafkaSubscriber.this.sending = true;
                for (int i = 0; i < list.size() - 1; i++) {
                    kafkaProducer.send((ProducerRecord) Util.trace("Send record", (ProducerRecord) list.get(i)));
                }
                return sendToKafka((ProducerRecord) Util.trace("Send record", (ProducerRecord) list.get(list.size() - 1))).thenApply(bool -> {
                    KafkaSubscriber.this.sending = false;
                    if (this.completed) {
                        end();
                    }
                    return bool;
                });
            }).orElseGet(() -> {
                return this.cancelled ? CompletableFuture.completedFuture(false) : CompletableFuture.failedFuture(new Util.GeneralException("No producer"));
            });
        }

        private CompletionStage<Boolean> sendToKafka(ProducerRecord<K, V> producerRecord) {
            CompletableFuture completableFuture = new CompletableFuture();
            KafkaSubscriber.this.getProducer().ifPresentOrElse(kafkaProducer -> {
                kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
                    if (exc == null) {
                        completableFuture.complete(true);
                    } else {
                        panic(exc);
                        completableFuture.complete(false);
                    }
                });
            }, () -> {
                completableFuture.completeExceptionally(new Util.GeneralException("No producer"));
            });
            return completableFuture;
        }
    }

    public KafkaSubscriber() {
        this(null, null);
    }

    private KafkaSubscriber(Supplier<KafkaProducer<K, V>> supplier, BiConsumer<ProducerEvent, KafkaProducer<K, V>> biConsumer) {
        this.internalSubscribers = new ArrayList();
        this.producerSupplier = supplier;
        this.eventHandler = biConsumer;
    }

    public static <K, V> KafkaSubscriber<K, V> subscriber(Supplier<KafkaProducer<K, V>> supplier) {
        return new KafkaSubscriber<>(supplier, null);
    }

    private boolean allCancelled() {
        return this.internalSubscribers.stream().allMatch(internalSubscriber -> {
            return internalSubscriber.cancelled;
        });
    }

    public Flow.Subscriber<ProducerRecord<K, V>> branch() {
        Flow.Processor per = Per.per(BATCH, TIMEOUT, TIMEOUT);
        KafkaSubscriber<K, V>.InternalSubscriber internalSubscriber = new InternalSubscriber();
        this.internalSubscribers.add(internalSubscriber);
        per.subscribe(internalSubscriber);
        return per;
    }

    private void cancelAll() {
        this.internalSubscribers.stream().filter(internalSubscriber -> {
            return !internalSubscriber.cancelled;
        }).forEach(obj -> {
            ((InternalSubscriber) obj).cancel();
        });
    }

    private int cancelled() {
        return (int) this.internalSubscribers.stream().filter(internalSubscriber -> {
            return internalSubscriber.cancelled;
        }).count();
    }

    private void close(boolean z) {
        if (this.producer != null) {
            if (!this.sending || z) {
                KafkaProducer<K, V> kafkaProducer = this.producer;
                Util.LOGGER.finest(() -> {
                    return "Closing producer " + kafkaProducer;
                });
                this.producer = null;
                kafkaProducer.close();
            }
        }
    }

    private Optional<KafkaProducer<K, V>> getProducer() {
        if (this.producer == null && !allCancelled()) {
            this.producer = this.producerSupplier.get();
        }
        return Optional.ofNullable(this.producer);
    }

    public void join() {
        CompletableFuture.allOf((CompletableFuture[]) this.internalSubscribers.stream().map(internalSubscriber -> {
            return internalSubscriber.future;
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).join();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        this.branch.onComplete();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        this.branch.onError(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(ProducerRecord<K, V> producerRecord) {
        this.branch.onNext(producerRecord);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        if (this.subscription != null) {
            this.subscription.cancel();
        }
        this.subscription = subscription;
        this.branch = branch();
        this.branch.onSubscribe(subscription);
    }

    private void sendEvent(ProducerEvent producerEvent) {
        if (this.eventHandler != null) {
            getProducer().ifPresent(kafkaProducer -> {
                this.eventHandler.accept(producerEvent, kafkaProducer);
            });
        }
    }

    public void stop(Duration duration) {
        if (allCancelled()) {
            stop();
        } else {
            waitForCancel(duration).thenAccept(bool -> {
                if (Boolean.FALSE.equals(bool)) {
                    PrintStream printStream = System.out;
                    long count = this.internalSubscribers.stream().filter(internalSubscriber -> {
                        return !internalSubscriber.cancelled;
                    }).count();
                    this.internalSubscribers.size();
                    printStream.println(" Have to cancel " + count + " of " + printStream);
                }
                cancelAll();
                stop();
            });
        }
        join();
    }

    private void stop() {
        if (allCancelled()) {
            sendEvent(ProducerEvent.STOPPED);
            close(false);
        }
    }

    private CompletionStage<Boolean> waitForCancel(Duration duration) {
        State state = new State(Integer.valueOf(cancelled()));
        return net.pincette.util.Util.waitFor(net.pincette.util.Util.waitForCondition(() -> {
            return CompletableFuture.completedFuture(Boolean.valueOf(allCancelled()));
        }), () -> {
            return ((Boolean) Optional.of(Integer.valueOf(cancelled())).filter(num -> {
                return num.intValue() > ((Integer) state.get()).intValue();
            }).map(num2 -> {
                return (Boolean) SideEffect.run(() -> {
                    state.set(num2);
                }).andThenGet(() -> {
                    return true;
                });
            }).orElse(false)).booleanValue();
        }, WAIT_INTERVAL, duration).thenApply(optional -> {
            return (Boolean) optional.orElse(false);
        });
    }

    public KafkaSubscriber<K, V> withEventHandler(BiConsumer<ProducerEvent, KafkaProducer<K, V>> biConsumer) {
        return new KafkaSubscriber<>(this.producerSupplier, biConsumer);
    }

    public KafkaSubscriber<K, V> withProducer(Supplier<KafkaProducer<K, V>> supplier) {
        return new KafkaSubscriber<>(supplier, this.eventHandler);
    }
}
