package org.eclipse.ditto.edge.service.streaming;

import java.time.Duration;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.PoisonPill;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.ReceiveTimeout;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.exceptions.StreamingSubscriptionProtocolErrorException;
import org.eclipse.ditto.base.model.signals.commands.exceptions.StreamingSubscriptionTimeoutException;
import org.eclipse.ditto.base.model.signals.commands.streaming.CancelStreamingSubscription;
import org.eclipse.ditto.base.model.signals.commands.streaming.RequestFromStreamingSubscription;
import org.eclipse.ditto.base.model.signals.commands.streaming.StreamingSubscriptionCommand;
import org.eclipse.ditto.base.model.signals.events.streaming.StreamingSubscriptionComplete;
import org.eclipse.ditto.base.model.signals.events.streaming.StreamingSubscriptionCreated;
import org.eclipse.ditto.base.model.signals.events.streaming.StreamingSubscriptionFailed;
import org.eclipse.ditto.base.model.signals.events.streaming.StreamingSubscriptionHasNext;
import org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithStashWithTimers;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.json.JsonValue;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/eclipse/ditto/edge/service/streaming/StreamingSubscriptionActor.class */
public final class StreamingSubscriptionActor extends AbstractActorWithStashWithTimers {
    private static final Duration ZOMBIE_LIFETIME = Duration.ofSeconds(10);
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final EntityId entityId;
    private Subscription subscription;
    private ActorRef sender;
    private DittoHeaders dittoHeaders;

    /* loaded from: input_file:org/eclipse/ditto/edge/service/streaming/StreamingSubscriptionActor$StreamingSubscriberOps.class */
    private static final class StreamingSubscriberOps implements Subscriber<JsonValue> {
        private final ActorRef streamingSubscriptionActor;
        private final String subscriptionId;
        private final EntityId entityId;

        private StreamingSubscriberOps(ActorRef actorRef, EntityId entityId) {
            this.streamingSubscriptionActor = actorRef;
            this.subscriptionId = actorRef.path().name();
            this.entityId = entityId;
        }

        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.streamingSubscriptionActor.tell(subscription, ActorRef.noSender());
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(JsonValue jsonValue) {
            this.streamingSubscriptionActor.tell(StreamingSubscriptionHasNext.of(this.subscriptionId, this.entityId, jsonValue, DittoHeaders.empty()), ActorRef.noSender());
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.streamingSubscriptionActor.tell(StreamingSubscriptionFailed.of(this.subscriptionId, this.entityId, DittoRuntimeException.asDittoRuntimeException(th, th2 -> {
                return th2 instanceof IllegalArgumentException ? StreamingSubscriptionProtocolErrorException.of(th2, DittoHeaders.empty()) : DittoInternalErrorException.newBuilder().cause2(th2).build();
            }), DittoHeaders.empty()), ActorRef.noSender());
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            this.streamingSubscriptionActor.tell(StreamingSubscriptionComplete.of(this.subscriptionId, this.entityId, DittoHeaders.empty()), ActorRef.noSender());
        }
    }

    StreamingSubscriptionActor(Duration duration, EntityId entityId, ActorRef actorRef, DittoHeaders dittoHeaders) {
        this.entityId = entityId;
        this.sender = actorRef;
        this.dittoHeaders = dittoHeaders;
        getContext().setReceiveTimeout(duration);
    }

    public static Props props(Duration duration, EntityId entityId, ActorRef actorRef, DittoHeaders dittoHeaders) {
        return Props.create((Class<?>) StreamingSubscriptionActor.class, duration, entityId, actorRef, dittoHeaders);
    }

    public static Subscriber<JsonValue> asSubscriber(ActorRef actorRef, EntityId entityId) {
        return new StreamingSubscriberOps(actorRef, entityId);
    }

    @Override // org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithStashWithTimers, org.apache.pekko.actor.AbstractActor, org.apache.pekko.actor.Actor
    public void postStop() {
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    @Override // org.apache.pekko.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(RequestFromStreamingSubscription.class, this::requestSubscription).match(CancelStreamingSubscription.class, this::cancelSubscription).match(StreamingSubscriptionHasNext.class, this::subscriptionHasNext).match(StreamingSubscriptionComplete.class, this::subscriptionComplete).match(StreamingSubscriptionFailed.class, this::subscriptionFailed).match(Subscription.class, this::onSubscribe).matchEquals(ReceiveTimeout.getInstance(), (v1) -> {
            idleTimeout(v1);
        }).matchAny(obj -> {
            this.log.warning("Unknown message: <{}>", obj);
        }).build();
    }

    private AbstractActor.Receive createZombieBehavior() {
        return ReceiveBuilder.create().match(RequestFromStreamingSubscription.class, requestFromStreamingSubscription -> {
            this.log.withCorrelationId(requestFromStreamingSubscription).info("Rejecting RequestSubscription[demand={}] as zombie", Long.valueOf(requestFromStreamingSubscription.getDemand()));
            getSender().tell(StreamingSubscriptionFailed.of(getSubscriptionId(), this.entityId, StreamingSubscriptionProtocolErrorException.newBuilder().message("This subscription is considered cancelled. No more messages are processed.").build(), requestFromStreamingSubscription.getDittoHeaders()), ActorRef.noSender());
        }).matchAny(obj -> {
            this.log.debug("Ignoring as zombie: <{}>", obj);
        }).build();
    }

    private void idleTimeout(ReceiveTimeout receiveTimeout) {
        this.log.info("Stopping due to idle timeout");
        getContext().cancelReceiveTimeout();
        String subscriptionId = getSubscriptionId();
        StreamingSubscriptionFailed of = StreamingSubscriptionFailed.of(subscriptionId, this.entityId, StreamingSubscriptionTimeoutException.of(subscriptionId, this.dittoHeaders), this.dittoHeaders);
        if (this.subscription == null) {
            this.sender.tell(getSubscriptionCreated(), ActorRef.noSender());
        }
        this.sender.tell(of, ActorRef.noSender());
        becomeZombie();
    }

    private void onSubscribe(Subscription subscription) {
        if (this.subscription != null) {
            subscription.cancel();
            return;
        }
        this.subscription = subscription;
        this.sender.tell(getSubscriptionCreated(), ActorRef.noSender());
        unstashAll();
    }

    private StreamingSubscriptionCreated getSubscriptionCreated() {
        return StreamingSubscriptionCreated.of(getSubscriptionId(), this.entityId, this.dittoHeaders);
    }

    private void setSenderAndDittoHeaders(StreamingSubscriptionCommand<?> streamingSubscriptionCommand) {
        this.sender = getSender();
        this.dittoHeaders = streamingSubscriptionCommand.getDittoHeaders();
    }

    private void requestSubscription(RequestFromStreamingSubscription requestFromStreamingSubscription) {
        if (this.subscription == null) {
            this.log.withCorrelationId(requestFromStreamingSubscription).debug("Stashing <{}>", requestFromStreamingSubscription);
            stash();
        } else {
            this.log.withCorrelationId(requestFromStreamingSubscription).debug("Processing <{}>", requestFromStreamingSubscription);
            setSenderAndDittoHeaders(requestFromStreamingSubscription);
            this.subscription.request(requestFromStreamingSubscription.getDemand());
        }
    }

    private void cancelSubscription(CancelStreamingSubscription cancelStreamingSubscription) {
        if (this.subscription == null) {
            this.log.withCorrelationId(cancelStreamingSubscription).info("Stashing <{}>", cancelStreamingSubscription);
            stash();
        } else {
            this.log.withCorrelationId(cancelStreamingSubscription).info("Processing <{}>", cancelStreamingSubscription);
            setSenderAndDittoHeaders(cancelStreamingSubscription);
            this.subscription.cancel();
            becomeZombie();
        }
    }

    private void subscriptionHasNext(StreamingSubscriptionHasNext streamingSubscriptionHasNext) {
        this.log.debug("Forwarding {}", streamingSubscriptionHasNext);
        this.sender.tell(streamingSubscriptionHasNext.setDittoHeaders(this.dittoHeaders), ActorRef.noSender());
    }

    private void subscriptionComplete(StreamingSubscriptionComplete streamingSubscriptionComplete) {
        if (this.subscription == null) {
            this.log.withCorrelationId(streamingSubscriptionComplete).debug("Stashing <{}>", streamingSubscriptionComplete);
            stash();
        } else {
            this.log.info("{}", streamingSubscriptionComplete);
            this.sender.tell(streamingSubscriptionComplete.setDittoHeaders(this.dittoHeaders), ActorRef.noSender());
            becomeZombie();
        }
    }

    private void subscriptionFailed(StreamingSubscriptionFailed streamingSubscriptionFailed) {
        if (this.subscription == null) {
            this.log.withCorrelationId(streamingSubscriptionFailed).debug("Stashing <{}>", streamingSubscriptionFailed);
            stash();
        } else {
            this.log.withCorrelationId(streamingSubscriptionFailed).info("{}", streamingSubscriptionFailed);
            this.sender.tell(streamingSubscriptionFailed.setDittoHeaders(this.dittoHeaders), ActorRef.noSender());
            becomeZombie();
        }
    }

    private void becomeZombie() {
        getTimers().startSingleTimer(PoisonPill.getInstance(), PoisonPill.getInstance(), ZOMBIE_LIFETIME);
        getContext().become(createZombieBehavior());
    }

    private String getSubscriptionId() {
        return getSelf().path().name();
    }
}
