package org.eclipse.ditto.internal.utils.persistentactors;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.internal.utils.akka.PingCommand;
import org.eclipse.ditto.internal.utils.akka.PingCommandResponse;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistentactors.config.PingConfig;
import org.eclipse.ditto.internal.utils.persistentactors.config.RateConfig;
import org.eclipse.ditto.json.JsonValue;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/persistentactors/PersistencePingActor.class */
public final class PersistencePingActor extends AbstractActor {
    public static final String ACTOR_NAME = "persistencePing";
    private static final String CORRELATION_ID_PREFIX = "persistence-ping-actor-triggered:";
    private final ActorRef persistenceActorShardRegion;
    private final Supplier<Source<String, NotUsed>> persistenceIdsSourceSupplier;
    private final PingConfig pingConfig;
    private final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

    @Nullable
    private Cancellable pingCheck = null;
    private boolean pingInProgress = false;
    private final Materializer materializer = Materializer.createMaterializer(this::getContext);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/eclipse/ditto/internal/utils/persistentactors/PersistencePingActor$InternalMessages.class */
    public enum InternalMessages {
        START_PINGING,
        PINGING_FINISHED
    }

    private PersistencePingActor(ActorRef actorRef, PingConfig pingConfig, Supplier<Source<String, NotUsed>> supplier) {
        this.persistenceActorShardRegion = actorRef;
        this.persistenceIdsSourceSupplier = supplier;
        this.pingConfig = pingConfig;
    }

    private PersistencePingActor(ActorRef actorRef, PingConfig pingConfig, MongoReadJournal mongoReadJournal) {
        this.persistenceActorShardRegion = actorRef;
        this.pingConfig = pingConfig;
        switch (pingConfig.getStreamingOrder()) {
            case TAGS:
                this.persistenceIdsSourceSupplier = () -> {
                    return mongoReadJournal.getJournalPidsWithTagOrderedByPriorityTag(pingConfig.getJournalTag(), pingConfig.getInterval());
                };
                break;
            case ID:
            default:
                this.persistenceIdsSourceSupplier = () -> {
                    return mongoReadJournal.getJournalPidsWithTag(pingConfig.getJournalTag(), pingConfig.getReadJournalBatchSize(), pingConfig.getInterval(), this.materializer);
                };
                break;
        }
        mongoReadJournal.ensureTagPidIndex().exceptionally(th -> {
            this.log.error(th, "Failed to create TagPidIndex");
            return null;
        });
    }

    public static Props props(ActorRef actorRef, PingConfig pingConfig, MongoReadJournal mongoReadJournal) {
        return Props.create((Class<?>) PersistencePingActor.class, actorRef, pingConfig, mongoReadJournal);
    }

    static Props propsForTests(ActorRef actorRef, PingConfig pingConfig, Supplier<Source<String, NotUsed>> supplier) {
        return Props.create((Class<?>) PersistencePingActor.class, actorRef, pingConfig, supplier);
    }

    private Cancellable schedulePing() {
        AbstractActor.ActorContext context = getContext();
        InternalMessages internalMessages = InternalMessages.START_PINGING;
        this.log.info("Scheduling ping for all PersistenceActors with initial delay <{}> and interval <{}>.", this.pingConfig.getInitialDelay(), this.pingConfig.getInterval());
        return context.getSystem().scheduler().scheduleAtFixedRate(this.pingConfig.getInitialDelay(), this.pingConfig.getInterval(), getSelf(), internalMessages, context.dispatcher(), ActorRef.noSender());
    }

    @Override // akka.actor.AbstractActor, akka.actor.Actor
    public void preStart() throws Exception {
        super.preStart();
        this.pingCheck = schedulePing();
    }

    @Override // akka.actor.AbstractActor, akka.actor.Actor
    public void postStop() throws Exception {
        if (null != this.pingCheck) {
            this.pingCheck.cancel();
        }
        super.postStop();
    }

    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(PingCommandResponse.class, pingCommandResponse -> {
            this.log.debug("Received PersistencePingCommandResponse with correlation-id <{}> and payload <{}> from sender <{}>", pingCommandResponse.getCorrelationId(), pingCommandResponse.getPayload(), getSender());
        }).match(DittoRuntimeException.class, dittoRuntimeException -> {
            this.log.debug("Received <{}> for correlation-id <{}>: {}", dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getDittoHeaders().getCorrelationId().orElse("unknown"), dittoRuntimeException.getMessage());
        }).matchEquals(InternalMessages.START_PINGING, internalMessages -> {
            startPinging();
        }).matchEquals(InternalMessages.PINGING_FINISHED, internalMessages2 -> {
            pingingFinished();
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void startPinging() {
        if (this.pingInProgress) {
            this.log.info("Another ping iteration is currently in progress. Next iteration will be started after <{}>.", this.pingConfig.getInterval());
            return;
        }
        this.log.info("Sending ping for PersistenceActors. Will be sent again after the configured interval of <{}>.", this.pingConfig.getInterval());
        this.pingInProgress = true;
        Source<String, NotUsed> source = this.persistenceIdsSourceSupplier.get();
        if (source == null) {
            this.log.warning("Failed to create new persistence id source for persistence actor ping.");
        } else {
            RateConfig rateConfig = this.pingConfig.getRateConfig();
            source.throttle(rateConfig.getEntityAmount(), rateConfig.getFrequency()).runForeach(this::ping, this.materializer).thenRun(() -> {
                this.log.info("Sending pings completed.");
                getSelf().tell(InternalMessages.PINGING_FINISHED, getSelf());
            });
        }
    }

    private void pingingFinished() {
        this.log.info("Got ping completed.");
        this.pingInProgress = false;
    }

    private void ping(String str) {
        EntityId extractEntityIdFromPersistenceId = extractEntityIdFromPersistenceId(str);
        PingCommand of = PingCommand.of(extractEntityIdFromPersistenceId, toCorrelationId(extractEntityIdFromPersistenceId), JsonValue.of(this.pingConfig.getJournalTag()));
        this.log.debug("Sending a 'ping' message for persistenceId <{}>: <{}>", str, of);
        this.persistenceActorShardRegion.tell(of, getSelf());
    }

    private EntityId extractEntityIdFromPersistenceId(String str) {
        int indexOf = str.indexOf(58);
        if (indexOf >= 0) {
            return EntityId.of(EntityType.of(str.substring(0, indexOf)), str.substring(indexOf + 1));
        }
        String format = String.format("Persistence ID <%s> wasn't prefixed with an entity type.", str);
        this.log.error(format);
        throw new IllegalArgumentException(format);
    }

    static String toCorrelationId(EntityId entityId) {
        return "persistence-ping-actor-triggered:" + entityId.toString();
    }

    static Optional<String> toPersistenceId(String str) {
        return str.startsWith(CORRELATION_ID_PREFIX) ? Optional.of(str.replace(CORRELATION_ID_PREFIX, "")) : Optional.empty();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 3441010:
                if (implMethodName.equals("ping")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/persistentactors/PersistencePingActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)V")) {
                    PersistencePingActor persistencePingActor = (PersistencePingActor) serializedLambda.getCapturedArg(0);
                    return persistencePingActor::ping;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
