package io.micronaut.configuration.kafka.health;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.reactor.KafkaReactorUtil;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import jakarta.inject.Singleton;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.config.ConfigResource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Singleton
@Requirements({@Requires(beans = {AdminClient.class}), @Requires(property = "kafka.health.enabled", value = "true", defaultValue = "true")})
/* loaded from: input_file:io/micronaut/configuration/kafka/health/KafkaHealthIndicator.class */
public class KafkaHealthIndicator implements HealthIndicator {
    private static final String ID = "kafka";
    private static final String REPLICATION_PROPERTY = "offsets.topic.replication.factor";
    private static final String DEFAULT_REPLICATION_PROPERTY = "default.replication.factor";
    private final AdminClient adminClient;
    private final KafkaDefaultConfiguration defaultConfiguration;

    public KafkaHealthIndicator(AdminClient adminClient, KafkaDefaultConfiguration kafkaDefaultConfiguration) {
        this.adminClient = adminClient;
        this.defaultConfiguration = kafkaDefaultConfiguration;
    }

    public static int getClusterReplicationFactor(Config config) {
        ConfigEntry configEntry = (ConfigEntry) Optional.ofNullable(config.get(REPLICATION_PROPERTY)).orElseGet(() -> {
            return config.get(DEFAULT_REPLICATION_PROPERTY);
        });
        if (configEntry != null) {
            return Integer.parseInt(configEntry.value());
        }
        return Integer.MAX_VALUE;
    }

    /* renamed from: getResult, reason: merged with bridge method [inline-methods] */
    public Flux<HealthResult> m61getResult() {
        DescribeClusterResult describeCluster = this.adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(Integer.valueOf((int) this.defaultConfiguration.getHealthTimeout().toMillis())));
        Objects.requireNonNull(describeCluster);
        Mono fromKafkaFuture = KafkaReactorUtil.fromKafkaFuture(describeCluster::clusterId);
        Objects.requireNonNull(describeCluster);
        Mono fromKafkaFuture2 = KafkaReactorUtil.fromKafkaFuture(describeCluster::nodes);
        Objects.requireNonNull(describeCluster);
        return KafkaReactorUtil.fromKafkaFuture(describeCluster::controller).flux().switchMap(node -> {
            String idString = node.idString();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, idString);
            DescribeConfigsResult describeConfigs = this.adminClient.describeConfigs(Collections.singletonList(configResource));
            Objects.requireNonNull(describeConfigs);
            return KafkaReactorUtil.fromKafkaFuture(describeConfigs::all).flux().switchMap(map -> {
                int clusterReplicationFactor = getClusterReplicationFactor((Config) map.get(configResource));
                return fromKafkaFuture2.flux().switchMap(collection -> {
                    return fromKafkaFuture.map(str -> {
                        int size = collection.size();
                        return (size >= clusterReplicationFactor ? HealthResult.builder("kafka", HealthStatus.UP) : HealthResult.builder("kafka", HealthStatus.DOWN)).details(CollectionUtils.mapOf(new Object[]{"brokerId", idString, "clusterId", str, "nodes", Integer.valueOf(size)})).build();
                    });
                });
            });
        }).onErrorResume(th -> {
            return Mono.just(HealthResult.builder("kafka", HealthStatus.DOWN).exception(th).build());
        });
    }
}
