package io.micronaut.configuration.kafka.streams.health;

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.configuration.kafka.streams.KafkaStreamsFactory;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Internal;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.health.aggregator.HealthAggregator;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import jakarta.inject.Singleton;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Singleton
@Requirements({@Requires(classes = {HealthIndicator.class}), @Requires(property = KafkaStreamsHealth.ENABLED_PROPERTY, value = "true", defaultValue = "true")})
/* loaded from: input_file:io/micronaut/configuration/kafka/streams/health/KafkaStreamsHealth.class */
public class KafkaStreamsHealth implements HealthIndicator {
    public static final String ENABLED_PROPERTY = "kafka.health.streams.enabled";
    private static final String NAME = "kafkaStreams";
    private final KafkaStreamsFactory kafkaStreamsFactory;
    private final HealthAggregator<?> healthAggregator;

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    /* loaded from: input_file:io/micronaut/configuration/kafka/streams/health/KafkaStreamsHealth$Pair.class */
    public static class Pair<K, V> {
        private final K key;
        private final V value;

        public Pair(K k, V v) {
            this.key = k;
            this.value = v;
        }

        public static <K, V> Pair<K, V> of(K k, V v) {
            return new Pair<>(k, v);
        }

        public K getKey() {
            return this.key;
        }

        public V getValue() {
            return this.value;
        }
    }

    public KafkaStreamsHealth(KafkaStreamsFactory kafkaStreamsFactory, HealthAggregator<?> healthAggregator) {
        this.kafkaStreamsFactory = kafkaStreamsFactory;
        this.healthAggregator = healthAggregator;
    }

    public Publisher<HealthResult> getResult() {
        return this.healthAggregator.aggregate(NAME, Flux.fromIterable(this.kafkaStreamsFactory.getStreams().keySet()).map(kafkaStreams -> {
            return Pair.of(getApplicationId(kafkaStreams), kafkaStreams);
        }).flatMap(pair -> {
            return Flux.just(pair).filter(pair -> {
                return ((KafkaStreams) pair.getValue()).state().isRunningOrRebalancing();
            }).map(pair2 -> {
                return HealthResult.builder((String) pair2.getKey(), HealthStatus.UP).details(buildDetails((KafkaStreams) pair2.getValue()));
            }).defaultIfEmpty(HealthResult.builder((String) pair.getKey(), HealthStatus.DOWN).details(buildDownDetails(((KafkaStreams) pair.getValue()).state()))).onErrorResume(th -> {
                return Flux.just(HealthResult.builder((String) pair.getKey(), HealthStatus.DOWN).details(buildDownDetails(th.getMessage(), ((KafkaStreams) pair.getValue()).state())));
            });
        }).map((v0) -> {
            return v0.build();
        }));
    }

    private Map<String, String> buildDownDetails(KafkaStreams.State state) {
        return buildDownDetails("Processor appears to be down", state);
    }

    private Map<String, String> buildDownDetails(String str, KafkaStreams.State state) {
        HashMap hashMap = new HashMap();
        hashMap.put("threadState", state.name());
        hashMap.put("error", str);
        return hashMap;
    }

    private Map<String, Object> buildDetails(KafkaStreams kafkaStreams) {
        HashMap hashMap = new HashMap();
        if (kafkaStreams.state().isRunningOrRebalancing()) {
            for (ThreadMetadata threadMetadata : kafkaStreams.localThreadsMetadata()) {
                hashMap.put("threadName", threadMetadata.threadName());
                hashMap.put("threadState", threadMetadata.threadState());
                hashMap.put("adminClientId", threadMetadata.adminClientId());
                hashMap.put("consumerClientId", threadMetadata.consumerClientId());
                hashMap.put("restoreConsumerClientId", threadMetadata.restoreConsumerClientId());
                hashMap.put("producerClientIds", threadMetadata.producerClientIds());
                hashMap.put("activeTasks", taskDetails(threadMetadata.activeTasks()));
                hashMap.put("standbyTasks", taskDetails(threadMetadata.standbyTasks()));
            }
        } else {
            hashMap.put("error", "The processor is down");
        }
        return hashMap;
    }

    private String getApplicationId(KafkaStreams kafkaStreams) {
        try {
            ConfiguredStreamBuilder configuredStreamBuilder = this.kafkaStreamsFactory.getStreams().get(kafkaStreams);
            if (configuredStreamBuilder == null) {
                return getDefaultStreamName(kafkaStreams);
            }
            Properties configuration = configuredStreamBuilder.getConfiguration();
            return (String) configuration.getOrDefault("application.id", configuration.getProperty("client.id"));
        } catch (Exception e) {
            return getDefaultStreamName(kafkaStreams);
        }
    }

    private static String getDefaultStreamName(KafkaStreams kafkaStreams) {
        return (String) Optional.ofNullable(kafkaStreams).filter(kafkaStreams2 -> {
            return kafkaStreams2.state().isRunningOrRebalancing();
        }).map((v0) -> {
            return v0.localThreadsMetadata();
        }).map((v0) -> {
            return v0.stream();
        }).flatMap((v0) -> {
            return v0.findFirst();
        }).map((v0) -> {
            return v0.threadName();
        }).orElse("unidentified");
    }

    private static Map<String, Object> taskDetails(Set<TaskMetadata> set) {
        HashMap hashMap = new HashMap();
        for (TaskMetadata taskMetadata : set) {
            hashMap.put("taskId", taskMetadata.taskId());
            if (hashMap.containsKey("partitions")) {
                ((List) hashMap.get("partitions")).addAll(addPartitionsInfo(taskMetadata));
            } else {
                hashMap.put("partitions", addPartitionsInfo(taskMetadata));
            }
        }
        return hashMap;
    }

    private static List<String> addPartitionsInfo(TaskMetadata taskMetadata) {
        return (List) taskMetadata.topicPartitions().stream().map(topicPartition -> {
            return "partition=" + topicPartition.partition() + ", topic=" + topicPartition.topic();
        }).collect(Collectors.toList());
    }
}
