package io.atleon.kafka;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.core.SenderResult;
import java.io.Closeable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

/* loaded from: input_file:io/atleon/kafka/AloKafkaSender.class */
public class AloKafkaSender<K, V> implements Closeable {
    public static final String CONFIG_PREFIX = "kafka.sender.";
    public static final String AUTO_INCREMENT_CLIENT_ID_CONFIG = "kafka.sender.auto.increment.client.id";
    public static final String MAX_IN_FLIGHT_PER_SEND_CONFIG = "kafka.sender.max.in.flight.per.send";
    public static final String STOP_ON_ERROR_CONFIG = "kafka.sender.stop.on.error";
    private static final boolean DEFAULT_AUTO_INCREMENT_CLIENT_ID = false;
    private static final boolean DEFAULT_STOP_ON_ERROR = false;
    private final Mono<SendResources<K, V>> futureResources;
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();
    private static final Logger LOGGER = LoggerFactory.getLogger(AloKafkaSender.class);
    private static final Map<String, AtomicLong> COUNTS_BY_ID = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/kafka/AloKafkaSender$SendResources.class */
    public static final class SendResources<K, V> {
        private final KafkaSender<K, V> sender;

        private SendResources(KafkaSender<K, V> kafkaSender) {
            this.sender = kafkaSender;
        }

        public static <K, V> SendResources<K, V> fromConfig(KafkaConfig kafkaConfig) {
            SenderOptions create = SenderOptions.create(newProducerConfig(kafkaConfig));
            return new SendResources<>(KafkaSender.create(ContextualProducerFactory.INSTANCE, create.maxInFlight(kafkaConfig.loadInt(AloKafkaSender.MAX_IN_FLIGHT_PER_SEND_CONFIG).orElse(Integer.valueOf(create.maxInFlight())).intValue()).stopOnError(kafkaConfig.loadBoolean(AloKafkaSender.STOP_ON_ERROR_CONFIG).orElse(false).booleanValue())));
        }

        public <T> Flux<KafkaSenderResult<T>> send(Publisher<T> publisher, Function<T, ProducerRecord<K, V>> function) {
            Flux map = Flux.from(publisher).map(obj -> {
                return SenderRecord.create((ProducerRecord) function.apply(obj), obj);
            });
            KafkaSender<K, V> kafkaSender = this.sender;
            Objects.requireNonNull(kafkaSender);
            return map.transform((v1) -> {
                return r1.send(v1);
            }).map(KafkaSenderResult::fromSenderResult);
        }

        public <T> Flux<Alo<KafkaSenderResult<T>>> sendAlos(Publisher<Alo<T>> publisher, Function<T, ProducerRecord<K, V>> function) {
            Flux map = AloFlux.toFlux(publisher).map(alo -> {
                return SenderRecord.create((ProducerRecord) function.apply(alo.get()), alo);
            });
            KafkaSender<K, V> kafkaSender = this.sender;
            Objects.requireNonNull(kafkaSender);
            return map.transform((v1) -> {
                return r1.send(v1);
            }).map(KafkaSenderResult::fromSenderResultOfAlo);
        }

        public void close() {
            this.sender.close();
        }

        private static Map<String, Object> newProducerConfig(KafkaConfig kafkaConfig) {
            return kafkaConfig.modifyAndGetProperties(map -> {
                map.keySet().removeIf(str -> {
                    return str.startsWith(AloKafkaSender.CONFIG_PREFIX);
                });
                if (kafkaConfig.loadBoolean(AloKafkaSender.AUTO_INCREMENT_CLIENT_ID_CONFIG).orElse(false).booleanValue()) {
                    map.computeIfPresent("client.id", (str2, obj) -> {
                        return incrementId(obj.toString());
                    });
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static String incrementId(String str) {
            return str + "-" + ((AtomicLong) AloKafkaSender.COUNTS_BY_ID.computeIfAbsent(str, str2 -> {
                return new AtomicLong();
            })).incrementAndGet();
        }
    }

    private AloKafkaSender(KafkaConfigSource kafkaConfigSource) {
        this.futureResources = ((Mono) kafkaConfigSource.create()).map(SendResources::fromConfig).cacheInvalidateWhen(sendResources -> {
            return this.closeSink.asFlux().next().then();
        }, (v0) -> {
            v0.close();
        });
    }

    public static <K, V> AloKafkaSender<K, V> from(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaSender<>(kafkaConfigSource);
    }

    public Function<Publisher<V>, Flux<KafkaSenderResult<V>>> sendValues(String str, Function<? super V, ? extends K> function) {
        return publisher -> {
            return sendValues(publisher, str, function);
        };
    }

    public Flux<KafkaSenderResult<V>> sendValues(Publisher<V> publisher, String str, Function<? super V, ? extends K> function) {
        return sendValues(publisher, obj -> {
            return str;
        }, function);
    }

    public Function<Publisher<V>, Flux<KafkaSenderResult<V>>> sendValues(Function<? super V, String> function, Function<? super V, ? extends K> function2) {
        return publisher -> {
            return sendValues(publisher, function, function2);
        };
    }

    public Flux<KafkaSenderResult<V>> sendValues(Publisher<V> publisher, Function<? super V, String> function, Function<? super V, ? extends K> function2) {
        Function<V, ProducerRecord<K, V>> newValueBasedRecordCreator = newValueBasedRecordCreator(function, function2);
        return this.futureResources.flatMapMany(sendResources -> {
            return sendResources.send(publisher, newValueBasedRecordCreator);
        });
    }

    public Mono<KafkaSenderResult<ProducerRecord<K, V>>> sendRecord(ProducerRecord<K, V> producerRecord) {
        return sendRecords(Flux.just(producerRecord)).next();
    }

    public Flux<KafkaSenderResult<ProducerRecord<K, V>>> sendRecords(Publisher<ProducerRecord<K, V>> publisher) {
        return this.futureResources.flatMapMany(sendResources -> {
            return sendResources.send(publisher, Function.identity());
        });
    }

    public Function<Publisher<Alo<V>>, AloFlux<KafkaSenderResult<V>>> sendAloValues(String str, Function<? super V, ? extends K> function) {
        return publisher -> {
            return sendAloValues(publisher, str, function);
        };
    }

    public AloFlux<KafkaSenderResult<V>> sendAloValues(Publisher<Alo<V>> publisher, String str, Function<? super V, ? extends K> function) {
        return sendAloValues(publisher, obj -> {
            return str;
        }, function);
    }

    public Function<Publisher<Alo<V>>, AloFlux<KafkaSenderResult<V>>> sendAloValues(Function<? super V, String> function, Function<? super V, ? extends K> function2) {
        return publisher -> {
            return sendAloValues(publisher, function, function2);
        };
    }

    public AloFlux<KafkaSenderResult<V>> sendAloValues(Publisher<Alo<V>> publisher, Function<? super V, String> function, Function<? super V, ? extends K> function2) {
        Function<V, ProducerRecord<K, V>> newValueBasedRecordCreator = newValueBasedRecordCreator(function, function2);
        return ((AloFlux) this.futureResources.flatMapMany(sendResources -> {
            return sendResources.sendAlos(publisher, newValueBasedRecordCreator);
        }).as((v0) -> {
            return AloFlux.wrap(v0);
        })).processFailure((v0) -> {
            return v0.isFailure();
        }, (v0) -> {
            return SenderResult.toError(v0);
        });
    }

    public AloFlux<KafkaSenderResult<ProducerRecord<K, V>>> sendAloRecords(Publisher<Alo<ProducerRecord<K, V>>> publisher) {
        return ((AloFlux) this.futureResources.flatMapMany(sendResources -> {
            return sendResources.sendAlos(publisher, Function.identity());
        }).as((v0) -> {
            return AloFlux.wrap(v0);
        })).processFailure((v0) -> {
            return v0.isFailure();
        }, (v0) -> {
            return SenderResult.toError(v0);
        });
    }

    public void close(Object obj) {
        LOGGER.info("Closing AloKafkaSender due to reason={}", obj);
        close();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closeSink.tryEmitNext(Long.valueOf(System.currentTimeMillis()));
    }

    private Function<V, ProducerRecord<K, V>> newValueBasedRecordCreator(Function<? super V, String> function, Function<? super V, ? extends K> function2) {
        return obj -> {
            return new ProducerRecord((String) function.apply(obj), (Integer) null, function2.apply(obj), obj);
        };
    }
}
