package org.eclipse.ditto.services.utils.akka.controlflow;

import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorAttributes;
import akka.stream.Attributes;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.QueueOfferResult;
import akka.stream.Supervision;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueue;
import akka.stream.javadsl.SourceQueueWithComplete;
import ch.qos.logback.core.joran.action.Action;
import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.Map;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.signals.base.WithId;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;

/* loaded from: input_file:org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor.class */
public abstract class AbstractGraphActor<T, M> extends AbstractActor {
    protected final ThreadSafeDittoLoggingAdapter logger;
    protected final Materializer materializer;
    private final Class<M> matchClass;
    private final Counter receiveCounter;
    private final Counter enqueueSuccessCounter;
    private final Counter enqueueDroppedCounter;
    private final Counter enqueueFailureCounter;
    private final Counter dequeueCounter;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractGraphActor(Class<M> cls) {
        this.matchClass = (Class) ConditionChecker.checkNotNull(cls, "matchClass");
        Map singletonMap = Collections.singletonMap(Action.CLASS_ATTRIBUTE, getClass().getSimpleName());
        this.receiveCounter = DittoMetrics.counter("graph_actor_receive", singletonMap);
        this.enqueueSuccessCounter = DittoMetrics.counter("graph_actor_enqueue_success", singletonMap);
        this.enqueueDroppedCounter = DittoMetrics.counter("graph_actor_enqueue_dropped", singletonMap);
        this.enqueueFailureCounter = DittoMetrics.counter("graph_actor_enqueue_failure", singletonMap);
        this.dequeueCounter = DittoMetrics.counter("graph_actor_dequeue", singletonMap);
        this.logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
        this.materializer = Materializer.createMaterializer(this::getContext);
    }

    protected abstract T mapMessage(M m);

    protected T beforeProcessMessage(T t) {
        return t;
    }

    protected abstract Sink<T, ?> createSink();

    protected abstract int getBufferSize();

    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        SourceQueueWithComplete<T> sourceQueue = getSourceQueue(this.materializer);
        ReceiveBuilder create = ReceiveBuilder.create();
        preEnhancement(create);
        return create.match(DittoRuntimeException.class, this::handleDittoRuntimeException).match(this.matchClass, obj -> {
            handleMatched(sourceQueue, obj);
        }).match(Throwable.class, this::handleUnknownThrowable).matchAny(obj2 -> {
            this.logger.warning("Received unknown message <{}>.", obj2);
        }).build();
    }

    private Attributes getSupervisionStrategyAttribute() {
        String simpleName = getClass().getSimpleName();
        return ActorAttributes.withSupervisionStrategy(th -> {
            if (th instanceof DittoRuntimeException) {
                this.logger.withCorrelationId((WithDittoHeaders<?>) th).warning("DittoRuntimeException in stream of {}: [{}] {}", simpleName, th.getClass().getSimpleName(), th.getMessage());
            } else {
                this.logger.error(th, "Exception in stream of {}: {}", simpleName, th.getMessage());
            }
            return Supervision.resume();
        });
    }

    private SourceQueueWithComplete<T> getSourceQueue(Materializer materializer) {
        return (SourceQueueWithComplete) Source.queue(getBufferSize(), OverflowStrategy.dropNew()).map(this::incrementDequeueCounter).log("graph-actor-stream-1-dequeued", this.logger).via(Flow.fromFunction(this::beforeProcessMessage)).log("graph-actor-stream-2-preprocessed", this.logger).to(createSink()).mo3523withAttributes(Attributes.logLevels(Attributes.logLevelDebug(), Attributes.logLevelError(), Attributes.logLevelError()).and(getSupervisionStrategyAttribute())).run(materializer);
    }

    private <E> E incrementDequeueCounter(E e) {
        this.dequeueCounter.increment();
        return e;
    }

    protected void preEnhancement(ReceiveBuilder receiveBuilder) {
    }

    protected void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        this.logger.withCorrelationId(dittoRuntimeException).debug("Received <{}>.", dittoRuntimeException);
        getSender().tell(dittoRuntimeException, getSelf());
    }

    private void handleMatched(SourceQueue<T> sourceQueue, M m) {
        ThreadSafeDittoLoggingAdapter withCorrelationId = m instanceof WithDittoHeaders ? this.logger.withCorrelationId((WithDittoHeaders<?>) m) : this.logger;
        if (m instanceof WithId) {
            withCorrelationId.debug("Received <{}> with ID <{}>.", m.getClass().getSimpleName(), ((WithId) m).getEntityId());
        } else {
            withCorrelationId.debug("Received match: <{}>.", m);
        }
        this.receiveCounter.increment();
        sourceQueue.offer(mapMessage(m)).handle(this::incrementEnqueueCounters);
    }

    private Void incrementEnqueueCounters(QueueOfferResult queueOfferResult, Throwable th) {
        if (QueueOfferResult.enqueued().equals(queueOfferResult)) {
            this.enqueueSuccessCounter.increment();
            return null;
        }
        if (QueueOfferResult.dropped().equals(queueOfferResult)) {
            this.enqueueDroppedCounter.increment();
            return null;
        }
        if (queueOfferResult instanceof QueueOfferResult.Failure) {
            this.logger.error(((QueueOfferResult.Failure) queueOfferResult).cause(), "Enqueue failed!");
            this.enqueueFailureCounter.increment();
            return null;
        }
        this.logger.error(th, "Enqueue failed without acknowledgement!");
        this.enqueueFailureCounter.increment();
        return null;
    }

    private void handleUnknownThrowable(Throwable th) {
        this.logger.warning("Received unknown Throwable <{}>!", th);
        getSender().tell(GatewayInternalErrorException.newBuilder().cause2(th).build(), getSelf());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1536701490:
                if (implMethodName.equals("lambda$getSupervisionStrategyAttribute$693d9f83$1")) {
                    z = true;
                    break;
                }
                break;
            case -714331141:
                if (implMethodName.equals("incrementDequeueCounter")) {
                    z = false;
                    break;
                }
                break;
            case -90460169:
                if (implMethodName.equals("beforeProcessMessage")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    AbstractGraphActor abstractGraphActor = (AbstractGraphActor) serializedLambda.getCapturedArg(0);
                    return abstractGraphActor::incrementDequeueCounter;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Throwable;)Lakka/stream/Supervision$Directive;")) {
                    AbstractGraphActor abstractGraphActor2 = (AbstractGraphActor) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    return th -> {
                        if (th instanceof DittoRuntimeException) {
                            this.logger.withCorrelationId((WithDittoHeaders<?>) th).warning("DittoRuntimeException in stream of {}: [{}] {}", str, th.getClass().getSimpleName(), th.getMessage());
                        } else {
                            this.logger.error(th, "Exception in stream of {}: {}", str, th.getMessage());
                        }
                        return Supervision.resume();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    AbstractGraphActor abstractGraphActor3 = (AbstractGraphActor) serializedLambda.getCapturedArg(0);
                    return abstractGraphActor3::beforeProcessMessage;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
