package org.eclipse.ditto.services.things.persistence.actors;

import akka.ConfigurationException;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.RecoveryCompleted;
import akka.persistence.RecoveryTimedOut;
import akka.persistence.SnapshotOffer;
import com.typesafe.config.Config;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingBuilder;
import org.eclipse.ditto.model.things.ThingLifecycle;
import org.eclipse.ditto.model.things.ThingsModelFactory;
import org.eclipse.ditto.services.things.persistence.actors.ThingSupervisorActor;
import org.eclipse.ditto.services.things.persistence.actors.strategies.commands.CommandReceiveStrategy;
import org.eclipse.ditto.services.things.persistence.actors.strategies.commands.CommandStrategy;
import org.eclipse.ditto.services.things.persistence.actors.strategies.commands.CreateThingStrategy;
import org.eclipse.ditto.services.things.persistence.actors.strategies.commands.DefaultContext;
import org.eclipse.ditto.services.things.persistence.actors.strategies.events.EventHandleStrategy;
import org.eclipse.ditto.services.things.persistence.snapshotting.DittoThingSnapshotter;
import org.eclipse.ditto.services.things.persistence.snapshotting.ThingSnapshotter;
import org.eclipse.ditto.services.things.persistence.strategies.AbstractReceiveStrategy;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.WithThingId;
import org.eclipse.ditto.signals.base.WithType;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.signals.commands.things.modify.CreateThing;
import org.eclipse.ditto.signals.events.things.ThingEvent;
import org.eclipse.ditto.signals.events.things.ThingModifiedEvent;
import scala.Option;

/* loaded from: input_file:org/eclipse/ditto/services/things/persistence/actors/ThingPersistenceActor.class */
public final class ThingPersistenceActor extends AbstractPersistentActor implements ThingPersistenceActorInterface {
    /** Prefix that {@link #persistenceId()} prepends to the thing ID. */
    static final String PERSISTENCE_ID_PREFIX = "thing:";
    /** Value returned by {@link #journalPluginId()}. */
    static final String JOURNAL_PLUGIN_ID = "akka-contrib-mongodb-persistence-things-journal";
    /** Value returned by {@link #snapshotPluginId()}. */
    static final String SNAPSHOT_PLUGIN_ID = "akka-contrib-mongodb-persistence-things-snapshots";
    /** Strategy used for every {@code Command} once the actor has switched to the "thing created" behaviour. */
    private static final CommandReceiveStrategy COMMAND_RECEIVE_STRATEGY = CommandReceiveStrategy.getInstance();
    /** Strategy used for {@code CreateThing} in the initial and in the "thing deleted" behaviour. */
    private static final CreateThingStrategy CREATE_THING_STRATEGY = CreateThingStrategy.getInstance();
    /** ID of the thing this actor is responsible for; set once in the constructor. */
    private final String thingId;
    /** Receiver of the {@code DistributedPubSubMediator.Publish} messages sent for applied events. */
    private final ActorRef pubSubMediator;
    /** Snapshotter created in the constructor via the supplied {@code ThingSnapshotter.Create}. */
    private final ThingSnapshotter<?, ?> thingSnapshotter;
    /** Delay (read from config) before the next activity check while the thing is in use. */
    private final Duration activityCheckInterval;
    /** Delay (read from config) before the next activity check while the thing is deleted. */
    private final Duration activityCheckDeletedInterval;
    /** Receive that applies a {@code ThingEvent} to {@link #thing}; used for recovery and for freshly persisted events. */
    private final AbstractActor.Receive handleThingEvents;
    /** Config value compared against the number of revisions since the last snapshot. */
    private final long snapshotThreshold;
    /** Context handed to every command strategy invocation. */
    private final CommandStrategy.Context defaultContext;
    /** Incremented whenever a reply is sent (or prepared); compared by the activity check to detect recent access. */
    private long accessCounter;
    /** Handle of the currently scheduled activity check, or {@code null} if none was scheduled yet. */
    private Cancellable activityChecker;
    /** Number of messages seen by the peek consumer, capped at 2. */
    private int firstMessageCounter = 0;
    /** Logger of this actor. */
    private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    /** Current state of the thing; {@code null} until a snapshot or event has been applied. */
    private Thing thing = null;

    /**
     * Handles {@code CheckForActivity} messages: either takes a snapshot of a deleted thing, re-schedules the
     * next check because the thing was accessed in the meantime, or asks for this actor to be shut down.
     */
    @NotThreadSafe
    public final class CheckForActivityStrategy extends AbstractReceiveStrategy<CheckForActivity> {

        CheckForActivityStrategy() {
            super(CheckForActivity.class, ThingPersistenceActor.this.log);
        }

        /**
         * Decides what to do on an activity check.
         *
         * @param message the check message carrying the access counter value at scheduling time.
         */
        @Override // org.eclipse.ditto.services.things.persistence.strategies.AbstractReceiveStrategy
        public void doApply(final CheckForActivity message) {
            final ThingPersistenceActor actor = ThingPersistenceActor.this;

            // A deleted thing whose latest snapshot is not complete and up to date gets snapshotted first.
            if (thingExistsAsDeleted() && !actor.thingSnapshotter.lastSnapshotCompletedAndUpToDate()) {
                actor.thingSnapshotter.takeSnapshotInternal();
                actor.scheduleCheckForThingActivity(actor.activityCheckDeletedInterval.getSeconds());
                return;
            }

            // The counter moved since the check was scheduled: the thing was accessed, so check again later.
            if (actor.accessCounter > message.getCurrentAccessCounter()) {
                actor.scheduleCheckForThingActivity(actor.activityCheckInterval.getSeconds());
                return;
            }

            final String reason = actor.isThingActive()
                    ? "Thing <{}> was not accessed in a while. Shutting Actor down ..."
                    : "Thing <{}> was deleted recently. Shutting Actor down ...";
            shutdown(reason, actor.thingId);
        }

        /** @return {@code true} if a thing is present and has the lifecycle {@code DELETED}. */
        private boolean thingExistsAsDeleted() {
            final Thing current = ThingPersistenceActor.this.thing;
            return current != null && current.hasLifecycle(ThingLifecycle.DELETED);
        }

        /**
         * Logs the given reason at debug level and triggers stopping of the enclosing actor.
         *
         * @param template the log message template.
         * @param id the thing ID inserted into the template.
         */
        private void shutdown(final String template, final String id) {
            ThingPersistenceActor.this.log.debug(template, id);
            ThingPersistenceActor.this.stopThisActor();
        }
    }

    /**
     * Consumer that logs every message it is handed. Messages carrying a thing ID that differs from the ID of
     * the enclosing actor are logged as a warning; everything else is logged at debug level.
     */
    private final class LogIncomingMessagesConsumer implements Consumer<Object> {

        private LogIncomingMessagesConsumer() {
        }

        /**
         * Logs the given message.
         *
         * @param message the incoming message; if it is a {@code WithDittoHeaders} the log is first enhanced
         * with its correlation ID.
         */
        @Override // java.util.function.Consumer
        public void accept(final Object message) {
            if (message instanceof WithDittoHeaders) {
                LogUtil.enhanceLogWithCorrelationId(log, (WithDittoHeaders) message);
            }

            final String messageType = getMessageType(message);
            if (!isWithThingId(message)) {
                logInfoAboutIncomingMessage(messageType);
                return;
            }

            final String messageThingId = getMessageThingId((WithThingId) message);
            if (isEqualToActorThingId(messageThingId)) {
                logInfoAboutIncomingMessage(messageType);
            } else {
                // Every placeholder is wrapped in a complete "<...>" pair so the actor's own ID is delimited
                // the same way as in the debug message below.
                log.warning("<{}> got <{}> with different thing ID <{}>!", thingId, messageType, messageThingId);
            }
        }

        /** @return the command type for {@code Command}s, otherwise the simple class name of the message. */
        private String getMessageType(final Object message) {
            return isCommand(message) ? ((WithType) message).getType() : message.getClass().getSimpleName();
        }

        private boolean isCommand(final Object message) {
            return message instanceof Command;
        }

        private boolean isWithThingId(final Object message) {
            return message instanceof WithThingId;
        }

        /** @return {@code true} if the given ID equals the thing ID of the enclosing actor (null-safe). */
        private boolean isEqualToActorThingId(final String messageThingId) {
            return Objects.equals(thingId, messageThingId);
        }

        private String getMessageThingId(final WithThingId withThingId) {
            return withThingId.getThingId();
        }

        /** Logs, at debug level, that the enclosing actor received a message of the given type. */
        private void logInfoAboutIncomingMessage(final String messageType) {
            log.debug("<{}> got <{}>.", thingId, messageType);
        }
    }

    /**
     * Fallback strategy of the "thing created" behaviour: logs the message as unknown and marks it as
     * unhandled.
     */
    @NotThreadSafe
    public final class MatchAnyAfterInitializeStrategy extends AbstractReceiveStrategy<Object> {

        MatchAnyAfterInitializeStrategy() {
            super(Object.class, ThingPersistenceActor.this.log);
        }

        /**
         * @param message the message no other strategy matched.
         */
        @Override // org.eclipse.ditto.services.things.persistence.strategies.AbstractReceiveStrategy
        protected void doApply(final Object message) {
            final ThingPersistenceActor actor = ThingPersistenceActor.this;
            actor.log.warning("Unknown message: {}", message);
            actor.unhandled(message);
        }
    }

    /**
     * Fallback strategy of the initial behaviour: answers the sender with a {@code ThingNotAccessibleException}
     * and (re-)schedules the activity check.
     * <p>
     * NOTE(review): the debug message speaks of "after initialization" and of terminating the actor, while this
     * method only schedules an activity check - confirm the intended wording.
     */
    @NotThreadSafe
    private final class MatchAnyDuringInitializeStrategy extends AbstractReceiveStrategy<Object> {

        MatchAnyDuringInitializeStrategy() {
            super(Object.class, ThingPersistenceActor.this.log);
        }

        /**
         * @param message the message no other strategy matched; its headers are copied into the reply if it
         * is a {@code WithDittoHeaders}.
         */
        @Override // org.eclipse.ditto.services.things.persistence.strategies.AbstractReceiveStrategy
        protected void doApply(final Object message) {
            final ThingPersistenceActor actor = ThingPersistenceActor.this;

            actor.log.debug("Unexpected message after initialization of actor received: {} - Terminating this actor and sending <{}> to requester ...", message, ThingNotAccessibleException.class.getName());

            final ThingNotAccessibleException.Builder responseBuilder =
                    ThingNotAccessibleException.newBuilder(actor.thingId);
            if (message instanceof WithDittoHeaders) {
                final WithDittoHeaders withHeaders = (WithDittoHeaders) message;
                responseBuilder.dittoHeaders(withHeaders.getDittoHeaders());
            }
            actor.notifySender(responseBuilder.build());

            actor.scheduleCheckForThingActivity(actor.activityCheckInterval.getSeconds());
        }
    }

    /**
     * Consumer that counts incoming messages (capped at 2) before passing each message on to an optional
     * logging consumer.
     */
    public final class PeekConsumer implements Consumer<Object> {
        private final Consumer<Object> furtherConsumers;

        /**
         * @param z whether incoming messages should additionally be logged; if {@code false} a no-op consumer
         * is used instead.
         */
        private PeekConsumer(boolean z) {
            if (z) {
                this.furtherConsumers = new LogIncomingMessagesConsumer();
            } else {
                this.furtherConsumers = ignored -> {
                };
            }
        }

        /**
         * Increases the message counter of the enclosing actor (never beyond 2) and forwards the message.
         *
         * @param obj the incoming message.
         */
        @Override // java.util.function.Consumer
        public void accept(Object obj) {
            final int incremented = firstMessageCounter + 1;
            firstMessageCounter = Math.min(2, incremented);
            furtherConsumers.accept(obj);
        }
    }

    /**
     * Fallback strategy of the "thing deleted" behaviour: answers the sender with a
     * {@code ThingNotAccessibleException} for the thing ID of the enclosing actor.
     */
    @NotThreadSafe
    public final class ThingNotFoundStrategy extends AbstractReceiveStrategy<Object> {

        ThingNotFoundStrategy() {
            super(Object.class, ThingPersistenceActor.this.log);
        }

        /**
         * @param message the message no other strategy matched; its headers are copied into the reply if it
         * is a {@code WithDittoHeaders}.
         */
        @Override // org.eclipse.ditto.services.things.persistence.strategies.AbstractReceiveStrategy
        protected void doApply(final Object message) {
            final ThingPersistenceActor actor = ThingPersistenceActor.this;

            final ThingNotAccessibleException.Builder responseBuilder =
                    ThingNotAccessibleException.newBuilder(actor.thingId);
            if (message instanceof WithDittoHeaders) {
                final WithDittoHeaders withHeaders = (WithDittoHeaders) message;
                responseBuilder.dittoHeaders(withHeaders.getDittoHeaders());
            }
            actor.notifySender(responseBuilder.build());
        }
    }

    /**
     * Creates the actor, reading intervals, snapshot threshold and snapshotter settings from the actor
     * system's configuration.
     *
     * @param str the ID of the thing this actor manages.
     * @param actorRef the pub/sub mediator events are published to.
     * @param create factory used to create the snapshotter for this actor.
     */
    ThingPersistenceActor(String str, ActorRef actorRef, ThingSnapshotter.Create create) {
        this.thingId = str;
        this.pubSubMediator = actorRef;

        final Config systemConfig = getContext().system().settings().config();
        this.activityCheckInterval = systemConfig.getDuration("ditto.things.thing.activity.check.interval");
        this.activityCheckDeletedInterval =
                systemConfig.getDuration("ditto.things.thing.activity.check.deleted.interval");
        this.snapshotThreshold = getSnapshotThreshold(systemConfig);
        // The snapshotter needs pubSubMediator (assigned above) and is itself needed by the default context.
        this.thingSnapshotter = getSnapshotter(systemConfig, create);

        this.defaultContext = DefaultContext.getInstance(
                str,
                this.log,
                this.thingSnapshotter,
                this::becomeThingCreatedHandler,
                this::becomeThingDeletedHandler,
                this::stopThisActor,
                this::isFirstMessage);

        this.handleThingEvents = ReceiveBuilder.create()
                .match(ThingEvent.class, event -> {
                    this.thing = EventHandleStrategy.getInstance().handle(event, this.thing, getRevisionNumber());
                })
                .build();
    }

    /**
     * Reads the snapshot threshold from the given config.
     *
     * @param config the config to read from.
     * @return the configured threshold.
     * @throws ConfigurationException if the configured value is negative.
     */
    private static long getSnapshotThreshold(Config config) {
        final String key = "ditto.things.thing.snapshot.threshold";
        final long threshold = config.getLong(key);
        if (threshold >= 0) {
            return threshold;
        }
        throw new ConfigurationException(
                String.format("Config setting <%s> must be positive but is <%d>!", key, threshold));
    }

    /**
     * Creates the snapshotter for this actor from the snapshot-related config settings.
     *
     * @param config the config providing snapshot interval and the two "delete-old" flags.
     * @param create the factory that builds the snapshotter.
     * @return the snapshotter returned by the factory.
     */
    private ThingSnapshotter<?, ?> getSnapshotter(Config config, ThingSnapshotter.Create create) {
        final Duration snapshotInterval = config.getDuration("ditto.things.thing.snapshot.interval");
        final boolean deleteOldSnapshots = config.getBoolean("ditto.things.thing.snapshot.delete-old");
        final boolean deleteOldEvents = config.getBoolean("ditto.things.thing.events.delete-old");
        return create.apply(this, pubSubMediator, deleteOldSnapshots, deleteOldEvents, log, snapshotInterval);
    }

    /**
     * Creates Akka configuration object {@link Props} for this actor using the given snapshotter factory.
     *
     * @param str the ID of the thing the actor manages.
     * @param actorRef the pub/sub mediator events are published to.
     * @param create factory used to create the snapshotter of the actor.
     * @return the Props.
     */
    public static Props props(final String str, final ActorRef actorRef, final ThingSnapshotter.Create create) {
        return Props.create(ThingPersistenceActor.class, new Creator<ThingPersistenceActor>() {
            private static final long serialVersionUID = 1;

            // Must be named create() to implement Creator; a differently named method leaves the
            // interface's abstract method unimplemented.
            @Override
            public ThingPersistenceActor create() {
                return new ThingPersistenceActor(str, actorRef, create);
            }
        });
    }

    /**
     * Creates Akka configuration object {@link Props} for this actor using
     * {@code DittoThingSnapshotter::getInstance} as snapshotter factory.
     *
     * @param str the ID of the thing the actor manages.
     * @param actorRef the pub/sub mediator events are published to.
     * @return the Props.
     */
    static Props props(final String str, final ActorRef actorRef) {
        return Props.create(ThingPersistenceActor.class, new Creator<ThingPersistenceActor>() {
            private static final long serialVersionUID = 1;

            // Must be named create() to implement Creator; a differently named method leaves the
            // interface's abstract method unimplemented.
            @Override
            public ThingPersistenceActor create() {
                return new ThingPersistenceActor(str, actorRef, DittoThingSnapshotter::getInstance);
            }
        });
    }

    /**
     * Returns a copy of the given thing that is guaranteed to have a lifecycle.
     *
     * @param thing the thing to copy.
     * @return the copy; if the given thing had no lifecycle the copy has lifecycle {@code ACTIVE}.
     */
    private static Thing enhanceThingWithLifecycle(Thing thing) {
        final ThingBuilder.FromCopy copyBuilder = ThingsModelFactory.newThingBuilder(thing);
        final boolean lifecycleMissing = !thing.getLifecycle().isPresent();
        if (lifecycleMissing) {
            copyBuilder.setLifecycle(ThingLifecycle.ACTIVE);
        }
        return copyBuilder.build();
    }

    /**
     * Returns the current state of the thing held by this actor.
     * <p>
     * NOTE(review): the backing field is initialised to {@code null} and only assigned when a snapshot or an
     * event is applied, so despite {@code @Nonnull} this can return {@code null} before that - confirm callers
     * cope with it.
     *
     * @return the thing.
     */
    @Override // org.eclipse.ditto.services.things.persistence.actors.ThingPersistenceActorInterface
    @Nonnull
    public Thing getThing() {
        return this.thing;
    }

    /**
     * Returns the ID of the thing this actor is responsible for, as passed to the constructor.
     *
     * @return the thing ID.
     */
    @Override // org.eclipse.ditto.services.things.persistence.actors.ThingPersistenceActorInterface
    @Nonnull
    public String getThingId() {
        return this.thingId;
    }

    /**
     * Tells whether at most one message has been counted by the peek consumer so far.
     *
     * @return {@code true} while the message counter is below 2.
     */
    private boolean isFirstMessage() {
        return firstMessageCounter < 2;
    }

    /**
     * Schedules a single {@code CheckForActivity} message to this actor, replacing any check scheduled before.
     *
     * @param j the delay in seconds after which the check message is sent.
     */
    public void scheduleCheckForThingActivity(long j) {
        log.debug("Scheduling for Activity Check in <{}> seconds.", j);

        // Only one check may be pending at a time.
        final Cancellable pendingCheck = activityChecker;
        if (pendingCheck != null) {
            pendingCheck.cancel();
        }

        final CheckForActivity checkMessage = new CheckForActivity(getRevisionNumber(), accessCounter);
        activityChecker = getContext().system().scheduler().scheduleOnce(
                scala.concurrent.duration.Duration.apply(j, TimeUnit.SECONDS),
                getSelf(),
                checkMessage,
                getContext().dispatcher(),
                ActorRef.noSender());
    }

    /**
     * Returns the current revision of the thing, which is the value of {@code lastSequenceNr()}.
     *
     * @return the revision number.
     */
    private long getRevisionNumber() {
        return lastSequenceNr();
    }

    /**
     * Returns the persistence ID of this actor: {@link #PERSISTENCE_ID_PREFIX} followed by the thing ID.
     *
     * @return the persistence ID.
     */
    public String persistenceId() {
        return PERSISTENCE_ID_PREFIX + this.thingId;
    }

    /**
     * Returns the ID of the journal plugin this actor uses.
     *
     * @return {@link #JOURNAL_PLUGIN_ID}.
     */
    public String journalPluginId() {
        return JOURNAL_PLUGIN_ID;
    }

    /**
     * Returns the ID of the snapshot plugin this actor uses.
     *
     * @return {@link #SNAPSHOT_PLUGIN_ID}.
     */
    public String snapshotPluginId() {
        return SNAPSHOT_PLUGIN_ID;
    }

    /**
     * Logs the stop, delegates to the superclass and the snapshotter, and cancels a pending activity check
     * if there is one.
     */
    public void postStop() {
        log.debug("Stopping PersistenceActor for Thing with ID - {}", thingId);
        super.postStop();
        thingSnapshotter.postStop();

        final Cancellable pendingCheck = activityChecker;
        if (pendingCheck != null) {
            pendingCheck.cancel();
        }
    }

    /**
     * Builds the initial behaviour: {@code CreateThing} commands accepted by the create strategy are handled,
     * activity checks are processed, and everything else goes to {@code MatchAnyDuringInitializeStrategy}.
     *
     * @return the initial receive.
     */
    public AbstractActor.Receive createReceive() {
        final ReceiveBuilder createThingMatcher = ReceiveBuilder.create()
                .match(CreateThing.class,
                        CREATE_THING_STRATEGY::isDefined,
                        createThing -> handleCommand(createThing, CREATE_THING_STRATEGY));

        return new StrategyAwareReceiveBuilder(createThingMatcher, log)
                .match(new CheckForActivityStrategy())
                .matchAny(new MatchAnyDuringInitializeStrategy())
                .setPeekConsumer(getPeekConsumer())
                .build();
    }

    /**
     * Creates the consumer that peeks at every incoming message. Message logging is enabled according to
     * {@link #isLogIncomingMessages()}. This implementation always returns a new instance, never {@code null}.
     *
     * @return the peek consumer.
     */
    @Nullable
    private Consumer<Object> getPeekConsumer() {
        final boolean logIncomingMessages = isLogIncomingMessages();
        return new PeekConsumer(logIncomingMessages);
    }

    /**
     * Delegates to the superclass implementation and then logs an error containing the thing ID and the
     * message of the cause.
     *
     * @param th the cause of the recovery failure.
     * @param option passed through to the superclass unchanged.
     */
    public void onRecoveryFailure(Throwable th, Option<Object> option) {
        super.onRecoveryFailure(th, option);
        this.log.error("Recovery Failure for Thing with ID {} and cause {}", this.thingId, th.getMessage());
    }

    /**
     * Builds the recovery behaviour: thing events are applied first; otherwise snapshot offers, recovery
     * timeouts and recovery completion are handled, and any other message is logged as unknown.
     *
     * @return the recovery receive.
     */
    public AbstractActor.Receive createReceiveRecover() {
        final AbstractActor.Receive recoveryLifecycle = ReceiveBuilder.create()
                .match(SnapshotOffer.class, this::recoverFromSnapshotOffer)
                .match(RecoveryTimedOut.class, timedOut ->
                        log.warning("RecoveryTimeout occurred during recovery for Thing with ID {}", thingId))
                .match(RecoveryCompleted.class, completed -> completeRecovery())
                .matchAny(unknown -> log.warning("Unknown recover message: {}", unknown))
                .build();

        return handleThingEvents.orElse(recoveryLifecycle);
    }

    /** Restores the thing from the offered snapshot. */
    private void recoverFromSnapshotOffer(final SnapshotOffer snapshotOffer) {
        log.debug("Got SnapshotOffer: {}", snapshotOffer);
        thing = thingSnapshotter.recoverThingFromSnapshotOffer(snapshotOffer);
    }

    /** Switches to the behaviour matching the recovered thing's lifecycle; does nothing if no thing was recovered. */
    private void completeRecovery() {
        if (thing == null) {
            return;
        }

        thing = enhanceThingWithLifecycle(thing);
        log.debug("Thing <{}> was recovered.", thingId);

        if (isThingActive()) {
            becomeThingCreatedHandler();
        } else {
            // Neither active nor deleted: report it, but still fall back to the "deleted" behaviour.
            if (!isThingDeleted()) {
                log.error("Unknown lifecycle state <{}> for Thing <{}>.", thing.getLifecycle(), thingId);
            }
            becomeThingDeletedHandler();
        }
    }

    /**
     * Switches to the "thing created" behaviour: commands accepted by the command receive strategy are
     * handled, followed by the snapshotter's strategies, the activity check and a fallback for unknown
     * messages. Afterwards the parent is sent a manual reset, an activity check is scheduled and maintenance
     * snapshots are started.
     */
    private void becomeThingCreatedHandler() {
        final ReceiveBuilder commandMatcher = ReceiveBuilder.create()
                .match(Command.class,
                        COMMAND_RECEIVE_STRATEGY::isDefined,
                        command -> handleCommand(command, COMMAND_RECEIVE_STRATEGY));

        final AbstractActor.Receive thingCreatedBehaviour = new StrategyAwareReceiveBuilder(commandMatcher, log)
                .matchEach(thingSnapshotter.strategies())
                .match(new CheckForActivityStrategy())
                .matchAny(new MatchAnyAfterInitializeStrategy())
                .build();

        getContext().become(thingCreatedBehaviour, true);
        getContext().getParent().tell(ThingSupervisorActor.ManualReset.INSTANCE, getSelf());
        scheduleCheckForThingActivity(activityCheckInterval.getSeconds());
        thingSnapshotter.startMaintenanceSnapshots();
    }

    /**
     * Applies the given strategy to the command and then applies the strategy's result, which may persist an
     * event and/or notify the sender. A {@code DittoRuntimeException} thrown on the way is sent to the sender.
     *
     * @param command the command to handle.
     * @param commandStrategy the strategy to handle the command with.
     */
    private void handleCommand(Command command, CommandStrategy commandStrategy) {
        try {
            commandStrategy
                    .apply(defaultContext, thing, getNextRevisionNumber(), command)
                    .apply(defaultContext, this::persistAndApplyEvent, asyncNotifySender());
        } catch (final DittoRuntimeException dittoRuntimeException) {
            getSender().tell(dittoRuntimeException, getSelf());
        }
    }

    /**
     * Returns the revision number following the current one.
     *
     * @return the current revision number plus one.
     */
    private long getNextRevisionNumber() {
        final long currentRevision = getRevisionNumber();
        return currentRevision + 1;
    }

    /**
     * Switches to the "thing deleted" behaviour: {@code CreateThing} commands accepted by the create strategy
     * are handled, followed by the snapshotter's strategies, the activity check and a "not found" fallback.
     * Afterwards the parent is sent a manual reset, an activity check is scheduled with the "deleted" interval
     * and maintenance snapshots are stopped.
     */
    private void becomeThingDeletedHandler() {
        final ReceiveBuilder createThingMatcher = ReceiveBuilder.create()
                .match(CreateThing.class,
                        CREATE_THING_STRATEGY::isDefined,
                        createThing -> handleCommand(createThing, CREATE_THING_STRATEGY));

        final AbstractActor.Receive thingDeletedBehaviour =
                new StrategyAwareReceiveBuilder(createThingMatcher, log)
                        .matchEach(thingSnapshotter.strategies())
                        .match(new CheckForActivityStrategy())
                        .matchAny(new ThingNotFoundStrategy())
                        .setPeekConsumer(getPeekConsumer())
                        .build();

        getContext().become(thingDeletedBehaviour, true);
        getContext().getParent().tell(ThingSupervisorActor.ManualReset.INSTANCE, getSelf());
        scheduleCheckForThingActivity(activityCheckDeletedInterval.getSeconds());
        thingSnapshotter.stopMaintenanceSnapshots();
    }

    /**
     * Persists the given event, applies it to the actor state and then invokes the handler. If a thing is
     * present, the event's headers are first given the schema version implemented by the thing. For a dry run
     * nothing is persisted or applied; the handler is invoked directly.
     *
     * @param a the event to persist and apply.
     * @param biConsumer invoked with the (possibly re-headered) event and the current thing.
     * @param <A> the type of the event.
     */
    private <A extends ThingModifiedEvent<? extends A>> void persistAndApplyEvent(A a, BiConsumer<A, Thing> biConsumer) {
        final A eventToPersist;
        if (thing != null) {
            eventToPersist = a.setDittoHeaders(a.getDittoHeaders()
                    .toBuilder()
                    .schemaVersion(thing.getImplementedSchemaVersion())
                    .build());
        } else {
            eventToPersist = a;
        }

        if (eventToPersist.getDittoHeaders().isDryRun()) {
            biConsumer.accept(eventToPersist, thing);
            return;
        }

        persistEvent(eventToPersist, persistedEvent -> {
            // Apply first so that the handler sees the thing as it is after the event.
            applyEvent(persistedEvent);
            biConsumer.accept(persistedEvent, thing);
        });
    }

    /**
     * Persists the given event. Once persisted, the handler is invoked and a snapshot is taken if the snapshot
     * threshold has been passed.
     *
     * @param a the event to persist.
     * @param consumer invoked with the persisted event.
     * @param <A> the type of the event.
     */
    private <A extends ThingModifiedEvent> void persistEvent(A a, Consumer<A> consumer) {
        LogUtil.enhanceLogWithCorrelationId(log, a.getDittoHeaders().getCorrelationId());
        log.debug("Persisting Event <{}>.", a.getType());

        persist(a, persistedEvent -> {
            // The callback runs later, so the correlation ID is set on the logger again.
            LogUtil.enhanceLogWithCorrelationId(log, a.getDittoHeaders().getCorrelationId());
            log.info("Successfully persisted Event <{}>.", a.getType());

            consumer.accept(persistedEvent);

            if (snapshotThresholdPassed()) {
                thingSnapshotter.takeSnapshotInternal();
            }
        });
    }

    /**
     * Tells whether enough revisions have accumulated to take a snapshot.
     *
     * @return {@code true} if the number of revisions since the latest snapshot - or, if the snapshotter
     * reports no positive snapshot sequence number, the current revision plus one - exceeds the threshold.
     */
    private boolean snapshotThresholdPassed() {
        if (thingSnapshotter.getLatestSnapshotSequenceNr() > 0) {
            return getRevisionNumber() - thingSnapshotter.getLatestSnapshotSequenceNr() > snapshotThreshold;
        }
        return getRevisionNumber() + 1 > snapshotThreshold;
    }

    /**
     * Applies the given event to the actor state via the event-handling receive and then publishes it to the
     * subscribers.
     *
     * @param a the event to apply and publish.
     * @param <A> the type of the event.
     */
    private <A extends ThingModifiedEvent> void applyEvent(A a) {
        // Update the state before publishing.
        this.handleThingEvents.onMessage().apply(a);
        notifySubscribers(a);
    }

    /**
     * Tells whether a thing is present and has the lifecycle {@code ACTIVE}.
     *
     * @return {@code true} if the thing exists and is active, {@code false} otherwise.
     */
    public boolean isThingActive() {
        final Thing current = thing;
        if (current == null) {
            return false;
        }
        return current.hasLifecycle(ThingLifecycle.ACTIVE);
    }

    /**
     * Tells whether the thing counts as deleted. A missing thing counts as deleted, too.
     *
     * @return {@code true} if there is no thing or the thing has the lifecycle {@code DELETED}.
     */
    @Override // org.eclipse.ditto.services.things.persistence.actors.ThingPersistenceActorInterface
    public boolean isThingDeleted() {
        final Thing current = thing;
        if (current == null) {
            return true;
        }
        return current.hasLifecycle(ThingLifecycle.DELETED);
    }

    /**
     * Publishes the given event via the pub/sub mediator on the topic {@code "things.events:"}.
     *
     * @param thingEvent the event to publish.
     */
    private void notifySubscribers(ThingEvent thingEvent) {
        final DistributedPubSubMediator.Publish publish =
                new DistributedPubSubMediator.Publish("things.events:", thingEvent, true);
        pubSubMediator.tell(publish, getSelf());
    }

    /**
     * Sends the given message to the sender of the message currently being processed.
     *
     * @param withDittoHeaders the message to send.
     */
    public void notifySender(WithDittoHeaders withDittoHeaders) {
        final ActorRef currentSender = getSender();
        notifySender(currentSender, withDittoHeaders);
    }

    /**
     * Counts an access and returns a consumer that sends a message to the actor that is the sender at the
     * time this method is called. Sender and self are captured up front, so the consumer does not look them
     * up again when it is eventually invoked.
     *
     * @return the consumer sending its argument to the captured sender.
     */
    private Consumer<WithDittoHeaders> asyncNotifySender() {
        accessCounter += 1;
        final ActorRef replyTo = getSender();
        final ActorRef self = getSelf();
        return response -> replyTo.tell(response, self);
    }

    /**
     * Counts an access and sends the given message to the given actor with this actor as sender.
     *
     * @param actorRef the receiver of the message.
     * @param withDittoHeaders the message to send.
     */
    private void notifySender(ActorRef actorRef, WithDittoHeaders withDittoHeaders) {
        accessCounter += 1;
        final ActorRef self = getSelf();
        actorRef.tell(withDittoHeaders, self);
    }

    /**
     * Tells whether incoming messages should be logged according to the actor system's configuration.
     *
     * @return {@code true} if the setting {@code ditto.things.log-incoming-messages} exists and is
     * {@code true}.
     */
    private boolean isLogIncomingMessages() {
        final String path = "ditto.things.log-incoming-messages";
        final Config systemConfig = getContext().getSystem().settings().config();
        if (!systemConfig.hasPath(path)) {
            return false;
        }
        return systemConfig.getBoolean(path);
    }

    /**
     * Asks the parent actor to passivate this actor by sending it {@code Control.PASSIVATE}.
     */
    public void stopThisActor() {
        final ActorRef parent = getContext().getParent();
        parent.tell(ThingSupervisorActor.Control.PASSIVATE, getSelf());
    }
}
