package io.atleon.kafka;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.core.OrderManagingAcknowledgementOperator;
import io.atleon.util.ConfigLoading;
import io.atleon.util.Defaults;
import io.atleon.util.Instantiation;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;

/* loaded from: input_file:io/atleon/kafka/AloKafkaReceiver.class */
/**
 * Receives Kafka records as {@link Alo} elements whose acknowledgement is routed to the
 * underlying {@link ReceiverOffset}.
 *
 * <p>Receiver-specific options are read from the configuration map under {@link #CONFIG_PREFIX}
 * and are removed from the map before it is handed to the Kafka client.
 *
 * @param <K> key type of received records
 * @param <V> value type of received records
 */
public class AloKafkaReceiver<K, V> {

    /** Prefix shared by every receiver-specific configuration key declared in this class. */
    public static final String CONFIG_PREFIX = "kafka.receiver.";

    /**
     * When {@code true}, downstream request signals are blocked until partitions have been
     * assigned and {@code position()} has been invoked on each assigned partition.
     */
    public static final String BLOCK_REQUEST_ON_PARTITION_POSITIONS_CONFIG =
        CONFIG_PREFIX + "block.request.on.partition.positions";

    /** Poll timeout, parsed with {@link Duration#parse(CharSequence)}. */
    public static final String POLL_TIMEOUT_CONFIG = CONFIG_PREFIX + "poll.timeout";

    /** Commit interval, parsed with {@link Duration#parse(CharSequence)}. */
    public static final String COMMIT_INTERVAL_CONFIG = CONFIG_PREFIX + "commit.interval";

    /** Maximum number of commit attempts, parsed as an {@code int}. */
    public static final String MAX_COMMIT_ATTEMPTS_CONFIG = CONFIG_PREFIX + "max.commit.attempts";

    /** Qualified class name of the {@link AloConsumerRecordFactory} to instantiate. */
    public static final String ALO_FACTORY_CONFIG = CONFIG_PREFIX + "alo.factory";

    /** Value handed to the acknowledgement-ordering operator as its in-flight bound. */
    public static final String MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG =
        CONFIG_PREFIX + "max.in.flight.per.subscription";

    /** Close timeout, parsed with {@link Duration#parse(CharSequence)}. */
    public static final String CLOSE_TIMEOUT_CONFIG = CONFIG_PREFIX + "close.timeout";

    private static final String CLIENT_ID_CONFIG = "client.id";

    private static final boolean DEFAULT_BLOCK_REQUEST_ON_PARTITION_POSITIONS = false;

    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100);

    private static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);

    private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 100;

    private static final long DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION = 4096;

    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30);

    private static final Logger LOGGER = LoggerFactory.getLogger(AloKafkaReceiver.class);

    // Per-client-id counters used to derive a unique suffix for every subscription's client.id
    private static final Map<String, AtomicLong> COUNTS_BY_CLIENT_ID = new ConcurrentHashMap<>();

    // One scheduler per configured (un-suffixed) client id, created lazily and then reused
    private static final Map<String, Scheduler> SCHEDULERS_BY_CLIENT_ID = new ConcurrentHashMap<>();

    private final KafkaConfigSource configSource;

    private AloKafkaReceiver(KafkaConfigSource configSource) {
        this.configSource = configSource;
    }

    /**
     * Creates a receiver backed by the given configuration source.
     *
     * @param kafkaConfigSource source of the configuration map used on each subscription
     * @param <K> key type of received records
     * @param <V> value type of received records
     * @return a new receiver
     */
    public static <K, V> AloKafkaReceiver<K, V> from(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaReceiver<>(kafkaConfigSource);
    }

    /**
     * Creates a receiver for callers that only care about record values; keys are typed as
     * {@link Object}.
     *
     * @param kafkaConfigSource source of the configuration map used on each subscription
     * @param <V> value type of received records
     * @return a new receiver
     */
    public static <V> AloKafkaReceiver<Object, V> forValues(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaReceiver<>(kafkaConfigSource);
    }

    /**
     * Receives the values of records from the given topics, dropping records whose value is
     * {@code null}.
     *
     * @param topics topics to subscribe to
     * @return an {@link AloFlux} of non-null record values
     */
    public AloFlux<V> receiveAloValues(Collection<String> topics) {
        return receiveAloRecords(topics)
            .filter(consumerRecord -> consumerRecord.value() != null)
            .map(ConsumerRecord::value);
    }

    /**
     * Receives records from the given topics. The configuration is created from the config
     * source when the returned flux is subscribed to.
     *
     * @param topics topics to subscribe to
     * @return an {@link AloFlux} of consumer records
     */
    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(Collection<String> topics) {
        return configSource.create()
            .flatMapMany(config -> receiveRecords(config, topics))
            .as(AloFlux::wrap);
    }

    /**
     * Builds the Kafka receiver from the supplied configuration and adapts its records to
     * {@link Alo} elements.
     *
     * @param config full configuration map (receiver options plus Kafka client properties)
     * @param topics topics to subscribe to
     * @return flux of acknowledgeable consumer records
     */
    private Flux<Alo<ConsumerRecord<K, V>>> receiveRecords(Map<String, Object> config, Collection<String> topics) {
        // Resolve receiver-specific options, falling back to defaults when absent
        Map<String, Object> options = new HashMap<>();
        options.put(
            BLOCK_REQUEST_ON_PARTITION_POSITIONS_CONFIG,
            config.getOrDefault(BLOCK_REQUEST_ON_PARTITION_POSITIONS_CONFIG, DEFAULT_BLOCK_REQUEST_ON_PARTITION_POSITIONS));
        options.put(POLL_TIMEOUT_CONFIG, config.getOrDefault(POLL_TIMEOUT_CONFIG, DEFAULT_POLL_TIMEOUT));
        options.put(COMMIT_INTERVAL_CONFIG, config.getOrDefault(COMMIT_INTERVAL_CONFIG, DEFAULT_COMMIT_INTERVAL));
        options.put(MAX_COMMIT_ATTEMPTS_CONFIG, config.getOrDefault(MAX_COMMIT_ATTEMPTS_CONFIG, DEFAULT_MAX_COMMIT_ATTEMPTS));
        options.put(
            MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG,
            config.getOrDefault(MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG, DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION));
        options.put(CLOSE_TIMEOUT_CONFIG, config.getOrDefault(CLOSE_TIMEOUT_CONFIG, DEFAULT_CLOSE_TIMEOUT));

        AloConsumerRecordFactory<K, V> aloFactory = createAloFactory(config);
        long maxInFlightPerSubscription =
            ConfigLoading.loadOrThrow(options, MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG, Long::valueOf);

        // The Kafka client only receives properties that are not receiver options, and gets a
        // client.id made unique by appending a per-client-id counter
        Map<String, Object> consumerConfig = new HashMap<>(config);
        options.keySet().forEach(consumerConfig::remove);
        String clientId = ConfigLoading.loadOrThrow(config, CLIENT_ID_CONFIG, Object::toString);
        consumerConfig.put(CLIENT_ID_CONFIG, clientId + "-" + nextClientIdCount(clientId));

        // When blocking is enabled this future is completed by the assign listener; otherwise it
        // is already complete with no partitions, so no blocking publisher is merged in below
        boolean blockRequestOnPartitionPositions =
            ConfigLoading.loadOrThrow(options, BLOCK_REQUEST_ON_PARTITION_POSITIONS_CONFIG, Boolean::valueOf);
        CompletableFuture<Collection<ReceiverPartition>> assignedPartitions = blockRequestOnPartitionPositions
            ? new CompletableFuture<>()
            : CompletableFuture.completedFuture(Collections.emptyList());
        CompletableFuture<Void> assignedPartitionPositions =
            assignedPartitions.thenAccept(partitions -> partitions.forEach(ReceiverPartition::position));

        ReceiverOptions<K, V> receiverOptions = ReceiverOptions.<K, V>create(consumerConfig)
            .subscription(topics)
            .pollTimeout(ConfigLoading.loadOrThrow(options, POLL_TIMEOUT_CONFIG, Duration::parse))
            .commitInterval(ConfigLoading.loadOrThrow(options, COMMIT_INTERVAL_CONFIG, Duration::parse))
            .maxCommitAttempts(ConfigLoading.loadOrThrow(options, MAX_COMMIT_ATTEMPTS_CONFIG, Integer::valueOf))
            .closeTimeout(ConfigLoading.loadOrThrow(options, CLOSE_TIMEOUT_CONFIG, Duration::parse))
            .schedulerSupplier(() -> schedulerForClient(clientId))
            .addAssignListener(assignedPartitions::complete);

        return KafkaReceiver.create(receiverOptions)
            .receive()
            .transform(records -> assignedPartitionPositions.isDone()
                ? records
                : records.mergeWith(blockRequestOn(assignedPartitionPositions)))
            .transform(records -> createAloRecords(records, aloFactory, maxInFlightPerSubscription));
    }

    /**
     * Wraps each receiver record in an {@link Alo} whose acknowledger acknowledges the record's
     * offset and whose nacknowledger emits the error into the resulting flux, then applies the
     * acknowledgement-ordering operator grouped by topic-partition.
     *
     * @param records raw receiver records
     * @param aloFactory factory used to create the {@link Alo} wrappers
     * @param maxInFlight in-flight bound passed to the ordering operator
     * @return flux of acknowledgeable consumer records
     */
    private Flux<Alo<ConsumerRecord<K, V>>> createAloRecords(
        Flux<ReceiverRecord<K, V>> records,
        AloConsumerRecordFactory<K, V> aloFactory,
        long maxInFlight) {
        // Never emits a value; only used to inject nacknowledged errors into the stream
        Sinks.Empty<Alo<ConsumerRecord<K, V>>> errorSink = Sinks.empty();
        return records
            .map(receiverRecord ->
                aloFactory.create(receiverRecord, receiverRecord.receiverOffset()::acknowledge, errorSink::tryEmitError))
            .mergeWith(errorSink.asMono())
            .transform(aloRecords -> new OrderManagingAcknowledgementOperator<>(
                aloRecords, ConsumerRecordExtraction::extractTopicPartition, maxInFlight));
    }

    /**
     * Returns the next value of the counter kept for the given client id (first call yields 1).
     */
    private static long nextClientIdCount(String clientId) {
        return COUNTS_BY_CLIENT_ID.computeIfAbsent(clientId, key -> new AtomicLong()).incrementAndGet();
    }

    /**
     * Returns the scheduler associated with the given client id, creating it on first use.
     *
     * <p>NOTE: the decompiler reported widening this method from {@code private}; it is left
     * {@code public} here so the visible interface of this file does not change.
     *
     * @param str configured (un-suffixed) client id
     * @return the cached scheduler for that client id
     */
    public static Scheduler schedulerForClient(String str) {
        return SCHEDULERS_BY_CLIENT_ID.computeIfAbsent(str, AloKafkaReceiver::newSchedulerForClient);
    }

    /**
     * Creates a bounded-elastic scheduler whose thread names are prefixed with this class's
     * simple name and the given client id.
     */
    private static Scheduler newSchedulerForClient(String clientId) {
        String name = AloKafkaReceiver.class.getSimpleName() + "-" + clientId;
        return Schedulers.newBoundedElastic(Defaults.THREAD_CAP, Integer.MAX_VALUE, name);
    }

    /**
     * Instantiates the factory named by {@link #ALO_FACTORY_CONFIG}, or a
     * {@link DefaultAloConsumerRecordFactory} when that property is absent, and configures it
     * with the full configuration map.
     */
    private static <K, V> AloConsumerRecordFactory<K, V> createAloFactory(Map<String, Object> config) {
        AloConsumerRecordFactory<K, V> aloFactory = ConfigLoading
            .<AloConsumerRecordFactory<K, V>>load(
                config,
                ALO_FACTORY_CONFIG,
                className -> Instantiation.<AloConsumerRecordFactory<K, V>>one(className))
            .orElseGet(DefaultAloConsumerRecordFactory::new);
        aloFactory.configure(config);
        return aloFactory;
    }

    /**
     * Creates an empty {@link Mono} that, each time it is requested, blocks the requesting
     * thread until the given future completes. Failures while waiting are logged, not propagated.
     *
     * @param future future to wait on
     * @param <T> element type of the (empty) result
     * @return an empty mono with the blocking side effect on request
     */
    private static <T> Mono<T> blockRequestOn(Future<?> future) {
        return Mono.<T>empty().doOnRequest(requested -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can observe the interruption
                Thread.currentThread().interrupt();
                LOGGER.error("Failed to block Request Thread on Future", e);
            } catch (Exception e) {
                LOGGER.error("Failed to block Request Thread on Future", e);
            }
        });
    }
}
