package io.atleon.kafka;

import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFactoryConfig;
import io.atleon.core.AloFlux;
import io.atleon.core.AloSignalListenerFactory;
import io.atleon.core.AloSignalListenerFactoryConfig;
import io.atleon.core.ErrorEmitter;
import io.atleon.core.OrderManagingAcknowledgementOperator;
import io.atleon.kafka.NacknowledgerFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;

/* loaded from: input_file:io/atleon/kafka/AloKafkaReceiver.class */
public class AloKafkaReceiver<K, V> {
    public static final String CONFIG_PREFIX = "kafka.receiver.";
    public static final String NACKNOWLEDGER_TYPE_CONFIG = "kafka.receiver.nacknowledger.type";
    public static final String NACKNOWLEDGER_TYPE_EMIT = "emit";
    public static final String ERROR_EMISSION_TIMEOUT_CONFIG = "kafka.receiver.error.emission.timeout";
    public static final String BLOCK_REQUEST_ON_PARTITION_POSITIONS_CONFIG = "kafka.receiver.block.request.on.partition.positions";
    public static final String MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG = "kafka.receiver.max.in.flight.per.subscription";
    public static final String AUTO_INCREMENT_CLIENT_ID_CONFIG = "kafka.receiver.auto.increment.client.id";
    public static final String POLL_TIMEOUT_CONFIG = "kafka.receiver.poll.timeout";
    public static final String COMMIT_INTERVAL_CONFIG = "kafka.receiver.commit.interval";
    public static final String MAX_COMMIT_ATTEMPTS_CONFIG = "kafka.receiver.max.commit.attempts";
    public static final String CLOSE_TIMEOUT_CONFIG = "kafka.receiver.close.timeout";
    private static final boolean DEFAULT_BLOCK_REQUEST_ON_PARTITION_POSITIONS = false;
    private static final long DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION = 4096;
    private static final boolean DEFAULT_AUTO_INCREMENT_CLIENT_ID = false;
    private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 100;
    private final KafkaConfigSource configSource;
    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100);
    private static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30);
    private static final Logger LOGGER = LoggerFactory.getLogger(AloKafkaReceiver.class);
    private static final Map<String, AtomicLong> COUNTS_BY_ID = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/kafka/AloKafkaReceiver$ReceiveResources.class */
    public static final class ReceiveResources<K, V> {
        private final KafkaConfig config;
        private final NacknowledgerFactory<K, V> nacknowledgerFactory;

        public ReceiveResources(KafkaConfig kafkaConfig) {
            this.config = kafkaConfig;
            this.nacknowledgerFactory = createNacknowledgerFactory(kafkaConfig);
        }

        public Flux<Alo<ConsumerRecord<K, V>>> receive(ReceiverOptionsInitializer<K, V> receiverOptionsInitializer) {
            CompletableFuture completableFuture = new CompletableFuture();
            AloFactory<ConsumerRecord<K, V>> loadAloFactory = loadAloFactory();
            ErrorEmitter<Alo<ConsumerRecord<K, V>>> newErrorEmitter = newErrorEmitter();
            Objects.requireNonNull(completableFuture);
            Flux map = newReceiver(receiverOptionsInitializer, (v1) -> {
                r2.complete(v1);
            }).receive().transform(flux -> {
                return maybeBlockRequestOnPartitionPositioning(flux, completableFuture);
            }).map(receiverRecord -> {
                Objects.requireNonNull(newErrorEmitter);
                return toAlo(receiverRecord, loadAloFactory, newErrorEmitter::safelyEmit);
            });
            Objects.requireNonNull(newErrorEmitter);
            return map.transform((v1) -> {
                return r1.applyTo(v1);
            }).transform(this::newOrderManagingAcknowledgementOperator).transform(this::applySignalListenerFactories);
        }

        private AloFactory<ConsumerRecord<K, V>> loadAloFactory() {
            return AloFactoryConfig.loadDecorated(this.config.modifyAndGetProperties(map -> {
            }), AloKafkaConsumerRecordDecorator.class);
        }

        private ErrorEmitter<Alo<ConsumerRecord<K, V>>> newErrorEmitter() {
            return ErrorEmitter.create(this.config.loadDuration(AloKafkaReceiver.ERROR_EMISSION_TIMEOUT_CONFIG).orElse(ErrorEmitter.DEFAULT_TIMEOUT));
        }

        private KafkaReceiver<K, V> newReceiver(ReceiverOptionsInitializer<K, V> receiverOptionsInitializer, Consumer<Collection<ReceiverPartition>> consumer) {
            return KafkaReceiver.create(receiverOptionsInitializer.initialize(newConsumerConfig()).pollTimeout(this.config.loadDuration(AloKafkaReceiver.POLL_TIMEOUT_CONFIG).orElse(AloKafkaReceiver.DEFAULT_POLL_TIMEOUT)).commitInterval(this.config.loadDuration(AloKafkaReceiver.COMMIT_INTERVAL_CONFIG).orElse(AloKafkaReceiver.DEFAULT_COMMIT_INTERVAL)).maxCommitAttempts(this.config.loadInt(AloKafkaReceiver.MAX_COMMIT_ATTEMPTS_CONFIG).orElse(Integer.valueOf(AloKafkaReceiver.DEFAULT_MAX_COMMIT_ATTEMPTS)).intValue()).closeTimeout(this.config.loadDuration(AloKafkaReceiver.CLOSE_TIMEOUT_CONFIG).orElse(AloKafkaReceiver.DEFAULT_CLOSE_TIMEOUT)).addAssignListener(consumer));
        }

        private Map<String, Object> newConsumerConfig() {
            return this.config.modifyAndGetProperties(map -> {
                map.keySet().removeIf(str -> {
                    return str.startsWith(AloKafkaReceiver.CONFIG_PREFIX);
                });
                if (this.config.loadBoolean(AloKafkaReceiver.AUTO_INCREMENT_CLIENT_ID_CONFIG).orElse(false).booleanValue()) {
                    map.computeIfPresent("client.id", (str2, obj) -> {
                        return incrementId(obj.toString());
                    });
                }
            });
        }

        private Flux<ReceiverRecord<K, V>> maybeBlockRequestOnPartitionPositioning(Flux<ReceiverRecord<K, V>> flux, CompletableFuture<Collection<ReceiverPartition>> completableFuture) {
            return this.config.loadBoolean(AloKafkaReceiver.BLOCK_REQUEST_ON_PARTITION_POSITIONS_CONFIG).orElse(false).booleanValue() ? flux.mergeWith(blockRequestOnPartitionPositioning(completableFuture)) : flux;
        }

        private OrderManagingAcknowledgementOperator<ConsumerRecord<K, V>, Alo<ConsumerRecord<K, V>>> newOrderManagingAcknowledgementOperator(Flux<Alo<ConsumerRecord<K, V>>> flux) {
            return new OrderManagingAcknowledgementOperator<>(flux, ConsumerRecordExtraction::topicPartition, this.config.loadLong(AloKafkaReceiver.MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG).orElse(Long.valueOf(AloKafkaReceiver.DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION)).longValue());
        }

        private Flux<Alo<ConsumerRecord<K, V>>> applySignalListenerFactories(Flux<Alo<ConsumerRecord<K, V>>> flux) {
            Iterator it = AloSignalListenerFactoryConfig.loadList(this.config.modifyAndGetProperties(map -> {
            }), AloKafkaConsumerRecordSignalListenerFactory.class).iterator();
            while (it.hasNext()) {
                flux = flux.tap((AloSignalListenerFactory) it.next());
            }
            return flux;
        }

        private Alo<ConsumerRecord<K, V>> toAlo(ReceiverRecord<K, V> receiverRecord, AloFactory<ConsumerRecord<K, V>> aloFactory, Consumer<Throwable> consumer) {
            ReceiverOffset receiverOffset = receiverRecord.receiverOffset();
            Objects.requireNonNull(receiverOffset);
            return aloFactory.create(receiverRecord, receiverOffset::acknowledge, this.nacknowledgerFactory.create(receiverRecord, consumer));
        }

        private static <K, V> NacknowledgerFactory<K, V> createNacknowledgerFactory(KafkaConfig kafkaConfig) {
            return (NacknowledgerFactory) loadNacknowledgerFactory(kafkaConfig, AloKafkaReceiver.NACKNOWLEDGER_TYPE_CONFIG, NacknowledgerFactory.class).orElseGet(NacknowledgerFactory.Emit::new);
        }

        private static <K, V, N extends NacknowledgerFactory<K, V>> Optional<NacknowledgerFactory<K, V>> loadNacknowledgerFactory(KafkaConfig kafkaConfig, String str, Class<N> cls) {
            return kafkaConfig.loadConfiguredWithPredefinedTypes(str, cls, ReceiveResources::newPredefinedNacknowledgerFactory);
        }

        private static <K, V> Optional<NacknowledgerFactory<K, V>> newPredefinedNacknowledgerFactory(String str) {
            return str.equalsIgnoreCase(AloKafkaReceiver.NACKNOWLEDGER_TYPE_EMIT) ? Optional.of(new NacknowledgerFactory.Emit()) : Optional.empty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static String incrementId(String str) {
            return str + "-" + ((AtomicLong) AloKafkaReceiver.COUNTS_BY_ID.computeIfAbsent(str, str2 -> {
                return new AtomicLong();
            })).incrementAndGet();
        }

        private static <T> Mono<T> blockRequestOnPartitionPositioning(CompletableFuture<Collection<ReceiverPartition>> completableFuture) {
            return blockRequestOn(completableFuture.thenAccept(collection -> {
                collection.forEach((v0) -> {
                    v0.position();
                });
            }));
        }

        private static <T> Mono<T> blockRequestOn(Future<?> future) {
            return Mono.empty().doOnRequest(j -> {
                try {
                    future.get();
                } catch (Exception e) {
                    AloKafkaReceiver.LOGGER.error("Failed to block Request Thread on Future", e);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/kafka/AloKafkaReceiver$ReceiverOptionsInitializer.class */
    public interface ReceiverOptionsInitializer<K, V> {
        ReceiverOptions<K, V> initialize(Map<String, Object> map);
    }

    private AloKafkaReceiver(KafkaConfigSource kafkaConfigSource) {
        this.configSource = kafkaConfigSource;
    }

    public static <K, V> AloKafkaReceiver<K, V> from(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaReceiver<>(kafkaConfigSource);
    }

    public static <V> AloKafkaReceiver<Object, V> forValues(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaReceiver<>(kafkaConfigSource);
    }

    public AloFlux<V> receiveAloValues(String str) {
        return receiveAloValues(Collections.singletonList(str));
    }

    public AloFlux<V> receiveAloValues(Collection<String> collection) {
        return receiveAloRecords(collection).mapNotNull((v0) -> {
            return v0.value();
        });
    }

    public AloFlux<V> receiveAloValues(Pattern pattern) {
        return receiveAloRecords(pattern).mapNotNull((v0) -> {
            return v0.value();
        });
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(String str) {
        return receiveAloRecords(Collections.singletonList(str));
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(Collection<String> collection) {
        return receiveAloRecords(map -> {
            return ReceiverOptions.create(map).subscription(collection);
        });
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(Pattern pattern) {
        return receiveAloRecords(map -> {
            return ReceiverOptions.create(map).subscription(pattern);
        });
    }

    private AloFlux<ConsumerRecord<K, V>> receiveAloRecords(ReceiverOptionsInitializer<K, V> receiverOptionsInitializer) {
        return (AloFlux) ((Mono) this.configSource.create()).map(ReceiveResources::new).flatMapMany(receiveResources -> {
            return receiveResources.receive(receiverOptionsInitializer);
        }).as((v0) -> {
            return AloFlux.wrap(v0);
        });
    }
}
