package org.eclipse.ditto.services.utils.pubsub;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.services.utils.ddata.DistributedDataConfig;
import org.eclipse.ditto.services.utils.pubsub.actors.AckSupervisor;
import org.eclipse.ditto.services.utils.pubsub.api.AckRequest;
import org.eclipse.ditto.services.utils.pubsub.api.AcksDeclared;
import org.eclipse.ditto.services.utils.pubsub.api.DeclareAcks;
import org.eclipse.ditto.services.utils.pubsub.api.ReceiveLocalAcks;
import org.eclipse.ditto.services.utils.pubsub.api.ReceiveRemoteAcks;
import org.eclipse.ditto.services.utils.pubsub.api.RemoveSubscriberAcks;
import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralDData;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/DistributedAcksImpl.class */
final class DistributedAcksImpl implements DistributedAcks {
    private static final String CLUSTER_ROLE = "acks-aware";
    private final DistributedDataConfig config;
    private final ActorRef ackSupervisor;

    private DistributedAcksImpl(DistributedDataConfig distributedDataConfig, ActorRef actorRef) {
        this.config = distributedDataConfig;
        this.ackSupervisor = actorRef;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static DistributedAcks create(ActorContext actorContext) {
        return create(actorContext, actorContext.system());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static DistributedAcks create(ActorRefFactory actorRefFactory, ActorSystem actorSystem) {
        return create(actorRefFactory, actorSystem, CLUSTER_ROLE, LiteralDDataProvider.of(CLUSTER_ROLE, "acks"));
    }

    static DistributedAcks create(ActorRefFactory actorRefFactory, ActorSystem actorSystem, String str, LiteralDDataProvider literalDDataProvider) {
        return new DistributedAcksImpl(literalDDataProvider.getConfig(actorSystem), actorRefFactory.actorOf(AckSupervisor.props(LiteralDData.of(actorSystem, literalDDataProvider)), str + "-ack-supervisor"));
    }

    private CompletionStage<AcksDeclared> askSupervisor(AckRequest ackRequest) {
        return Patterns.ask(this.ackSupervisor, ackRequest, this.config.getWriteTimeout()).thenCompose(DistributedAcksImpl::processAskResponse);
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.DistributedAcks
    public void receiveLocalDeclaredAcks(ActorRef actorRef) {
        this.ackSupervisor.tell(ReceiveLocalAcks.of(actorRef), actorRef);
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.DistributedAcks
    public void receiveDistributedDeclaredAcks(ActorRef actorRef) {
        this.ackSupervisor.tell(ReceiveRemoteAcks.of(actorRef), actorRef);
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.DistributedAcks
    public void removeSubscriber(ActorRef actorRef) {
        this.ackSupervisor.tell(RemoveSubscriberAcks.of(actorRef), actorRef);
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.DistributedAcks
    public CompletionStage<AcksDeclared> declareAcknowledgementLabels(Collection<AcknowledgementLabel> collection, ActorRef actorRef) {
        return askSupervisor(DeclareAcks.of(actorRef, null, (Set) collection.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toSet())));
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.DistributedAcks
    public CompletionStage<AcksDeclared> declareAcknowledgementLabels(Collection<AcknowledgementLabel> collection, ActorRef actorRef, @Nullable String str) {
        if (str != null) {
            ConditionChecker.checkNotEmpty(str, "group");
        }
        return askSupervisor(DeclareAcks.of(actorRef, str, (Set) collection.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toSet())));
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.DistributedAcks
    public void removeAcknowledgementLabelDeclaration(ActorRef actorRef) {
        this.ackSupervisor.tell(RemoveSubscriberAcks.of(actorRef), ActorRef.noSender());
    }

    private static CompletionStage<AcksDeclared> processAskResponse(Object obj) {
        return obj instanceof AcksDeclared ? CompletableFuture.completedStage((AcksDeclared) obj) : obj instanceof Throwable ? CompletableFuture.failedStage((Throwable) obj) : CompletableFuture.failedStage(new ClassCastException("Expect SubAck, got: " + obj));
    }
}
