package org.eclipse.ditto.services.concierge.actors.cleanup.persistenceids;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.tuple.Tuple3;
import akka.pattern.Patterns;
import akka.stream.OverflowStrategy;
import akka.stream.SourceRef;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.eclipse.ditto.model.base.entity.id.DefaultEntityId;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.services.concierge.common.PersistenceIdsConfig;
import org.eclipse.ditto.services.models.connectivity.ConnectivityMessagingConstants;
import org.eclipse.ditto.services.models.policies.PoliciesMessagingConstants;
import org.eclipse.ditto.services.models.streaming.AbstractEntityIdWithRevision;
import org.eclipse.ditto.services.models.streaming.BatchedEntityIdWithRevisions;
import org.eclipse.ditto.services.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.services.models.streaming.SudoStreamPids;
import org.eclipse.ditto.services.models.things.ThingsMessagingConstants;
import org.eclipse.ditto.services.utils.akka.controlflow.ResumeSource;
import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess;

/* loaded from: input_file:org/eclipse/ditto/services/concierge/actors/cleanup/persistenceids/PersistenceIdSource.class */
public final class PersistenceIdSource {
    /**
     * Actor paths of the stream providers that are asked for persistence IDs. {@code create} walks this list
     * front to back, so the things path is queried first, then the policies path, then the connectivity path.
     */
    private static final List<String> PERSISTENCE_STREAMING_ACTOR_PATHS = Arrays.asList(ThingsMessagingConstants.THINGS_STREAM_PROVIDER_ACTOR_PATH, PoliciesMessagingConstants.POLICIES_STREAM_PROVIDER_ACTOR_PATH, ConnectivityMessagingConstants.STREAM_PROVIDER_ACTOR_PATH);

    /* loaded from: input_file:org/eclipse/ditto/services/concierge/actors/cleanup/persistenceids/PersistenceIdSource$EmptyEntityIdWithRevision.class */
    /**
     * Placeholder entity ID with revision: wraps {@code DefaultEntityId.dummy()} at revision {@code 0}.
     * <p>
     * {@code buildResumeSource} passes an instance as the initial seed of the resumable stream and falls back to
     * it whenever the list it is asked to pick the next seed from is empty.
     * <p>
     * Decompiler note (JADX): the access modifier of this class was changed from {@code private}.
     */
    public static final class EmptyEntityIdWithRevision extends AbstractEntityIdWithRevision<EntityId> {
        /** Only instantiated inside the enclosing class. */
        private EmptyEntityIdWithRevision() {
            super(DefaultEntityId.dummy(), 0L);
        }
    }

    /**
     * Not meant to be called: every member of this class is static, and any attempt to instantiate it
     * fails with an {@link AssertionError}.
     */
    private PersistenceIdSource() {
        throw new AssertionError();
    }

    /**
     * Creates a source of entity IDs with revisions collected from every stream provider listed in
     * {@code PERSISTENCE_STREAMING_ACTOR_PATHS}.
     * <p>
     * The providers are queried strictly one after another. When the stream of one provider fails with any
     * {@link Throwable}, that stream is replaced by an empty source, so the remaining providers are still
     * queried instead of the whole source failing.
     *
     * @param persistenceIdsConfig settings for backoff, restarts, timeouts and burst size of the stream requests.
     * @param actorRef the actor that is asked with the stream requests; the requests are
     * {@code DistributedPubSubMediator.Send} messages, so this is presumably the pub-sub mediator (confirm with callers).
     * @return the concatenation of the per-provider sources.
     */
    public static Source<EntityIdWithRevision, NotUsed> create(PersistenceIdsConfig persistenceIdsConfig, ActorRef actorRef) {
        return Source.from(PERSISTENCE_STREAMING_ACTOR_PATHS)
                .buffer(1, OverflowStrategy.backpressure())
                .flatMapConcat(providerPath ->
                        buildResumeSource(persistenceIdsConfig, actorRef, providerPath)
                                .recoverWithRetries(1, Throwable.class, Source::empty));
    }

    /**
     * Builds the resumable source of entity IDs with revisions for one stream provider.
     * <p>
     * For a given seed the stream sends a request carrying that seed as lower bound to the provider at
     * {@code str} by asking {@code actorRef}, fails if the ask yielded no reply, and otherwise flattens the
     * replied {@code SourceRef} into its elements. Restarting is delegated to
     * {@code ResumeSource.onFailureWithBackoff}, configured with the min/max backoff, max restarts and recovery
     * values of {@code persistenceIdsConfig}.
     *
     * @param persistenceIdsConfig settings for backoff, restarts, recovery and the ask timeout.
     * @param actorRef the actor that is asked with the stream request.
     * @param str path of the stream provider actor the request is addressed to.
     * @return the resumable source for this provider.
     */
    private static Source<EntityIdWithRevision, NotUsed> buildResumeSource(PersistenceIdsConfig persistenceIdsConfig, ActorRef actorRef, String str) {
        // Declared by its interface type: the seed is consumed as an EntityIdWithRevision by
        // requestStreamCommand and is interchangeable with the elements returned by the next-seed function,
        // so the concrete placeholder class must not leak into the seed type of the ResumeSource call.
        final EntityIdWithRevision emptyLowerBound = new EmptyEntityIdWithRevision();
        return ResumeSource.onFailureWithBackoff(
                persistenceIdsConfig.getMinBackoff(),
                persistenceIdsConfig.getMaxBackoff(),
                persistenceIdsConfig.getMaxRestarts(),
                persistenceIdsConfig.getRecovery(),
                emptyLowerBound,
                // Stream for one seed: ask once, keep request/reply/error together, then validate and unwrap.
                seed -> Source.single(requestStreamCommand(persistenceIdsConfig, str, seed))
                        .mapAsync(1, command ->
                                Patterns.ask(actorRef, command, persistenceIdsConfig.getStreamRequestTimeout())
                                        .handle((reply, error) -> Tuple3.create(command, reply, error)))
                        .flatMapConcat(PersistenceIdSource::checkForErrors)
                        .flatMapConcat(PersistenceIdSource::handleSourceRef),
                1,
                // Next seed: the last element of the list handed over by ResumeSource (presumably the most
                // recent elements seen before the restart - confirm against ResumeSource), or the placeholder
                // when that list is empty.
                finalElements -> finalElements.isEmpty()
                        ? emptyLowerBound
                        : (EntityIdWithRevision) finalElements.get(finalElements.size() - 1));
    }

    /**
     * Turns the outcome of an ask into a source: the reply if there is one, a failed source otherwise.
     *
     * @param tuple3 the sent message ({@code t1}), the reply or {@code null} ({@code t2}) and the error or
     * {@code null} ({@code t3}) of one ask.
     * @return a single-element source holding the reply when it is non-null; otherwise a source failed with an
     * {@link IllegalStateException} that names the sent message and carries the ask error as its cause.
     */
    private static Source<Object, NotUsed> checkForErrors(Tuple3<DistributedPubSubMediator.Send, Object, Throwable> tuple3) {
        final Object reply = tuple3.t2();
        if (reply != null) {
            return Source.single(reply);
        }
        final Throwable cause = tuple3.t3();
        final String message = String.format("Error on sending <%s>: %s", Objects.toString(tuple3.t1()), Objects.toString(cause));
        // Keep the original failure as cause so its stack trace is not reduced to its toString();
        // a null cause is permitted by the constructor.
        return Source.failed(new IllegalStateException(message, cause));
    }

    /**
     * Wraps the stream request for the given lower bound into a pub-sub {@code Send} addressed to {@code str}.
     *
     * @param persistenceIdsConfig settings from which the stream request is built.
     * @param str path of the stream provider actor that should receive the request.
     * @param entityIdWithRevision lower bound to put into the stream request.
     * @return the {@code Send} message to hand to the mediator.
     */
    private static DistributedPubSubMediator.Send requestStreamCommand(PersistenceIdsConfig persistenceIdsConfig, String str, EntityIdWithRevision entityIdWithRevision) {
        final SudoStreamPids streamRequest = sudoStreamSnapshotRevisions(persistenceIdsConfig, entityIdWithRevision);
        return DistPubSubAccess.send(str, streamRequest, false);
    }

    /**
     * Creates the {@code SudoStreamPids} request from the configured burst and stream idle timeout
     * (converted to milliseconds), with empty headers and the given lower bound.
     *
     * @param persistenceIdsConfig provides the burst and the stream idle timeout.
     * @param entityIdWithRevision lower bound to set on the request.
     * @return the stream request.
     */
    private static SudoStreamPids sudoStreamSnapshotRevisions(PersistenceIdsConfig persistenceIdsConfig, EntityIdWithRevision entityIdWithRevision) {
        final Integer burst = Integer.valueOf(persistenceIdsConfig.getBurst());
        final Long idleTimeoutMillis = Long.valueOf(persistenceIdsConfig.getStreamIdleTimeout().toMillis());
        return SudoStreamPids.of(burst, idleTimeoutMillis, DittoHeaders.empty())
                .withLowerBound(entityIdWithRevision);
    }

    /**
     * Expects a {@code SourceRef} reply and turns it into a source of its elements.
     *
     * @param obj the reply received for a stream request.
     * @return the elements streamed through the {@code SourceRef}, or a failed source if {@code obj} is
     * anything else.
     */
    private static Source<EntityIdWithRevision, NotUsed> handleSourceRef(Object obj) {
        if (obj instanceof SourceRef) {
            return createSourceFromSourceRef((SourceRef<?>) obj);
        }
        return failedSourceDueToUnexpectedMessage("SourceRef", obj);
    }

    /**
     * Flattens the batches streamed through a {@code SourceRef} into single elements.
     *
     * @param sourceRef the source reference replied by a stream provider.
     * @return a source emitting the elements of every received {@code BatchedEntityIdWithRevisions} in order;
     * it fails as soon as an element of any other type arrives.
     */
    private static Source<EntityIdWithRevision, NotUsed> createSourceFromSourceRef(SourceRef<?> sourceRef) {
        return sourceRef.getSource().flatMapConcat(element -> {
            if (element instanceof BatchedEntityIdWithRevisions) {
                final BatchedEntityIdWithRevisions batch = (BatchedEntityIdWithRevisions) element;
                // Identity map retained from the original pipeline; presumably it only widens the element
                // type of the batch source - confirm before removing.
                return Source.from(batch.getElements()).map(entry -> entry);
            } else {
                return failedSourceDueToUnexpectedMessage("BatchedEntityIdWithRevisions", element);
            }
        });
    }

    /**
     * Creates a failed source for a message of an unexpected type.
     *
     * @param str name of the type that was expected; only used in the error message.
     * @param obj the message actually received.
     * @param <T> element type of the returned source.
     * @return a source failed with {@code obj} itself when it is a {@link Throwable}, otherwise with an
     * {@link IllegalStateException} naming the expected type and the received message.
     */
    private static <T> Source<T, NotUsed> failedSourceDueToUnexpectedMessage(String str, Object obj) {
        if (obj instanceof Throwable) {
            return Source.failed((Throwable) obj);
        }
        final String description = String.format("While expecting <%s>, got unexpected <%s>", str, Objects.toString(obj));
        return Source.failed(new IllegalStateException(description));
    }

    /**
     * Lambda deserialization hook, marked {@code synthetic} and shown here as reconstructed by the decompiler.
     * <p>
     * It works in two steps: the implementation method name of the serialized lambda is mapped to a selector
     * (hash code first, then an exact name comparison), and the branch for that selector verifies the
     * functional interface, implementing class and signatures before returning an equivalent lambda or
     * method reference, re-binding captured arguments where the lambda has any.
     * <p>
     * NOTE(review): the selector {@code z} is declared {@code boolean} but is assigned integer values, and
     * several branches of the second switch share the label {@code case true}. This looks like a decompiler
     * typing artifact of an integer selector - confirm before trying to compile or edit this method.
     *
     * @param serializedLambda the serialized form of the lambda to re-create.
     * @return the re-created lambda or method reference.
     * @throws IllegalArgumentException if the serialized lambda matches none of the known lambdas.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: resolve the implementation method name to a selector.
        switch (implMethodName.hashCode()) {
            case -1772957332:
                if (implMethodName.equals("checkForErrors")) {
                    z = 5;
                    break;
                }
                break;
            case -678871760:
                if (implMethodName.equals("handleSourceRef")) {
                    z = 2;
                    break;
                }
                break;
            case -333291737:
                if (implMethodName.equals("lambda$create$f382f284$1")) {
                    z = false;
                    break;
                }
                break;
            case -7356216:
                if (implMethodName.equals("lambda$buildResumeSource$3098e438$1")) {
                    z = true;
                    break;
                }
                break;
            case 1767947323:
                if (implMethodName.equals("lambda$createSourceFromSourceRef$15a93ef$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1767947324:
                if (implMethodName.equals("lambda$createSourceFromSourceRef$15a93ef$2")) {
                    z = 4;
                    break;
                }
                break;
        }
        // Step 2: verify the serialized lambda's metadata and rebuild the matching lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/concierge/actors/cleanup/persistenceids/PersistenceIdSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/concierge/common/PersistenceIdsConfig;Lakka/actor/ActorRef;Ljava/lang/String;)Lakka/stream/Graph;")) {
                    PersistenceIdsConfig persistenceIdsConfig = (PersistenceIdsConfig) serializedLambda.getCapturedArg(0);
                    ActorRef actorRef = (ActorRef) serializedLambda.getCapturedArg(1);
                    return str -> {
                        return buildResumeSource(persistenceIdsConfig, actorRef, str).recoverWithRetries(1, Throwable.class, Source::empty);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/concierge/actors/cleanup/persistenceids/PersistenceIdSource") && serializedLambda.getImplMethodSignature().equals("(Lakka/actor/ActorRef;Lorg/eclipse/ditto/services/concierge/common/PersistenceIdsConfig;Lakka/cluster/pubsub/DistributedPubSubMediator$Send;)Ljava/util/concurrent/CompletionStage;")) {
                    ActorRef actorRef2 = (ActorRef) serializedLambda.getCapturedArg(0);
                    PersistenceIdsConfig persistenceIdsConfig2 = (PersistenceIdsConfig) serializedLambda.getCapturedArg(1);
                    return send -> {
                        return Patterns.ask(actorRef2, send, persistenceIdsConfig2.getStreamRequestTimeout()).handle((obj, th) -> {
                            return Tuple3.create(send, obj, th);
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/concierge/actors/cleanup/persistenceids/PersistenceIdSource") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Lakka/stream/javadsl/Source;")) {
                    return PersistenceIdSource::handleSourceRef;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/concierge/actors/cleanup/persistenceids/PersistenceIdSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/models/streaming/EntityIdWithRevision;)Lorg/eclipse/ditto/services/models/streaming/EntityIdWithRevision;")) {
                    return entityIdWithRevision -> {
                        return entityIdWithRevision;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/concierge/actors/cleanup/persistenceids/PersistenceIdSource") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Lakka/stream/Graph;")) {
                    return obj -> {
                        return obj instanceof BatchedEntityIdWithRevisions ? Source.from(((BatchedEntityIdWithRevisions) obj).getElements()).map(entityIdWithRevision2 -> {
                            return entityIdWithRevision2;
                        }) : failedSourceDueToUnexpectedMessage("BatchedEntityIdWithRevisions", obj);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/concierge/actors/cleanup/persistenceids/PersistenceIdSource") && serializedLambda.getImplMethodSignature().equals("(Lakka/japi/tuple/Tuple3;)Lakka/stream/javadsl/Source;")) {
                    return PersistenceIdSource::checkForErrors;
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
