package org.eclipse.ditto.internal.utils.ddata;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.pekko.actor.AbstractExtensionId;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorRefFactory;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.ExtendedActorSystem;
import org.apache.pekko.actor.Extension;
import org.apache.pekko.cluster.ddata.Key;
import org.apache.pekko.cluster.ddata.ReplicatedData;
import org.apache.pekko.cluster.ddata.Replicator;
import org.apache.pekko.cluster.ddata.ReplicatorSettings;
import org.apache.pekko.pattern.Patterns;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/ddata/DistributedData.class */
public abstract class DistributedData<R extends ReplicatedData> implements Extension {
    protected final Duration readTimeout;
    protected final Duration writeTimeout;
    protected final ActorRef replicator;
    protected final int numberOfShards;
    private final Executor ddataExecutor;
    private final DistributedDataConfig config;

    /* loaded from: input_file:org/eclipse/ditto/internal/utils/ddata/DistributedData$AbstractDDataProvider.class */
    public static abstract class AbstractDDataProvider<R extends ReplicatedData, T extends DistributedData<R>> extends AbstractExtensionId<T> {
        @Override // org.apache.pekko.actor.ExtensionId
        public abstract T createExtension(ExtendedActorSystem extendedActorSystem);

        public AbstractDDataProvider<R, T> lookup() {
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DistributedData(DistributedDataConfig distributedDataConfig, ActorRefFactory actorRefFactory, Executor executor) {
        Objects.requireNonNull(distributedDataConfig, "The DistributedDataConfig must not be null!");
        this.replicator = createReplicator(distributedDataConfig, actorRefFactory);
        this.ddataExecutor = executor;
        this.readTimeout = distributedDataConfig.getReadTimeout();
        this.writeTimeout = distributedDataConfig.getWriteTimeout();
        this.numberOfShards = distributedDataConfig.getNumberOfShards();
        this.config = distributedDataConfig;
    }

    private static ActorRef createReplicator(DistributedDataConfig distributedDataConfig, ActorRefFactory actorRefFactory) {
        PekkoReplicatorConfig pekkoReplicatorConfig = distributedDataConfig.getPekkoReplicatorConfig();
        return actorRefFactory.actorOf(Replicator.props(ReplicatorSettings.apply(pekkoReplicatorConfig.getCompleteConfig())), pekkoReplicatorConfig.getName());
    }

    public static DistributedDataConfig createConfig(ActorSystem actorSystem, CharSequence charSequence, CharSequence charSequence2) {
        return DefaultDistributedDataConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config()), charSequence, charSequence2);
    }

    protected abstract Key<R> getKey(int i);

    protected abstract R getInitialValue();

    public CompletionStage<Optional<R>> get(Key<R> key, Replicator.ReadConsistency readConsistency) {
        return (CompletionStage<Optional<R>>) Patterns.ask(this.replicator, new Replicator.Get(key, readConsistency), getAskTimeout(readConsistency.timeout(), this.readTimeout)).thenApplyAsync(obj -> {
            return handleGetResponse(obj, key);
        }, this.ddataExecutor);
    }

    public CompletionStage<Void> update(Key<R> key, Replicator.WriteConsistency writeConsistency, Function<R, R> function) {
        return Patterns.ask(this.replicator, new Replicator.Update(key, getInitialValue(), writeConsistency, function), getAskTimeout(writeConsistency.timeout(), this.writeTimeout)).thenApplyAsync(obj -> {
            return handleUpdateResponse(obj, key);
        }, this.ddataExecutor);
    }

    public void subscribeForChanges(ActorRef actorRef) {
        IntStream.range(0, this.numberOfShards).forEach(i -> {
            this.replicator.tell(new Replicator.Subscribe(getKey(i), actorRef), ActorRef.noSender());
        });
    }

    public ActorRef getReplicator() {
        return this.replicator;
    }

    public DistributedDataConfig getConfig() {
        return this.config;
    }

    private Void handleUpdateResponse(Object obj, Key<R> key) {
        if (obj instanceof Replicator.UpdateSuccess) {
            return null;
        }
        throw new IllegalArgumentException(MessageFormat.format("Expect Replicator.UpdateSuccess for key ''{2}'' from ''{1}'', Got: ''{0}''", obj, this.replicator, key));
    }

    private Optional<R> handleGetResponse(Object obj, Key<R> key) {
        if (obj instanceof Replicator.GetSuccess) {
            return Optional.of(((Replicator.GetSuccess) obj).dataValue());
        }
        if (obj instanceof Replicator.NotFound) {
            return Optional.empty();
        }
        throw new IllegalArgumentException(MessageFormat.format("Expect Replicator.GetResponse for key ''{2}'' from ''{1}'', Got: ''{0}''", obj, this.replicator, key));
    }

    private static Duration getAskTimeout(FiniteDuration finiteDuration, Duration duration) {
        return duration.isNegative() ? Duration.ofMillis(finiteDuration.toMillis()) : duration;
    }
}
