package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import java.util.Optional;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.StopStreaming;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;

/**
 * Parent actor for streaming sessions.
 *
 * <p>For every {@link Connect} message it creates a {@link StreamingSessionActor} child named after the
 * connection correlation id. {@link StartStreaming}, {@link StopStreaming} and {@link DittoRuntimeException}
 * messages are forwarded to the child addressed by that id (or by the "origin" header), while {@link Signal}s
 * are handed to the command router with the matching child set as sender.
 */
public final class StreamingActor extends AbstractActor {

    /**
     * The name under which this actor is expected to be started.
     */
    public static final String ACTOR_NAME = "streaming";

    private final DiagnosticLoggingAdapter logger = LogUtil.obtain(this);

    private final ActorRef pubSubMediator;
    private final ActorRef commandRouter;

    // Every failure of a child is logged and escalated to this actor's own supervisor.
    private final SupervisorStrategy strategy = new OneForOneStrategy(true, DeciderBuilder
            .match(Throwable.class, failure -> {
                logger.error(failure, "Escalating above actor!");
                return SupervisorStrategy.escalate();
            })
            .matchAny(unknown -> {
                logger.error("Unknown message:'{}'! Escalating above actor!", unknown);
                return SupervisorStrategy.escalate();
            })
            .build());

    private StreamingActor(final ActorRef pubSubMediator, final ActorRef commandRouter) {
        this.pubSubMediator = pubSubMediator;
        this.commandRouter = commandRouter;
    }

    /**
     * Creates Akka configuration object {@link Props} for this {@code StreamingActor}.
     *
     * @param pubSubMediator the actor reference handed to every created {@link StreamingSessionActor}.
     * @param commandRouter the actor reference which receives incoming {@link Signal}s.
     * @return the Akka configuration Props object.
     */
    public static Props props(final ActorRef pubSubMediator, final ActorRef commandRouter) {
        return Props.create(StreamingActor.class, new Creator<StreamingActor>() {
            private static final long serialVersionUID = 1L;

            @Override
            public StreamingActor create() {
                // Both references are the captured (effectively final) method parameters.
                return new StreamingActor(pubSubMediator, commandRouter);
            }
        });
    }

    @Override
    public SupervisorStrategy supervisorStrategy() {
        return strategy;
    }

    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(Connect.class, connect -> {
                    final ActorRef eventAndResponsePublisher = connect.getEventAndResponsePublisher();
                    eventAndResponsePublisher.forward(connect, getContext());

                    // The session actor is named after the correlation id so that later messages can
                    // address it by that id.
                    final String connectionCorrelationId = connect.getConnectionCorrelationId();
                    getContext().actorOf(
                            StreamingSessionActor.props(connectionCorrelationId, connect.getType(),
                                    pubSubMediator, eventAndResponsePublisher),
                            connectionCorrelationId);
                })
                .match(StartStreaming.class, startStreaming ->
                        forwardToSessionActor(startStreaming.getConnectionCorrelationId(), startStreaming))
                .match(StopStreaming.class, stopStreaming ->
                        forwardToSessionActor(stopStreaming.getConnectionCorrelationId(), stopStreaming))
                .match(Signal.class, signal -> {
                    final Optional<String> origin = signal.getDittoHeaders().getOrigin();
                    if (!origin.isPresent()) {
                        logger.warning("Signal is missing the required origin header: {}",
                                signal.getDittoHeaders().getCorrelationId());
                        return;
                    }
                    final ActorRef sessionActor = getContext().getChild(origin.get());
                    if (sessionActor != null) {
                        // The session actor is set as sender so that replies are addressed to it.
                        commandRouter.tell(signal, sessionActor);
                    } else {
                        // Make the drop traceable instead of discarding the signal silently.
                        logger.debug("No session actor found for origin '{}', dropping signal.", origin.get());
                    }
                })
                .match(DittoRuntimeException.class, dittoRuntimeException -> {
                    final Optional<String> origin = dittoRuntimeException.getDittoHeaders().getOrigin();
                    if (origin.isPresent()) {
                        forwardToSessionActor(origin.get(), dittoRuntimeException);
                    } else {
                        logger.warning("Unhandled DittoRuntimeException: <{}: {}>",
                                dittoRuntimeException.getClass().getSimpleName(),
                                dittoRuntimeException.getMessage());
                    }
                })
                .matchAny(unknown -> logger.warning("Got unknown message: '{}'", unknown))
                .build();
    }

    /**
     * Forwards the given object to the actor selected by the given relative path, keeping the original sender.
     *
     * @param connectionCorrelationId the relative path used for the actor selection.
     * @param object the message to forward.
     */
    private void forwardToSessionActor(final String connectionCorrelationId, final Object object) {
        if (object instanceof WithDittoHeaders) {
            LogUtil.enhanceLogWithCorrelationId(logger, (WithDittoHeaders<?>) object);
        } else {
            // The cast selects the String overload for the null argument.
            LogUtil.enhanceLogWithCorrelationId(logger, (String) null);
        }
        logger.debug("Forwarding to session actor '{}': {}", connectionCorrelationId, object);
        getContext().actorSelection(connectionCorrelationId).forward(object, getContext());
    }
}
