/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.discovery;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.neo4j.causalclustering.core.CoreGraphDatabase;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.consensus.roles.Role;
import org.neo4j.causalclustering.core.state.machines.id.IdGenerationException;
import org.neo4j.causalclustering.discovery.ClusterMember;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.CoreTopologyService;
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.discovery.ErrorHandler;
import org.neo4j.causalclustering.discovery.IpFamily;
import org.neo4j.causalclustering.discovery.ReadReplica;
import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase;
import org.neo4j.concurrent.Futures;
import org.neo4j.function.Predicates;
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.security.WriteOperationsNotAllowedException;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.test.DbRepresentation;

/**
 * Creates and manages a group of {@link CoreClusterMember}s and {@link ReadReplica}s that share a
 * parent directory, a {@link DiscoveryServiceFactory} and listen/advertised addresses.
 *
 * <p>Members are keyed by an integer id. The initial members are created (but not started) by the
 * constructor; call {@link #start()} to start them and {@link #shutdown()} to stop them. Further
 * members can be added or removed afterwards.
 */
public class Cluster {
    /** Default time, in milliseconds, that the await-style helpers of this class wait. */
    private static final int DEFAULT_TIMEOUT_MS = 120000;
    /** Cluster size handed to core members that are added after construction. */
    private static final int DEFAULT_CLUSTER_SIZE = 3;
    /** Message prefix identifying a write that was attempted on a member that is not the leader. */
    private static final String WRITE_NOT_ON_LEADER_MESSAGE_PREFIX = "No write operations are allowed directly on this database. Writes must pass through the leader. The role of this server is: ";
    /** Exact message identifying a lock acquisition that was attempted on a non-leader. */
    private static final String LOCK_ON_FOLLOWER_MESSAGE = "Should only attempt to take locks when leader.";

    protected final File parentDir;
    private final Map<String, String> coreParams;
    private final Map<String, IntFunction<String>> instanceCoreParams;
    private final Map<String, String> readReplicaParams;
    private final Map<String, IntFunction<String>> instanceReadReplicaParams;
    private final String recordFormat;
    protected final DiscoveryServiceFactory discoveryServiceFactory;
    protected final String listenAddress;
    protected final String advertisedAddress;
    private final Map<Integer, CoreClusterMember> coreMembers = new ConcurrentHashMap<Integer, CoreClusterMember>();
    private final Map<Integer, ReadReplica> readReplicas = new ConcurrentHashMap<Integer, ReadReplica>();

    /**
     * Creates the initial core members (ids {@code 0..noOfCoreMembers-1}) and read replicas
     * (ids {@code 0..noOfReadReplicas-1}). Nothing is started here.
     *
     * @param parentDir directory handed to every created member
     * @param noOfCoreMembers number of core members to create
     * @param noOfReadReplicas number of read replicas to create
     * @param discoveryServiceFactory factory handed to every created member
     * @param coreParams extra parameters for every core member
     * @param instanceCoreParams per-instance extra parameters for core members
     * @param readReplicaParams extra parameters for every read replica
     * @param instanceReadReplicaParams per-instance extra parameters for read replicas
     * @param recordFormat record format handed to every created member
     * @param ipFamily source of the listen and advertised addresses
     * @param useWildcard if {@code true} listen on the family's wildcard address, otherwise on its localhost address
     */
    public Cluster(File parentDir, int noOfCoreMembers, int noOfReadReplicas, DiscoveryServiceFactory discoveryServiceFactory, Map<String, String> coreParams, Map<String, IntFunction<String>> instanceCoreParams, Map<String, String> readReplicaParams, Map<String, IntFunction<String>> instanceReadReplicaParams, String recordFormat, IpFamily ipFamily, boolean useWildcard) {
        this.discoveryServiceFactory = discoveryServiceFactory;
        this.parentDir = parentDir;
        this.coreParams = coreParams;
        this.instanceCoreParams = instanceCoreParams;
        this.readReplicaParams = readReplicaParams;
        this.instanceReadReplicaParams = instanceReadReplicaParams;
        this.recordFormat = recordFormat;
        this.listenAddress = useWildcard ? ipFamily.wildcardAddress() : ipFamily.localhostAddress();
        this.advertisedAddress = ipFamily.localhostName();
        List<AdvertisedSocketAddress> initialHosts = this.initialHosts(noOfCoreMembers);
        this.createCoreMembers(noOfCoreMembers, initialHosts, coreParams, instanceCoreParams, recordFormat);
        this.createReadReplicas(noOfReadReplicas, initialHosts, readReplicaParams, instanceReadReplicaParams, recordFormat);
    }

    /**
     * Allocates one port per initial core member and pairs each with the advertised address.
     */
    private List<AdvertisedSocketAddress> initialHosts(int noOfCoreMembers) {
        return IntStream.range(0, noOfCoreMembers)
                .mapToObj(ignored -> new AdvertisedSocketAddress(advertisedAddress, PortAuthority.allocatePort()))
                .collect(Collectors.toList());
    }

    /**
     * Starts all core members and then all read replicas, each group in parallel, and waits for
     * every start to complete.
     *
     * @throws InterruptedException if interrupted while waiting for a member to start
     * @throws ExecutionException if starting a member failed
     */
    public void start() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newCachedThreadPool((ThreadFactory) new NamedThreadFactory("cluster-starter"));
        try {
            startCoreMembers(executor);
            startReadReplicas(executor);
        }
        finally {
            executor.shutdown();
        }
    }

    /**
     * @return the core members whose {@link DatabaseHealth} dependency reports healthy
     */
    public Set<CoreClusterMember> healthyCoreMembers() {
        return coreMembers.values().stream()
                .filter(member -> member.database().getDependencyResolver().resolveDependency(DatabaseHealth.class).isHealthy())
                .collect(Collectors.toSet());
    }

    /**
     * @return the core member registered under {@code memberId}, or {@code null} if there is none
     */
    public CoreClusterMember getCoreMemberById(int memberId) {
        return coreMembers.get(memberId);
    }

    /**
     * @return the read replica registered under {@code memberId}, or {@code null} if there is none
     */
    public ReadReplica getReadReplicaById(int memberId) {
        return readReplicas.get(memberId);
    }

    /**
     * Creates (without starting) a core member using this cluster's core parameters and record
     * format, and registers it under {@code memberId}, replacing any existing mapping for that id.
     */
    public CoreClusterMember addCoreMemberWithId(int memberId) {
        return addCoreMemberWithId(memberId, coreParams, instanceCoreParams, recordFormat);
    }

    private CoreClusterMember addCoreMemberWithId(int memberId, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams, String recordFormat) {
        // The new member is pointed at the discovery ports of the currently registered core members.
        List<AdvertisedSocketAddress> initialHosts = extractInitialHosts(coreMembers);
        CoreClusterMember coreClusterMember = createCoreClusterMember(memberId, PortAuthority.allocatePort(), DEFAULT_CLUSTER_SIZE, initialHosts, recordFormat, extraParams, instanceExtraParams);
        coreMembers.put(memberId, coreClusterMember);
        return coreClusterMember;
    }

    /**
     * Creates (without starting) a read replica with the given record format and fresh
     * {@link Monitors}, and registers it under {@code memberId}.
     */
    public ReadReplica addReadReplicaWithIdAndRecordFormat(int memberId, String recordFormat) {
        return addReadReplica(memberId, recordFormat, new Monitors());
    }

    /**
     * Creates (without starting) a read replica with this cluster's record format and registers it
     * under {@code memberId}.
     */
    public ReadReplica addReadReplicaWithId(int memberId) {
        return addReadReplicaWithIdAndRecordFormat(memberId, recordFormat);
    }

    /**
     * Creates (without starting) a read replica with this cluster's record format and the supplied
     * monitors, and registers it under {@code memberId}.
     */
    public ReadReplica addReadReplicaWithIdAndMonitors(int memberId, Monitors monitors) {
        return addReadReplica(memberId, recordFormat, monitors);
    }

    private ReadReplica addReadReplica(int memberId, String recordFormat, Monitors monitors) {
        List<AdvertisedSocketAddress> initialHosts = extractInitialHosts(coreMembers);
        ReadReplica member = createReadReplica(memberId, initialHosts, readReplicaParams, instanceReadReplicaParams, recordFormat, monitors);
        readReplicas.put(memberId, member);
        return member;
    }

    /**
     * Shuts down all core members, then all read replicas. Failures are collected in an
     * {@link ErrorHandler} that is closed once both groups have been processed.
     */
    public void shutdown() {
        try (ErrorHandler errorHandler = new ErrorHandler("Error when trying to shutdown cluster")) {
            shutdownCoreMembers(errorHandler);
            shutdownReadReplicas(errorHandler);
        }
    }

    private void shutdownCoreMembers(ErrorHandler errorHandler) {
        shutdownMembers(coreMembers(), errorHandler);
    }

    /**
     * Shuts down all core members; read replicas are left untouched.
     */
    public void shutdownCoreMembers() {
        try (ErrorHandler errorHandler = new ErrorHandler("Error when trying to shutdown core members")) {
            shutdownCoreMembers(errorHandler);
        }
    }

    /**
     * Shuts down the given members in parallel and waits for all of them to finish.
     *
     * <p>Any exception, including an interruption of the waiting thread, is handed to
     * {@code errorHandler} instead of being thrown, so that a caller shutting down several groups
     * in sequence still gets to the later groups.
     */
    private void shutdownMembers(Collection<? extends ClusterMember> clusterMembers, ErrorHandler errorHandler) {
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Callable<Object>> shutdownTasks = new ArrayList<Callable<Object>>();
        for (ClusterMember clusterMember : clusterMembers) {
            shutdownTasks.add(() -> {
                clusterMember.shutdown();
                return null;
            });
        }
        try {
            Futures.combine(executor.invokeAll(shutdownTasks)).get();
        }
        catch (Exception e) {
            errorHandler.add(e);
        }
        finally {
            executor.shutdown();
        }
    }

    /**
     * Shuts down and unregisters the core member with the given id.
     *
     * @throws RuntimeException if no core member is registered under {@code memberId}
     */
    public void removeCoreMemberWithMemberId(int memberId) {
        CoreClusterMember memberToRemove = getCoreMemberById(memberId);
        if (memberToRemove == null) {
            throw new RuntimeException("Could not remove core member with id " + memberId);
        }
        // removeCoreMember performs the shutdown; shutting down here as well would do it twice.
        removeCoreMember(memberToRemove);
    }

    /**
     * Shuts down the given core member and removes it from this cluster's registry.
     */
    public void removeCoreMember(CoreClusterMember memberToRemove) {
        memberToRemove.shutdown();
        coreMembers.values().remove(memberToRemove);
    }

    /**
     * Shuts down and unregisters the read replica with the given id.
     *
     * @throws RuntimeException if no read replica is registered under {@code memberId}
     */
    public void removeReadReplicaWithMemberId(int memberId) {
        ReadReplica memberToRemove = getReadReplicaById(memberId);
        if (memberToRemove == null) {
            throw new RuntimeException("Could not remove read replica with member id " + memberId);
        }
        removeReadReplica(memberToRemove);
    }

    private void removeReadReplica(ReadReplica memberToRemove) {
        memberToRemove.shutdown();
        readReplicas.values().remove(memberToRemove);
    }

    /**
     * @return a live view of the registered core members (backed by the internal map)
     */
    public Collection<CoreClusterMember> coreMembers() {
        return coreMembers.values();
    }

    /**
     * @return a live view of the registered read replicas (backed by the internal map)
     */
    public Collection<ReadReplica> readReplicas() {
        return readReplicas.values();
    }

    /**
     * @return the first read replica yielded by the registry, as selected by {@code Iterables.firstOrNull}
     */
    public ReadReplica findAnyReadReplica() {
        return Iterables.firstOrNull(readReplicas.values());
    }

    /**
     * @return a core member that currently has the given role, or {@code null} if there is none
     */
    public CoreClusterMember getDbWithRole(Role role) {
        return getDbWithAnyRole(role);
    }

    /**
     * Finds a core member whose database is available and whose current role is one of {@code roles}.
     *
     * @return the first matching member encountered, or {@code null} if none matches
     */
    public CoreClusterMember getDbWithAnyRole(Role... roles) {
        Set<Role> wantedRoles = Arrays.stream(roles).collect(Collectors.toSet());
        for (CoreClusterMember member : coreMembers.values()) {
            // Members without a database cannot report a role and are skipped.
            if (member.database() != null && wantedRoles.contains(member.database().getRole())) {
                return member;
            }
        }
        return null;
    }

    /**
     * Waits up to the default timeout for a core member with the {@code LEADER} role.
     *
     * @throws TimeoutException propagated from the underlying await when the wait gives up
     */
    public CoreClusterMember awaitLeader() throws TimeoutException {
        return awaitCoreMemberWithRole(Role.LEADER, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Waits up to the given timeout for a core member with the {@code LEADER} role.
     *
     * @throws TimeoutException propagated from the underlying await when the wait gives up
     */
    public CoreClusterMember awaitLeader(long timeout, TimeUnit timeUnit) throws TimeoutException {
        return awaitCoreMemberWithRole(Role.LEADER, timeout, timeUnit);
    }

    /**
     * Repeatedly looks up a core member with the given role until one is found or the timeout expires.
     *
     * @throws TimeoutException propagated from the underlying await when the wait gives up
     */
    public CoreClusterMember awaitCoreMemberWithRole(Role role, long timeout, TimeUnit timeUnit) throws TimeoutException {
        return Predicates.await(() -> getDbWithRole(role), Predicates.notNull(), timeout, timeUnit);
    }

    /**
     * Asks the {@link CoreTopologyService} of any core member with an available database how many
     * core servers it knows of.
     *
     * @throws IllegalArgumentException if no core member has an available database
     */
    public int numberOfCoreMembersReportedByTopology() {
        CoreClusterMember aCoreGraphDb = coreMembers.values().stream()
                .filter(member -> member.database() != null)
                .findAny()
                .orElseThrow(IllegalArgumentException::new);
        CoreTopologyService coreTopologyService = aCoreGraphDb.database().getDependencyResolver().resolveDependency(CoreTopologyService.class);
        return coreTopologyService.coreServers().members().size();
    }

    /**
     * Runs {@code op} inside a transaction on the current leader, retrying on transient failures
     * for up to the default timeout.
     *
     * <p>This method only opens and closes the transaction; it never marks it as successful, so
     * {@code op} is responsible for doing that on the supplied {@link Transaction}.
     *
     * @return the core member on which {@code op} completed
     * @throws Exception any non-transient failure raised while awaiting the leader or running {@code op}
     */
    public CoreClusterMember coreTx(BiConsumer<CoreGraphDatabase, Transaction> op) throws Exception {
        return leaderTx(op, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    private CoreClusterMember leaderTx(BiConsumer<CoreGraphDatabase, Transaction> op, int timeout, TimeUnit timeUnit) throws Exception {
        ThrowingSupplier<CoreClusterMember, Exception> supplier = () -> {
            CoreClusterMember member = awaitLeader(timeout, timeUnit);
            CoreGraphDatabase db = member.database();
            if (db == null) {
                throw new DatabaseShutdownException();
            }
            try (Transaction tx = db.beginTx()) {
                op.accept(db, tx);
                return member;
            }
            catch (Throwable e) {
                if (!isTransientFailure(e)) {
                    throw e;
                }
                // Returning null makes the surrounding awaitEx call this supplier again.
                System.err.println("Transient failure in leader transaction, trying again.");
                e.printStackTrace();
                return null;
            }
        };
        return Predicates.awaitEx(supplier, Predicates.notNull()::test, timeout, timeUnit);
    }

    /**
     * @return {@code true} if {@code e} is one of the failures after which a leader transaction is retried
     */
    private boolean isTransientFailure(Throwable e) {
        return e instanceof IdGenerationException || isLockExpired(e) || isLockOnFollower(e) || isWriteNotOnLeader(e);
    }

    private boolean isWriteNotOnLeader(Throwable e) {
        if (!(e instanceof WriteOperationsNotAllowedException)) {
            return false;
        }
        String message = e.getMessage();
        // A missing message cannot match the prefix; treat it as non-transient rather than fail with an NPE.
        return message != null && message.startsWith(WRITE_NOT_ON_LEADER_MESSAGE_PREFIX);
    }

    private boolean isLockOnFollower(Throwable e) {
        if (!(e instanceof AcquireLockTimeoutException)) {
            return false;
        }
        // Constant-first equals keeps this safe when the exception carries no message.
        return LOCK_ON_FOLLOWER_MESSAGE.equals(e.getMessage()) || e.getCause() instanceof NoLeaderFoundException;
    }

    private boolean isLockExpired(Throwable e) {
        if (!(e instanceof TransactionFailureException)) {
            return false;
        }
        Throwable cause = e.getCause();
        return cause instanceof org.neo4j.kernel.api.exceptions.TransactionFailureException
                && ((org.neo4j.kernel.api.exceptions.TransactionFailureException) cause).status() == Status.Transaction.LockSessionExpired;
    }

    /**
     * Builds one advertised address per given core member from its discovery port.
     */
    private List<AdvertisedSocketAddress> extractInitialHosts(Map<Integer, CoreClusterMember> coreMembers) {
        return coreMembers.values().stream()
                .map(member -> new AdvertisedSocketAddress(advertisedAddress, member.discoveryPort()))
                .collect(Collectors.toList());
    }

    private void createCoreMembers(int noOfCoreMembers, List<AdvertisedSocketAddress> initialHosts, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams, String recordFormat) {
        for (int i = 0; i < initialHosts.size(); ++i) {
            // Member i listens for discovery on the port of the i-th initial host.
            int discoveryListenAddress = initialHosts.get(i).getPort();
            CoreClusterMember coreClusterMember = createCoreClusterMember(i, discoveryListenAddress, noOfCoreMembers, initialHosts, recordFormat, extraParams, instanceExtraParams);
            coreMembers.put(i, coreClusterMember);
        }
    }

    /**
     * Allocates the remaining ports for a core member and constructs it. Subclasses may override
     * this to supply a different member implementation; note that it is invoked from the constructor.
     */
    protected CoreClusterMember createCoreClusterMember(int serverId, int hazelcastPort, int clusterSize, List<AdvertisedSocketAddress> initialHosts, String recordFormat, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams) {
        int txPort = PortAuthority.allocatePort();
        int raftPort = PortAuthority.allocatePort();
        int boltPort = PortAuthority.allocatePort();
        int httpPort = PortAuthority.allocatePort();
        int backupPort = PortAuthority.allocatePort();
        return new CoreClusterMember(serverId, hazelcastPort, txPort, raftPort, boltPort, httpPort, backupPort, clusterSize, initialHosts, this.discoveryServiceFactory, recordFormat, this.parentDir, extraParams, instanceExtraParams, this.listenAddress, this.advertisedAddress);
    }

    private ReadReplica createReadReplica(int serverId, List<AdvertisedSocketAddress> initialHosts, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams, String recordFormat, Monitors monitors) {
        int boltPort = PortAuthority.allocatePort();
        int httpPort = PortAuthority.allocatePort();
        int txPort = PortAuthority.allocatePort();
        int backupPort = PortAuthority.allocatePort();
        return new ReadReplica(this.parentDir, serverId, boltPort, httpPort, txPort, backupPort, this.discoveryServiceFactory, initialHosts, extraParams, instanceExtraParams, recordFormat, monitors, this.advertisedAddress, this.listenAddress);
    }

    /**
     * Starts every registered core member on {@code executor} and waits for all of them.
     */
    private void startCoreMembers(ExecutorService executor) throws InterruptedException, ExecutionException {
        ExecutorCompletionService<CoreGraphDatabase> completionService = new ExecutorCompletionService<CoreGraphDatabase>(executor);
        int submitted = 0;
        for (CoreClusterMember member : coreMembers.values()) {
            completionService.submit(() -> {
                member.start();
                return member.database();
            });
            ++submitted;
        }
        // Wait for exactly the tasks that were submitted; the registry may change size meanwhile.
        for (int i = 0; i < submitted; ++i) {
            completionService.take().get();
        }
    }

    /**
     * Starts every registered read replica on {@code executor} and waits for all of them.
     */
    private void startReadReplicas(ExecutorService executor) throws InterruptedException, ExecutionException {
        ExecutorCompletionService<ReadReplicaGraphDatabase> completionService = new ExecutorCompletionService<ReadReplicaGraphDatabase>(executor);
        int submitted = 0;
        for (ReadReplica replica : readReplicas.values()) {
            completionService.submit(() -> {
                replica.start();
                return replica.database();
            });
            ++submitted;
        }
        // Wait for exactly the tasks that were submitted; the registry may change size meanwhile.
        for (int i = 0; i < submitted; ++i) {
            completionService.take().get();
        }
    }

    private void createReadReplicas(int noOfReadReplicas, List<AdvertisedSocketAddress> initialHosts, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams, String recordFormat) {
        for (int i = 0; i < noOfReadReplicas; ++i) {
            ReadReplica readReplica = createReadReplica(i, initialHosts, extraParams, instanceExtraParams, recordFormat, new Monitors());
            readReplicas.put(i, readReplica);
        }
    }

    private void shutdownReadReplicas(ErrorHandler errorHandler) {
        shutdownMembers(readReplicas(), errorHandler);
    }

    /**
     * Waits up to the default timeout until the data on {@code memberThatChanges} equals the data
     * on {@code memberToLookLike}. Both representations are recomputed on every attempt.
     */
    public static void dataOnMemberEventuallyLooksLike(CoreClusterMember memberThatChanges, CoreClusterMember memberToLookLike) throws TimeoutException, InterruptedException {
        Predicates.await(() -> {
            try {
                DbRepresentation representationToLookLike = DbRepresentation.of(memberToLookLike.database());
                DbRepresentation representationThatChanges = DbRepresentation.of(memberThatChanges.database());
                return representationToLookLike.equals(representationThatChanges);
            }
            catch (DatabaseShutdownException ignored) {
                // A database that cannot be read right now counts as "not matching yet"; keep polling.
                return false;
            }
        }, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Takes a snapshot of {@code source} once and waits until every target matches it.
     */
    public static <T extends ClusterMember> void dataMatchesEventually(ClusterMember source, Collection<T> targets) throws TimeoutException, InterruptedException {
        Cluster.dataMatchesEventually(DbRepresentation.of(source.database()), targets);
    }

    /**
     * Waits, one target after another and up to the default timeout each, until the data on every
     * target equals {@code source}.
     */
    public static <T extends ClusterMember> void dataMatchesEventually(DbRepresentation source, Collection<T> targets) throws TimeoutException, InterruptedException {
        for (T target : targets) {
            Predicates.await(() -> source.equals(DbRepresentation.of(target.database())), DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Starts all registered core members in parallel and waits for every start to complete.
     *
     * @throws ExecutionException if starting a member failed
     * @throws InterruptedException if interrupted while waiting for a member to start
     */
    public void startCoreMembers() throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool((ThreadFactory) new NamedThreadFactory("core-starter"));
        try {
            startCoreMembers(executor);
        }
        finally {
            executor.shutdown();
        }
    }

    /**
     * Looks up a member by its advertised bolt address, checking core members before read replicas.
     *
     * @throws RuntimeException if no member advertises the given address
     */
    public ClusterMember getMemberByBoltAddress(AdvertisedSocketAddress advertisedSocketAddress) {
        String wantedAddress = advertisedSocketAddress.toString();
        for (CoreClusterMember coreClusterMember : coreMembers.values()) {
            if (coreClusterMember.boltAdvertisedAddress().equals(wantedAddress)) {
                return coreClusterMember;
            }
        }
        for (ReadReplica readReplica : readReplicas.values()) {
            if (readReplica.boltAdvertisedAddress().equals(wantedAddress)) {
                return readReplica;
            }
        }
        throw new RuntimeException("Could not find a member for bolt address " + advertisedSocketAddress);
    }
}

