package org.eclipse.ditto.services.utils.ddata;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ReplicatedData;
import akka.cluster.ddata.Replicator;
import akka.pattern.PatternsCS;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/utils/ddata/DistributedData.class */
/**
 * Base class for access to one replicated data object that is managed by a dedicated Akka
 * Distributed Data {@link Replicator} actor.
 * <p>
 * Subclasses supply the {@link Key} of the replicated object and the initial value used by updates.
 * Replies of the replicator are converted on the executor passed to the constructor.
 *
 * @param <R> type of the replicated data.
 */
public abstract class DistributedData<R extends ReplicatedData> {

    /**
     * Configured timeout for asking the replicator to read; a negative value means that the timeout of the
     * requested read consistency is used instead.
     */
    protected final Duration readTimeout;

    /**
     * Configured timeout for asking the replicator to write; a negative value means that the timeout of the
     * requested write consistency is used instead.
     */
    protected final Duration writeTimeout;

    /** The replicator actor started by this object. */
    protected final ActorRef replicator;

    /** Executor on which replies of the replicator are handled. */
    private final Executor ddataExecutor;

    /**
     * Starts a replicator actor from the given configuration and remembers the configured timeouts.
     * All arguments are validated before the actor is created so that a rejected argument does not leave
     * a started replicator behind.
     *
     * @param distributedDataConfigReader provides the replicator settings, the actor name and the timeouts.
     * @param actorRefFactory the factory used to create the replicator actor.
     * @param executor the executor on which replies of the replicator are handled.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public DistributedData(DistributedDataConfigReader distributedDataConfigReader, ActorRefFactory actorRefFactory, Executor executor) {
        Objects.requireNonNull(distributedDataConfigReader, "The DistributedDataConfigReader must not be null!");
        Objects.requireNonNull(actorRefFactory, "The ActorRefFactory must not be null!");
        Objects.requireNonNull(executor, "The Executor must not be null!");
        this.replicator = createReplicator(distributedDataConfigReader, actorRefFactory);
        this.ddataExecutor = executor;
        this.readTimeout = distributedDataConfigReader.getReadTimeout();
        this.writeTimeout = distributedDataConfigReader.getWriteTimeout();
    }

    /**
     * Creates the replicator actor with the settings and the name taken from the configuration.
     */
    private static ActorRef createReplicator(DistributedDataConfigReader distributedDataConfigReader, ActorRefFactory actorRefFactory) {
        return actorRefFactory.actorOf(Replicator.props(distributedDataConfigReader.toReplicatorSettings()), distributedDataConfigReader.getName());
    }

    /**
     * @return the key of the replicated data object handled by this instance.
     */
    protected abstract Key<R> getKey();

    /**
     * @return the initial value passed to the replicator with every update request.
     */
    protected abstract R getInitialValue();

    /**
     * Asks the replicator for the current value of the replicated data.
     *
     * @param readConsistency the read consistency of the request; its timeout is used as ask timeout when
     * the configured read timeout is negative.
     * @return a stage holding the value on {@code GetSuccess} or an empty optional on {@code NotFound};
     * the stage fails for any other reply.
     */
    public CompletionStage<Optional<R>> get(Replicator.ReadConsistency readConsistency) {
        final Replicator.Get<R> request = new Replicator.Get<R>(getKey(), readConsistency);
        final Duration askTimeout = getAskTimeout(readConsistency.timeout(), this.readTimeout);
        return PatternsCS.ask(this.replicator, request, askTimeout)
                .thenApplyAsync(this::handleGetResponse, this.ddataExecutor);
    }

    /**
     * Asks the replicator to modify the replicated data.
     *
     * @param writeConsistency the write consistency of the request; its timeout is used as ask timeout when
     * the configured write timeout is negative.
     * @param function the modification handed to the replicator.
     * @return a stage that completes with {@code null} on {@code UpdateSuccess} and fails for any other reply.
     */
    public CompletionStage<Void> update(Replicator.WriteConsistency writeConsistency, Function<R, R> function) {
        final Replicator.Update<R> request =
                new Replicator.Update<R>(getKey(), getInitialValue(), writeConsistency, function);
        final Duration askTimeout = getAskTimeout(writeConsistency.timeout(), this.writeTimeout);
        return PatternsCS.ask(this.replicator, request, askTimeout)
                .thenApplyAsync(this::handleUpdateResponse, this.ddataExecutor);
    }

    /**
     * Sends a subscription for the key of this instance to the replicator on behalf of the given actor.
     *
     * @param actorRef the subscriber named in the subscription.
     */
    public void subscribeForChanges(ActorRef actorRef) {
        final Replicator.Subscribe<R> subscribe = new Replicator.Subscribe<R>(getKey(), actorRef);
        this.replicator.tell(subscribe, ActorRef.noSender());
    }

    /**
     * @return the replicator actor started by this object.
     */
    public ActorRef getReplicator() {
        return this.replicator;
    }

    /**
     * Accepts {@code UpdateSuccess} and rejects every other reply to an update request.
     *
     * @throws IllegalArgumentException if the reply is not an {@code UpdateSuccess}.
     */
    private Void handleUpdateResponse(Object obj) {
        if (obj instanceof Replicator.UpdateSuccess) {
            return null;
        }
        throw new IllegalArgumentException(MessageFormat.format("Expect Replicator.UpdateSuccess for key ''{2}'' from ''{1}'', Got: ''{0}''", obj, this.replicator, getKey()));
    }

    /**
     * Converts the reply to a get request into an optional value.
     *
     * @throws IllegalArgumentException if the reply is neither {@code GetSuccess} nor {@code NotFound}.
     */
    private Optional<R> handleGetResponse(Object obj) {
        if (obj instanceof Replicator.GetSuccess) {
            // The request was sent with a Key<R>, so the success reply is expected to carry data of type R;
            // the type argument itself cannot be checked at runtime.
            @SuppressWarnings("unchecked")
            final Replicator.GetSuccess<R> success = (Replicator.GetSuccess<R>) obj;
            return Optional.of(success.dataValue());
        }
        if (obj instanceof Replicator.NotFound) {
            return Optional.empty();
        }
        throw new IllegalArgumentException(MessageFormat.format("Expect Replicator.GetResponse for key ''{2}'' from ''{1}'', Got: ''{0}''", obj, this.replicator, getKey()));
    }

    /**
     * Chooses the ask timeout: the configured duration unless it is negative, in which case the timeout of
     * the consistency level (converted to milliseconds) is used.
     */
    private static Duration getAskTimeout(FiniteDuration finiteDuration, Duration duration) {
        if (duration.isNegative()) {
            return Duration.ofMillis(finiteDuration.toMillis());
        }
        return duration;
    }
}
