/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.govern.discovery.redis;

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import me.ahoo.govern.core.NamespacedContext;
import me.ahoo.govern.core.listener.ChannelTopic;
import me.ahoo.govern.core.listener.MessageListenable;
import me.ahoo.govern.core.listener.MessageListener;
import me.ahoo.govern.core.listener.PatternTopic;
import me.ahoo.govern.core.listener.Topic;
import me.ahoo.govern.discovery.DiscoveryKeyGenerator;
import me.ahoo.govern.discovery.Instance;
import me.ahoo.govern.discovery.InstanceIdGenerator;
import me.ahoo.govern.discovery.NamespacedServiceId;
import me.ahoo.govern.discovery.ServiceChangedListener;
import me.ahoo.govern.discovery.ServiceDiscovery;
import me.ahoo.govern.discovery.ServiceInstance;
import me.ahoo.govern.discovery.ServiceListenable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsistencyRedisServiceDiscovery
implements ServiceDiscovery,
ServiceListenable {
    private static final Logger log = LoggerFactory.getLogger(ConsistencyRedisServiceDiscovery.class);
    private final ServiceDiscovery delegate;
    private final MessageListenable messageListenable;
    private final ServiceIdxListener serviceIdxListener;
    private final InstanceListener instanceListener;
    private final ConcurrentHashMap<NamespacedServiceId, CompletableFuture<CopyOnWriteArrayList<ServiceInstance>>> serviceMapInstances = new ConcurrentHashMap();
    private final ConcurrentHashMap<NamespacedServiceId, CopyOnWriteArraySet<ServiceChangedListener>> serviceMapListener;
    private final ConcurrentHashMap<String, CompletableFuture<Set<String>>> namespaceMapServices = new ConcurrentHashMap();

    public ConsistencyRedisServiceDiscovery(ServiceDiscovery delegate, MessageListenable messageListenable) {
        this.serviceMapListener = new ConcurrentHashMap();
        this.delegate = delegate;
        this.messageListenable = messageListenable;
        this.serviceIdxListener = new ServiceIdxListener();
        this.instanceListener = new InstanceListener();
    }

    @Override
    public CompletableFuture<Set<String>> getServices(String namespace) {
        return this.namespaceMapServices.computeIfAbsent(namespace, _namespace -> {
            String serviceIdxKey = DiscoveryKeyGenerator.getServiceIdxKey(namespace);
            ChannelTopic serviceIdxTopic = ChannelTopic.of((String)serviceIdxKey);
            return this.messageListenable.addListener((Topic)serviceIdxTopic, (MessageListener)this.serviceIdxListener).thenCompose(nil -> this.delegate.getServices(namespace));
        });
    }

    @Override
    public CompletableFuture<Set<String>> getServices() {
        return this.getServices(NamespacedContext.GLOBAL.getNamespace());
    }

    @Override
    public CompletableFuture<List<ServiceInstance>> getInstances(String serviceId) {
        return this.getInstances(NamespacedContext.GLOBAL.getNamespace(), serviceId);
    }

    @Override
    public CompletableFuture<List<ServiceInstance>> getInstances(String namespace, String serviceId) {
        return this.serviceMapInstances.computeIfAbsent(NamespacedServiceId.of(namespace, serviceId), _serviceId -> this.addListener(namespace, serviceId).thenCompose(nil -> this.delegate.getInstances(namespace, serviceId).thenApply(serviceInstances -> new CopyOnWriteArrayList(serviceInstances)))).thenApply(serviceInstances -> serviceInstances.stream().filter(instance -> !instance.isExpired()).collect(Collectors.toList()));
    }

    public CompletableFuture<ServiceInstance> getInstance0(String namespace, String serviceId, String instanceId) {
        NamespacedServiceId namespacedServiceId = NamespacedServiceId.of(namespace, serviceId);
        CompletableFuture<CopyOnWriteArrayList<ServiceInstance>> instancesFuture = this.serviceMapInstances.get(namespacedServiceId);
        if (Objects.isNull(instancesFuture)) {
            return CompletableFuture.completedFuture(null);
        }
        return instancesFuture.thenApply(serviceInstances -> {
            if (Objects.isNull(serviceInstances)) {
                return null;
            }
            Optional<ServiceInstance> cachedInstanceOp = serviceInstances.stream().filter(itc -> itc.getInstanceId().equals(instanceId)).findFirst();
            return cachedInstanceOp.orElse(ServiceInstance.NOT_FOUND);
        });
    }

    @Override
    public CompletableFuture<ServiceInstance> getInstance(String namespace, String serviceId, String instanceId) {
        return this.getInstance0(namespace, serviceId, instanceId).thenCompose(instance -> {
            if (ServiceInstance.NOT_FOUND.equals(instance)) {
                return CompletableFuture.completedFuture(null);
            }
            if (Objects.isNull(instance)) {
                return this.delegate.getInstance(namespace, serviceId, instanceId);
            }
            return CompletableFuture.completedFuture(instance);
        });
    }

    @Override
    public CompletableFuture<Long> getInstanceTtl(String namespace, String serviceId, String instanceId) {
        return this.getInstance0(namespace, serviceId, instanceId).thenCompose(instance -> {
            if (ServiceInstance.NOT_FOUND.equals(instance)) {
                return CompletableFuture.completedFuture(null);
            }
            if (Objects.isNull(instance)) {
                return this.delegate.getInstanceTtl(namespace, serviceId, instanceId);
            }
            return CompletableFuture.completedFuture(instance.getTtlAt());
        });
    }

    @VisibleForTesting
    public CompletableFuture<Void> addListener(String namespace, String serviceId) {
        PatternTopic instanceTopic = this.getPatternTopic(namespace, serviceId);
        return this.messageListenable.addListener((Topic)instanceTopic, (MessageListener)this.instanceListener);
    }

    @Override
    public void addListener(NamespacedServiceId namespacedServiceId, ServiceChangedListener serviceChangedListener) {
        this.serviceMapListener.compute(namespacedServiceId, (key, val) -> {
            CopyOnWriteArraySet<ServiceChangedListener> listeners = val;
            if (Objects.isNull(val)) {
                listeners = new CopyOnWriteArraySet<ServiceChangedListener>();
            }
            listeners.add(serviceChangedListener);
            return listeners;
        });
    }

    @Override
    public void removeListener(NamespacedServiceId namespacedServiceId, ServiceChangedListener serviceChangedListener) {
        this.serviceMapListener.compute(namespacedServiceId, (key, val) -> {
            if (Objects.isNull(val)) {
                return null;
            }
            CopyOnWriteArraySet listeners = val;
            listeners.remove(serviceChangedListener);
            return listeners;
        });
    }

    @VisibleForTesting
    public Future<Void> removeListener(String namespace, String serviceId) {
        PatternTopic instanceTopic = this.getPatternTopic(namespace, serviceId);
        return this.messageListenable.removeListener((Topic)instanceTopic, (MessageListener)this.instanceListener);
    }

    private PatternTopic getPatternTopic(String namespace, String serviceId) {
        String instancePattern = DiscoveryKeyGenerator.getInstanceKeyPatternOfService(namespace, serviceId);
        PatternTopic instanceTopic = PatternTopic.of((String)instancePattern);
        return instanceTopic;
    }

    private class InstanceListener
    implements MessageListener {
        private InstanceListener() {
        }

        public void onMessage(Topic topic, String channel, String message) {
            if (log.isInfoEnabled()) {
                log.info("onMessage@InstanceListener - topic:[{}] - channel:[{}] - message:[{}]", new Object[]{topic, channel, message});
            }
            String instanceKey = channel;
            String namespace = DiscoveryKeyGenerator.getNamespaceOfKey(instanceKey);
            String instanceId = DiscoveryKeyGenerator.getInstanceIdOfKey(namespace, instanceKey);
            Instance instance = InstanceIdGenerator.DEFAULT.of(instanceId);
            String serviceId = instance.getServiceId();
            NamespacedServiceId namespacedServiceId = NamespacedServiceId.of(namespace, serviceId);
            CopyOnWriteArraySet serviceChangedListeners = (CopyOnWriteArraySet)ConsistencyRedisServiceDiscovery.this.serviceMapListener.get(namespacedServiceId);
            CompletableFuture instancesFuture = (CompletableFuture)ConsistencyRedisServiceDiscovery.this.serviceMapInstances.get(namespacedServiceId);
            if (Objects.isNull(instancesFuture)) {
                if (log.isInfoEnabled()) {
                    log.info("onMessage@InstanceListener - topic:[{}] - channel:[{}] - message:[{}] instancesFuture is null.", new Object[]{topic, channel, message});
                }
                this.invokeChanged(message, namespacedServiceId, serviceChangedListeners);
                return;
            }
            ((CompletableFuture)instancesFuture.thenCompose(cachedInstances -> {
                if (Objects.isNull(cachedInstances)) {
                    if (log.isInfoEnabled()) {
                        log.info("onMessage@InstanceListener - topic:[{}] - channel:[{}] - message:[{}] cachedInstances is null.", new Object[]{topic, channel, message});
                    }
                    return CompletableFuture.completedFuture(null);
                }
                ServiceInstance cachedInstance = cachedInstances.stream().filter(itc -> itc.getInstanceId().equals(instanceId)).findFirst().orElse(ServiceInstance.NOT_FOUND);
                if ("register".equals(message)) {
                    if (log.isInfoEnabled()) {
                        log.info("onMessage@InstanceListener - topic:[{}] - channel:[{}] - message:[{}] add registered Instance.", new Object[]{topic, channel, message});
                    }
                    return ConsistencyRedisServiceDiscovery.this.delegate.getInstance(namespace, serviceId, instanceId).thenAccept(registeredInstance -> {
                        if (ServiceInstance.NOT_FOUND.equals(cachedInstance)) {
                            cachedInstances.add(registeredInstance);
                        } else {
                            cachedInstance.setSchema(registeredInstance.getSchema());
                            cachedInstance.setIp(registeredInstance.getIp());
                            cachedInstance.setPort(registeredInstance.getPort());
                            cachedInstance.setEphemeral(registeredInstance.isEphemeral());
                            cachedInstance.setTtlAt(registeredInstance.getTtlAt());
                            cachedInstance.setWeight(registeredInstance.getWeight());
                            cachedInstance.setMetadata(registeredInstance.getMetadata());
                        }
                    });
                }
                if (ServiceInstance.NOT_FOUND.equals(cachedInstance)) {
                    if (log.isWarnEnabled()) {
                        log.warn("onMessage@InstanceListener - topic:[{}] - channel:[{}] - message:[{}] not found cached Instance.", new Object[]{topic, channel, message});
                    }
                    return CompletableFuture.completedFuture(null);
                }
                switch (message) {
                    case "renew": {
                        if (log.isInfoEnabled()) {
                            log.info("onMessage@InstanceListener - topic:[{}] - channel:[{}] - message:[{}] setTtlAt.", new Object[]{topic, channel, message});
                        }
                        return ConsistencyRedisServiceDiscovery.this.delegate.getInstanceTtl(namespace, serviceId, instanceId).thenAccept(nextTtlAt -> cachedInstance.setTtlAt((long)nextTtlAt));
                    }
                    case "set_metadata": {
                        if (log.isInfoEnabled()) {
                            log.info("onMessage@InstanceListener - topic:[{}] - channel:[{}] - message:[{}] setMetadata.", new Object[]{topic, channel, message});
                        }
                        return ConsistencyRedisServiceDiscovery.this.delegate.getInstance(namespace, serviceId, instanceId).thenAccept(nextInstance -> cachedInstance.setMetadata(nextInstance.getMetadata()));
                    }
                    case "deregister": 
                    case "expired": {
                        if (log.isInfoEnabled()) {
                            log.info("onMessage@InstanceListener - topic:[{}] - channel:[{}] - message:[{}] remove instance.", new Object[]{topic, channel, message});
                        }
                        cachedInstances.remove(cachedInstance);
                        return CompletableFuture.completedFuture(null);
                    }
                }
                throw new IllegalStateException("Unexpected value: " + message);
            })).thenAccept(nil -> this.invokeChanged(message, namespacedServiceId, serviceChangedListeners));
        }

        private void invokeChanged(String message, NamespacedServiceId namespacedServiceId, CopyOnWriteArraySet<ServiceChangedListener> serviceChangedListeners) {
            if (Objects.nonNull(serviceChangedListeners) && !serviceChangedListeners.isEmpty()) {
                serviceChangedListeners.forEach(serviceChangedListener -> serviceChangedListener.onChange(namespacedServiceId, message));
            }
        }
    }

    private class ServiceIdxListener
    implements MessageListener {
        private ServiceIdxListener() {
        }

        public void onMessage(Topic topic, String channel, String message) {
            if (log.isInfoEnabled()) {
                log.info("onMessage@ServiceIdxListener - topic:[{}] - channel:[{}] - message:[{}]", new Object[]{topic, channel, message});
            }
            String serviceIdxKey = channel;
            String namespace = DiscoveryKeyGenerator.getNamespaceOfKey(serviceIdxKey);
            ConsistencyRedisServiceDiscovery.this.namespaceMapServices.put(namespace, ConsistencyRedisServiceDiscovery.this.delegate.getServices(namespace));
        }
    }
}

