package io.activej.crdt.storage.cluster;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.service.EventloopService;
import io.activej.async.util.LogUtils;
import io.activej.common.initializer.WithInitializer;
import io.activej.crdt.storage.CrdtStorage;
import io.activej.crdt.util.RendezvousHashSharder;
import io.activej.eventloop.Eventloop;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import java.lang.Comparable;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/crdt/storage/cluster/CrdtPartitions.class */
public final class CrdtPartitions<K extends Comparable<K>, S, P extends Comparable<P>> implements EventloopService, WithInitializer<CrdtPartitions<K, S, P>> {
    private static final Logger logger = LoggerFactory.getLogger(CrdtPartitions.class);
    private final DiscoveryService<K, S, P> discoveryService;
    private final Eventloop eventloop;
    private RendezvousHashSharder<P> sharder;
    private final SortedMap<P, CrdtStorage<K, S>> alivePartitions = new TreeMap();
    private final Map<P, CrdtStorage<K, S>> alivePartitionsView = Collections.unmodifiableMap(this.alivePartitions);
    private final SortedMap<P, CrdtStorage<K, S>> deadPartitions = new TreeMap();
    private final Map<P, CrdtStorage<K, S>> deadPartitionsView = Collections.unmodifiableMap(this.deadPartitions);
    private final AsyncSupplier<Void> checkAllPartitions = AsyncSuppliers.reuse(this::doCheckAllPartitions);
    private final AsyncSupplier<Void> checkDeadPartitions = AsyncSuppliers.reuse(this::doCheckDeadPartitions);
    private final SortedMap<P, CrdtStorage<K, S>> partitions = new TreeMap();
    private final Map<P, CrdtStorage<K, S>> partitionsView = Collections.unmodifiableMap(this.partitions);
    private int topShards = 1;

    private CrdtPartitions(Eventloop eventloop, DiscoveryService<K, S, P> discoveryService) {
        this.eventloop = eventloop;
        this.discoveryService = discoveryService;
    }

    public static <K extends Comparable<K>, S, P extends Comparable<P>> CrdtPartitions<K, S, P> create(Eventloop eventloop, DiscoveryService<K, S, P> discoveryService) {
        return new CrdtPartitions<>(eventloop, discoveryService);
    }

    public CrdtPartitions<K, S, P> withTopShards(int i) {
        this.topShards = i;
        return this;
    }

    public void setTopShards(int i) {
        this.topShards = i;
        this.sharder = RendezvousHashSharder.create(this.alivePartitions.keySet(), i);
    }

    public RendezvousHashSharder<P> getSharder() {
        return this.sharder;
    }

    public Map<P, CrdtStorage<K, S>> getPartitions() {
        return this.partitionsView;
    }

    public Map<P, CrdtStorage<K, S>> getAlivePartitions() {
        return this.alivePartitionsView;
    }

    public Map<P, CrdtStorage<K, S>> getDeadPartitions() {
        return this.deadPartitionsView;
    }

    @Nullable
    public CrdtStorage<K, S> get(P p) {
        return this.alivePartitions.get(p);
    }

    public Promise<Void> checkAllPartitions() {
        return this.checkAllPartitions.get().whenComplete(LogUtils.toLogger(logger, "checkAllPartitions", new Object[0]));
    }

    public Promise<Void> checkDeadPartitions() {
        return this.checkDeadPartitions.get().whenComplete(LogUtils.toLogger(logger, "checkDeadPartitions", new Object[0]));
    }

    public boolean markDead(P p, @Nullable Throwable th) {
        CrdtStorage<K, S> remove = this.alivePartitions.remove(p);
        if (remove == null) {
            return false;
        }
        logger.warn("marking {} as dead ", p, th);
        this.deadPartitions.put(p, remove);
        recompute();
        return true;
    }

    public void markAlive(P p) {
        CrdtStorage<K, S> remove = this.deadPartitions.remove(p);
        if (remove != null) {
            logger.info("Partition {} is alive again!", p);
            this.alivePartitions.put(p, remove);
            recompute();
        }
    }

    private void recompute() {
        this.sharder = RendezvousHashSharder.create(this.alivePartitions.keySet(), this.topShards);
    }

    private void rediscover() {
        this.discoveryService.discover(this.partitions, (map, th) -> {
            if (th != null) {
                logger.warn("Could not discover partitions", th);
                this.eventloop.delayBackground(Duration.ofSeconds(1L), this::rediscover);
            } else {
                updatePartitions(map);
                recompute();
                checkAllPartitions().whenResult(this::rediscover);
            }
        });
    }

    private void updatePartitions(Map<P, ? extends CrdtStorage<K, S>> map) {
        this.partitions.clear();
        this.partitions.putAll(map);
        this.alivePartitions.clear();
        this.deadPartitions.clear();
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @NotNull
    public Promise<?> start() {
        return Promise.ofCallback(settablePromise -> {
            this.discoveryService.discover(null, (map, th) -> {
                if (th != null) {
                    settablePromise.setException(th);
                    return;
                }
                this.partitions.putAll(map);
                this.alivePartitions.putAll(map);
                recompute();
                checkAllPartitions().whenComplete(settablePromise).whenResult(this::rediscover);
            });
        });
    }

    @NotNull
    public Promise<?> stop() {
        return Promise.complete();
    }

    public String toString() {
        return "CrdtPartitions{partitions=" + this.partitions + ", deadPartitions=" + this.deadPartitions + '}';
    }

    private Promise<Void> doCheckAllPartitions() {
        return Promises.all(this.partitions.entrySet().stream().map(entry -> {
            Comparable comparable = (Comparable) entry.getKey();
            return ((CrdtStorage) entry.getValue()).ping().mapEx((r6, th) -> {
                if (th == null) {
                    markAlive(comparable);
                    return null;
                }
                markDead(comparable, th);
                return null;
            });
        }));
    }

    private Promise<Void> doCheckDeadPartitions() {
        return Promises.all(this.deadPartitions.entrySet().stream().map(entry -> {
            return ((CrdtStorage) entry.getValue()).ping().mapEx((r5, th) -> {
                if (th != null) {
                    return null;
                }
                markAlive((Comparable) entry.getKey());
                return null;
            });
        }));
    }

    @JmxAttribute
    public List<String> getAllPartitions() {
        return (List) this.partitions.keySet().stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toList());
    }
}
