package io.activej.crdt.storage.cluster;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.common.collection.Try;
import io.activej.crdt.CrdtData;
import io.activej.crdt.CrdtException;
import io.activej.crdt.storage.CrdtStorage;
import io.activej.crdt.util.RendezvousHashSharder;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamSplitter;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsBasic;
import io.activej.datastream.stats.StreamStatsDetailed;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.EventStats;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.jmx.PromiseStats;
import java.lang.Comparable;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/crdt/storage/cluster/CrdtRepartitionController.class */
/**
 * Pushes the contents of one local CRDT storage into the cluster and prunes the
 * local copy of every record that the cluster's sharder does not assign to the
 * local partition.
 *
 * <p>A run downloads everything from the local storage, forwards every record to
 * the cluster upload stream, and forwards the key of each record whose shard set
 * does not contain the local partition's index to the local remove stream.
 *
 * @param <K> key type of the stored CRDT data
 * @param <S> state type of the stored CRDT data
 * @param <P> partition identifier type
 */
public final class CrdtRepartitionController<K extends Comparable<K>, S, P extends Comparable<P>> implements EventloopJmxBeanEx {
    private final P localPartitionId;
    private final CrdtStorage<K, S> localClient;
    private final CrdtStorageCluster<K, S, P> cluster;

    /** Selects which pair of stream statistics (basic or detailed) a run attaches to its streams. */
    private boolean detailedStats;

    /** Entry point used by {@link #repartition()}; wraps {@link #doRepartition()} with {@code AsyncSuppliers.reuse}. */
    private final AsyncSupplier<Void> repartition = AsyncSuppliers.reuse(this::doRepartition);

    private final EventStats repartitionCount = EventStats.create(Duration.ofMinutes(5));
    private final PromiseStats repartitionPromise = PromiseStats.create(Duration.ofMinutes(5));
    private final StreamStatsBasic<CrdtData<K, S>> repartitionStats = StreamStats.basic();
    private final StreamStatsDetailed<CrdtData<K, S>> repartitionStatsDetailed = StreamStats.detailed();
    private final StreamStatsBasic<K> removeStats = StreamStats.basic();
    private final StreamStatsDetailed<K> removeStatsDetailed = StreamStats.detailed();

    /**
     * @param localPartitionId id of the partition this controller treats as local
     * @param localClient      storage that is downloaded from and pruned
     * @param cluster          cluster that receives the uploaded records and supplies the sharder
     */
    public CrdtRepartitionController(P localPartitionId, CrdtStorage<K, S> localClient, CrdtStorageCluster<K, S, P> cluster) {
        this.localClient = localClient;
        this.cluster = cluster;
        this.localPartitionId = localPartitionId;
    }

    /**
     * Creates a controller whose local storage is the one the cluster has registered
     * under {@code p}.
     *
     * <p>NOTE(review): the lookup result is passed on unchecked; if {@code p} is not a
     * registered partition the local storage is presumably {@code null} and the first
     * repartition will fail - confirm against callers.
     *
     * @param crdtStorageCluster cluster to repartition into
     * @param p                  id of the local partition
     * @return a new controller bound to that partition
     */
    public static <K extends Comparable<K>, S, P extends Comparable<P>> CrdtRepartitionController<K, S, P> create(CrdtStorageCluster<K, S, P> crdtStorageCluster, P p) {
        return new CrdtRepartitionController<>(p, crdtStorageCluster.getPartitions().getPartitions().get(p), crdtStorageCluster);
    }

    /** @return the event loop of the underlying cluster */
    @Override
    @NotNull
    public Eventloop getEventloop() {
        return this.cluster.getEventloop();
    }

    /**
     * Requests a repartition through the shared {@link #repartition} supplier.
     *
     * @return a promise that completes when the repartition finishes or fails
     */
    public Promise<Void> repartition() {
        return this.repartition.get();
    }

    /**
     * Opens the cluster upload stream, the local remove stream and the local download
     * stream, then either wires them together or, if any of them failed to open,
     * closes the ones that did open and fails with a single aggregated exception.
     *
     * @return a promise of the completed data transfer
     */
    @NotNull
    private Promise<Void> doRepartition() {
        // Feed the JMX counters exposed by getRepartitionCount()/getRepartitionPromise().
        this.repartitionCount.recordEvent();
        return Promises.toTuple(
                        this.cluster.upload().toTry(),
                        this.localClient.remove().toTry(),
                        this.localClient.download().toTry())
                .then(opened -> {
                    boolean allOpened = opened.getValue1().isSuccess()
                            && opened.getValue2().isSuccess()
                            && opened.getValue3().isSuccess();
                    if (allOpened) {
                        return redistribute(opened.getValue1().get(), opened.getValue2().get(), opened.getValue3().get());
                    }
                    // At least one stream failed to open: release the streams that did open
                    // and report every failure as a suppressed exception of one CrdtException.
                    CrdtException failure = new CrdtException("Repartition exceptions:");
                    opened.getValue1().consume(StreamConsumer::close, failure::addSuppressed);
                    opened.getValue2().consume(StreamConsumer::close, failure::addSuppressed);
                    opened.getValue3().consume(StreamSupplier::close, failure::addSuppressed);
                    return Promise.<Void>ofException(failure);
                })
                .whenComplete(this.repartitionPromise.recordStats());
    }

    /**
     * Wires the three opened streams through a splitter: every downloaded record goes
     * to {@code uploader}; the key of every record not sharded to the local partition
     * goes to {@code remover}.
     *
     * @param uploader   consumer of records destined for the cluster
     * @param remover    consumer of keys to remove from the local storage
     * @param downloader supplier of all records held by the local storage
     * @return the promise returned by streaming {@code downloader} into the splitter
     */
    private Promise<Void> redistribute(StreamConsumer<CrdtData<K, S>> uploader, StreamConsumer<K> remover, StreamSupplier<CrdtData<K, S>> downloader) {
        StreamStats<CrdtData<K, S>> uploadStats = this.detailedStats ? this.repartitionStatsDetailed : this.repartitionStats;
        StreamConsumer<CrdtData<K, S>> monitoredUploader = uploader.transformWith(uploadStats);
        StreamStats<K> removalStats = this.detailedStats ? this.removeStatsDetailed : this.removeStats;
        StreamConsumer<K> monitoredRemover = remover.transformWith(removalStats);

        RendezvousHashSharder<P> sharder = this.cluster.getPartitions().getSharder();
        int localIndex = sharder.indexOf(this.localPartitionId);

        // The two outputs carry different element types (records and keys), hence the
        // wildcard output type and the unchecked casts below.
        StreamSplitter<CrdtData<K, S>, ?> splitter = StreamSplitter.create((data, acceptors) -> {
            // acceptors[0] feeds the cluster upload, acceptors[1] feeds the local removal.
            acceptors[0].accept(data);
            if (!arrayContains(sharder.shard(data.getKey()), localIndex)) {
                acceptors[1].accept(data.getKey());
            }
        });

        @SuppressWarnings("unchecked") // first output only ever receives whole records
        StreamSupplier<CrdtData<K, S>> recordOutput = (StreamSupplier<CrdtData<K, S>>) splitter.newOutput();
        recordOutput.streamTo(monitoredUploader);

        @SuppressWarnings("unchecked") // second output only ever receives keys
        StreamSupplier<K> keyOutput = (StreamSupplier<K>) splitter.newOutput();
        keyOutput.streamTo(monitoredRemover);

        return downloader.streamTo(splitter.getInput());
    }

    /**
     * Linear membership test on an {@code int} array.
     *
     * @return {@code true} if {@code i} occurs in {@code iArr}
     */
    private static boolean arrayContains(int[] iArr, int i) {
        for (int candidate : iArr) {
            if (candidate == i) {
                return true;
            }
        }
        return false;
    }

    // ---- JMX ----

    /** @return whether detailed stream statistics are selected for new runs */
    @JmxAttribute
    public boolean isDetailedStats() {
        return this.detailedStats;
    }

    /** Selects the detailed stream statistics for subsequent runs. */
    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    /** Selects the basic stream statistics for subsequent runs. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    /** @return string form of the local partition id */
    @JmxAttribute
    public String getLocalPartitionId() {
        return this.localPartitionId.toString();
    }

    /** @return counter of started repartition runs */
    @JmxAttribute
    public EventStats getRepartitionCount() {
        return this.repartitionCount;
    }

    /** @return promise statistics of repartition runs */
    @JmxAttribute
    public PromiseStats getRepartitionPromise() {
        return this.repartitionPromise;
    }

    // The stream-stats getters keep their original raw return types so the public
    // signatures stay exactly as they were.

    /** @return basic statistics of the cluster upload stream */
    @SuppressWarnings("rawtypes")
    @JmxAttribute
    public StreamStatsBasic getRepartitionStats() {
        return this.repartitionStats;
    }

    /** @return detailed statistics of the cluster upload stream */
    @SuppressWarnings("rawtypes")
    @JmxAttribute
    public StreamStatsDetailed getRepartitionStatsDetailed() {
        return this.repartitionStatsDetailed;
    }

    /** @return basic statistics of the local remove stream */
    @SuppressWarnings("rawtypes")
    @JmxAttribute
    public StreamStatsBasic getRemoveStats() {
        return this.removeStats;
    }

    /** @return detailed statistics of the local remove stream */
    @SuppressWarnings("rawtypes")
    @JmxAttribute
    public StreamStatsDetailed getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }
}
