package org.eclipse.ditto.services.utils.persistence.mongo;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamRefs;
import com.typesafe.config.Config;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.bson.Document;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.services.models.streaming.StreamedSnapshot;
import org.eclipse.ditto.services.models.streaming.SudoStreamSnapshots;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.services.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

@AllValuesAreNonnullByDefault
/* loaded from: input_file:org/eclipse/ditto/services/utils/persistence/mongo/SnapshotStreamingActor.class */
public final class SnapshotStreamingActor extends AbstractActor {
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final ActorMaterializer materializer = ActorMaterializer.create(getContext());
    private final Function<String, EntityId> pid2EntityId;
    private final Function<EntityId, String> entityId2Pid;
    private final DittoMongoClient mongoClient;
    private final MongoReadJournal readJournal;

    private SnapshotStreamingActor(Function<String, EntityId> function, Function<EntityId, String> function2, DittoMongoClient dittoMongoClient, MongoReadJournal mongoReadJournal) {
        this.pid2EntityId = function;
        this.entityId2Pid = function2;
        this.mongoClient = dittoMongoClient;
        this.readJournal = mongoReadJournal;
    }

    private SnapshotStreamingActor(Function<String, EntityId> function, Function<EntityId, String> function2) {
        this.pid2EntityId = function;
        this.entityId2Pid = function2;
        Config config = getContext().getSystem().settings().config();
        this.mongoClient = MongoClientWrapper.newInstance(DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config)));
        this.readJournal = MongoReadJournal.newInstance(config, this.mongoClient);
    }

    public static Props props(Function<String, EntityId> function, Function<EntityId, String> function2) {
        return Props.create(SnapshotStreamingActor.class, new Object[]{function, function2});
    }

    public static Props propsForTest(Function<String, EntityId> function, Function<EntityId, String> function2, DittoMongoClient dittoMongoClient, MongoReadJournal mongoReadJournal) {
        return Props.create(SnapshotStreamingActor.class, new Object[]{function, function2, dittoMongoClient, mongoReadJournal});
    }

    public void postStop() throws Exception {
        this.mongoClient.close();
        super.postStop();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(SudoStreamSnapshots.class, this::startStreaming).matchAny(obj -> {
            this.log.warning("Unexpected message: <{}>", obj);
        }).build();
    }

    private Source<StreamedSnapshot, NotUsed> createSource(SudoStreamSnapshots sudoStreamSnapshots) {
        this.log.info("Starting stream for <{}>", sudoStreamSnapshots);
        return this.readJournal.getNewestSnapshotsAbove(sudoStreamSnapshots.hasNonEmptyLowerBound() ? this.entityId2Pid.apply(sudoStreamSnapshots.getLowerBound()) : "", sudoStreamSnapshots.getBurst(), this.materializer, (String[]) sudoStreamSnapshots.getSnapshotFields().stream().map((v0) -> {
            return v0.asString();
        }).toArray(i -> {
            return new String[i];
        })).map(this::mapSnapshot).log("snapshot-streaming", this.log);
    }

    private StreamedSnapshot mapSnapshot(Document document) {
        EntityId apply = this.pid2EntityId.apply(document.getString(MongoReadJournal.ID));
        document.remove(MongoReadJournal.ID);
        return StreamedSnapshot.of(apply, JsonObject.of(document.toJson()));
    }

    private void startStreaming(SudoStreamSnapshots sudoStreamSnapshots) {
        Duration ofMillis = Duration.ofMillis(sudoStreamSnapshots.getTimeoutMillis());
        Patterns.pipe((CompletionStage) createSource(sudoStreamSnapshots).initialTimeout(ofMillis).idleTimeout(ofMillis).runWith(StreamRefs.sourceRef(), this.materializer), getContext().getDispatcher()).to(getSender());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -403061952:
                if (implMethodName.equals("mapSnapshot")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/SnapshotStreamingActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/bson/Document;)Lorg/eclipse/ditto/services/models/streaming/StreamedSnapshot;")) {
                    SnapshotStreamingActor snapshotStreamingActor = (SnapshotStreamingActor) serializedLambda.getCapturedArg(0);
                    return snapshotStreamingActor::mapSnapshot;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
