/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.discovery;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.discovery.ClientConnectorAddresses;
import org.neo4j.causalclustering.discovery.CoreTopology;
import org.neo4j.causalclustering.discovery.HazelcastClusterTopology;
import org.neo4j.causalclustering.discovery.HazelcastConnector;
import org.neo4j.causalclustering.discovery.HazelcastInstanceNotActiveException;
import org.neo4j.causalclustering.discovery.ReadReplicaTopology;
import org.neo4j.causalclustering.discovery.RobustHazelcastWrapper;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.discovery.TopologyServiceMultiRetryStrategy;
import org.neo4j.causalclustering.discovery.TopologyServiceRetryStrategy;
import org.neo4j.causalclustering.helper.RobustJobSchedulerWrapper;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.function.ThrowingAction;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;

class HazelcastClient
extends LifecycleAdapter
implements TopologyService {
    private final Log log;
    private final ClientConnectorAddresses connectorAddresses;
    private final RobustHazelcastWrapper hzInstance;
    private final RobustJobSchedulerWrapper scheduler;
    private final Config config;
    private final long timeToLive;
    private final long refreshPeriod;
    private final AdvertisedSocketAddress transactionSource;
    private final MemberId myself;
    private final List<String> groups;
    private final TopologyServiceRetryStrategy topologyServiceRetryStrategy;
    private JobScheduler.JobHandle keepAliveJob;
    private JobScheduler.JobHandle refreshTopologyJob;
    private volatile Map<MemberId, AdvertisedSocketAddress> catchupAddressMap = new HashMap<MemberId, AdvertisedSocketAddress>();
    private volatile CoreTopology coreTopology = CoreTopology.EMPTY;
    private volatile ReadReplicaTopology rrTopology = ReadReplicaTopology.EMPTY;

    HazelcastClient(HazelcastConnector connector, JobScheduler scheduler, LogProvider logProvider, Config config, MemberId myself, TopologyServiceRetryStrategy topologyServiceRetryStrategy) {
        this.hzInstance = new RobustHazelcastWrapper(connector);
        this.config = config;
        this.log = logProvider.getLog(this.getClass());
        this.scheduler = new RobustJobSchedulerWrapper(scheduler, this.log);
        this.connectorAddresses = ClientConnectorAddresses.extractFromConfig(config);
        this.transactionSource = (AdvertisedSocketAddress)config.get(CausalClusteringSettings.transaction_advertised_address);
        this.timeToLive = ((Duration)config.get(CausalClusteringSettings.read_replica_time_to_live)).toMillis();
        this.refreshPeriod = ((Duration)config.get(CausalClusteringSettings.cluster_topology_refresh)).toMillis();
        this.myself = myself;
        this.groups = (List)config.get(CausalClusteringSettings.server_groups);
        this.topologyServiceRetryStrategy = HazelcastClient.resolveStrategy(this.refreshPeriod);
    }

    private static TopologyServiceRetryStrategy resolveStrategy(long refreshPeriodMillis) {
        int pollingFrequencyWithinRefreshWindow = 2;
        int numberOfRetries = pollingFrequencyWithinRefreshWindow + 1;
        return new TopologyServiceMultiRetryStrategy(refreshPeriodMillis / (long)pollingFrequencyWithinRefreshWindow, numberOfRetries);
    }

    @Override
    public CoreTopology coreServers() {
        return this.coreTopology;
    }

    @Override
    public ReadReplicaTopology readReplicas() {
        return this.rrTopology;
    }

    @Override
    public Optional<AdvertisedSocketAddress> findCatchupAddress(MemberId memberId) {
        return this.topologyServiceRetryStrategy.apply(memberId, this::retrieveSocketAddress, Optional::isPresent);
    }

    private Optional<AdvertisedSocketAddress> retrieveSocketAddress(MemberId memberId) {
        return Optional.ofNullable(this.catchupAddressMap.get(memberId));
    }

    private void refreshTopology() throws HazelcastInstanceNotActiveException {
        this.coreTopology = this.hzInstance.apply(hz -> HazelcastClusterTopology.getCoreTopology(hz, this.config, this.log));
        this.rrTopology = this.hzInstance.apply(hz -> HazelcastClusterTopology.getReadReplicaTopology(hz, this.log));
        this.catchupAddressMap = HazelcastClusterTopology.extractCatchupAddressesMap(this.coreTopology, this.rrTopology);
    }

    public void start() throws Throwable {
        this.keepAliveJob = this.scheduler.scheduleRecurring("KeepAlive", this.timeToLive / 3L, (ThrowingAction<Exception>)((ThrowingAction)this::keepReadReplicaAlive));
        this.refreshTopologyJob = this.scheduler.scheduleRecurring("TopologyRefresh", this.refreshPeriod, (ThrowingAction<Exception>)((ThrowingAction)this::refreshTopology));
    }

    public void stop() throws Throwable {
        this.keepAliveJob.cancel(true);
        this.refreshTopologyJob.cancel(true);
        this.disconnectFromCore();
    }

    private void disconnectFromCore() {
        try {
            String uuid = this.hzInstance.apply(hzInstance -> hzInstance.getLocalEndpoint().getUuid());
            this.hzInstance.apply(hz -> hz.getMap("read_replicas").remove((Object)uuid));
            this.hzInstance.shutdown();
        }
        catch (Throwable e) {
            this.log.warn("Unable to shutdown hazelcast cleanly", e);
        }
    }

    private void keepReadReplicaAlive() throws HazelcastInstanceNotActiveException {
        this.hzInstance.perform(hazelcastInstance -> {
            String uuid = hazelcastInstance.getLocalEndpoint().getUuid();
            String addresses = this.connectorAddresses.toString();
            this.log.debug("Adding read replica into cluster (%s -> %s)", new Object[]{uuid, addresses});
            hazelcastInstance.getMap("read-replica-transaction-servers").put((Object)uuid, (Object)this.transactionSource.toString(), this.timeToLive, TimeUnit.MILLISECONDS);
            hazelcastInstance.getMap("read-replica-member-ids").put((Object)uuid, (Object)this.myself.getUuid().toString(), this.timeToLive, TimeUnit.MILLISECONDS);
            HazelcastClusterTopology.refreshGroups(hazelcastInstance, uuid, this.groups);
            hazelcastInstance.getMap("read_replicas").put((Object)uuid, (Object)addresses, this.timeToLive, TimeUnit.MILLISECONDS);
        });
    }
}

