package org.eclipse.ditto.services.utils.akka.controlflow;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.ActorMaterializerSettings;
import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.OverflowStrategy;
import akka.stream.QueueOfferResult;
import akka.stream.Supervision;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Merge;
import akka.stream.javadsl.Partition;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import ch.qos.logback.core.joran.action.Action;
import java.lang.invoke.SerializedLambda;
import java.util.Optional;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.signals.base.WithId;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;

/* loaded from: input_file:org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor.class */
public abstract class AbstractGraphActor<T> extends AbstractActor {
    public static final String DITTO_INTERNAL_SPECIAL_ENFORCEMENT_LANE = "ditto-internal-special-enforcement-lane";
    protected final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    private final Counter receiveCounter = DittoMetrics.counter("graph_actor_receive").tag(Action.CLASS_ATTRIBUTE, getClass().getSimpleName());
    private final Counter enqueueSuccessCounter = DittoMetrics.counter("graph_actor_enqueue_success").tag(Action.CLASS_ATTRIBUTE, getClass().getSimpleName());
    private final Counter enqueueDroppedCounter = DittoMetrics.counter("graph_actor_enqueue_dropped").tag(Action.CLASS_ATTRIBUTE, getClass().getSimpleName());
    private final Counter enqueueFailureCounter = DittoMetrics.counter("graph_actor_enqueue_failure").tag(Action.CLASS_ATTRIBUTE, getClass().getSimpleName());
    private final Counter dequeueCounter = DittoMetrics.counter("graph_actor_dequeue").tag(Action.CLASS_ATTRIBUTE, getClass().getSimpleName());

    protected AbstractGraphActor() {
    }

    protected abstract T mapMessage(WithDittoHeaders withDittoHeaders);

    protected T beforeProcessMessage(T t) {
        return t;
    }

    protected abstract Flow<T, T, NotUsed> processMessageFlow();

    protected abstract Sink<T, ?> processedMessageSink();

    protected abstract int getBufferSize();

    protected abstract int getParallelism();

    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        String simpleName = getClass().getSimpleName();
        ActorMaterializer create = ActorMaterializer.create(ActorMaterializerSettings.create(getContext().getSystem()).withSupervisionStrategy(th -> {
            if (th instanceof DittoRuntimeException) {
                LogUtil.enhanceLogWithCorrelationId(this.log, (DittoRuntimeException) th, new LogUtil.MdcField[0]);
                this.log.warning("DittoRuntimeException in stream of {}: [{}] {}", simpleName, th.getClass().getSimpleName(), th.getMessage());
            } else {
                this.log.error(th, "Exception in stream of {}: {}", simpleName, th.getMessage());
            }
            return Supervision.resume();
        }), getContext());
        Attributes logLevels = Attributes.logLevels(Attributes.logLevelDebug(), Attributes.logLevelError(), Attributes.logLevelError());
        SourceQueueWithComplete sourceQueueWithComplete = (SourceQueueWithComplete) Source.queue(getBufferSize(), OverflowStrategy.dropNew()).map(this::incrementDequeueCounter).log("graph-actor-stream-1-dequeued", this.log).mo3225withAttributes(logLevels).via(Flow.fromFunction(this::beforeProcessMessage)).log("graph-actor-stream-2-preprocessed", this.log).mo3225withAttributes(logLevels).via(partitionById(processMessageFlow(), getParallelism())).log("graph-actor-stream-3-partitioned", this.log).mo3225withAttributes(logLevels).to(processedMessageSink()).run(create);
        ReceiveBuilder create2 = ReceiveBuilder.create();
        preEnhancement(create2);
        return create2.match(DittoRuntimeException.class, dittoRuntimeException -> {
            this.log.debug("Received DittoRuntimeException: <{}>", dittoRuntimeException);
            getSender().tell(dittoRuntimeException, getSelf());
        }).match(WithDittoHeaders.class, withDittoHeaders -> {
            LogUtil.enhanceLogWithCorrelationId(this.log, (WithDittoHeaders<?>) withDittoHeaders, new LogUtil.MdcField[0]);
            if (withDittoHeaders instanceof WithId) {
                this.log.debug("Received <{}> with id <{}>", withDittoHeaders.getClass().getSimpleName(), ((WithId) withDittoHeaders).getEntityId());
            } else {
                this.log.debug("Received WithDittoHeaders: <{}>", withDittoHeaders);
            }
            incrementReceiveCounter();
            sourceQueueWithComplete.offer(mapMessage(withDittoHeaders)).handle(this::incrementEnqueueCounters);
        }).match(Throwable.class, th2 -> {
            this.log.warning("Received unknown Throwable: <{}>", th2);
            getSender().tell(GatewayInternalErrorException.newBuilder().cause2(th2).build(), getSelf());
        }).matchAny(obj -> {
            this.log.warning("Received unknown message: <{}>", obj);
        }).build();
    }

    private void incrementReceiveCounter() {
        this.receiveCounter.increment();
    }

    private Void incrementEnqueueCounters(QueueOfferResult queueOfferResult, Throwable th) {
        if (QueueOfferResult.enqueued().equals(queueOfferResult)) {
            this.enqueueSuccessCounter.increment();
            return null;
        }
        if (QueueOfferResult.dropped().equals(queueOfferResult)) {
            this.enqueueDroppedCounter.increment();
            return null;
        }
        if (queueOfferResult instanceof QueueOfferResult.Failure) {
            this.log.error(((QueueOfferResult.Failure) queueOfferResult).cause(), "enqueue failed");
            this.enqueueFailureCounter.increment();
            return null;
        }
        this.log.error(th, "enqueue failed without acknowledgement");
        this.enqueueFailureCounter.increment();
        return null;
    }

    private <E> E incrementDequeueCounter(E e) {
        this.dequeueCounter.increment();
        return e;
    }

    private static <T> Flow<T, T, NotUsed> partitionById(Flow<T, T, NotUsed> flow, int i) {
        int i2 = i + 1;
        return Flow.fromGraph(GraphDSL.create(Partition.create(i2, obj -> {
            if (!checkForSpecialLane(obj) && (obj instanceof WithId)) {
                EntityId entityId = ((WithId) obj).getEntityId();
                return entityId.isDummy() ? Integer.valueOf((obj.hashCode() % i) + 1) : Integer.valueOf(Math.abs(entityId.hashCode() % i) + 1);
            }
            return 0;
        }), Merge.create(i2, true), (notUsed, notUsed2) -> {
            return notUsed;
        }, (builder, uniformFanOutShape, uniformFanInShape) -> {
            for (int i3 = 0; i3 < i2; i3++) {
                builder.from(uniformFanOutShape.out(i3)).via((FlowShape) builder.add(flow)).toInlet(uniformFanInShape.in(i3));
            }
            return FlowShape.of(uniformFanOutShape.in(), uniformFanInShape.out());
        }));
    }

    private static <T> boolean checkForSpecialLane(T t) {
        return (t instanceof WithDittoHeaders) && Optional.ofNullable(((WithDittoHeaders) t).getDittoHeaders().get(DITTO_INTERNAL_SPECIAL_ENFORCEMENT_LANE)).isPresent();
    }

    protected abstract void preEnhancement(ReceiveBuilder receiveBuilder);

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1477181603:
                if (implMethodName.equals("lambda$partitionById$c467d849$1")) {
                    z = 5;
                    break;
                }
                break;
            case -714331141:
                if (implMethodName.equals("incrementDequeueCounter")) {
                    z = 2;
                    break;
                }
                break;
            case -315956008:
                if (implMethodName.equals("lambda$createReceive$1b373186$1")) {
                    z = false;
                    break;
                }
                break;
            case -90460169:
                if (implMethodName.equals("beforeProcessMessage")) {
                    z = 3;
                    break;
                }
                break;
            case 815718728:
                if (implMethodName.equals("lambda$partitionById$6abb348b$1")) {
                    z = true;
                    break;
                }
                break;
            case 1401389291:
                if (implMethodName.equals("lambda$partitionById$66371fcc$1")) {
                    z = 4;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Throwable;)Lakka/stream/Supervision$Directive;")) {
                    AbstractGraphActor abstractGraphActor = (AbstractGraphActor) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    return th -> {
                        if (th instanceof DittoRuntimeException) {
                            LogUtil.enhanceLogWithCorrelationId(this.log, (DittoRuntimeException) th, new LogUtil.MdcField[0]);
                            this.log.warning("DittoRuntimeException in stream of {}: [{}] {}", str, th.getClass().getSimpleName(), th.getMessage());
                        } else {
                            this.log.error(th, "Exception in stream of {}: {}", str, th.getMessage());
                        }
                        return Supervision.resume();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function3") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor") && serializedLambda.getImplMethodSignature().equals("(ILakka/stream/javadsl/Flow;Lakka/stream/javadsl/GraphDSL$Builder;Lakka/stream/UniformFanOutShape;Lakka/stream/UniformFanInShape;)Lakka/stream/FlowShape;")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    Flow flow = (Flow) serializedLambda.getCapturedArg(1);
                    return (builder, uniformFanOutShape, uniformFanInShape) -> {
                        for (int i3 = 0; i3 < intValue; i3++) {
                            builder.from(uniformFanOutShape.out(i3)).via((FlowShape) builder.add(flow)).toInlet(uniformFanInShape.in(i3));
                        }
                        return FlowShape.of(uniformFanOutShape.in(), uniformFanInShape.out());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    AbstractGraphActor abstractGraphActor2 = (AbstractGraphActor) serializedLambda.getCapturedArg(0);
                    return abstractGraphActor2::incrementDequeueCounter;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    AbstractGraphActor abstractGraphActor3 = (AbstractGraphActor) serializedLambda.getCapturedArg(0);
                    return abstractGraphActor3::beforeProcessMessage;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor") && serializedLambda.getImplMethodSignature().equals("(Lakka/NotUsed;Lakka/NotUsed;)Lakka/NotUsed;")) {
                    return (notUsed, notUsed2) -> {
                        return notUsed;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/akka/controlflow/AbstractGraphActor") && serializedLambda.getImplMethodSignature().equals("(ILjava/lang/Object;)Ljava/lang/Integer;")) {
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return obj -> {
                        if (!checkForSpecialLane(obj) && (obj instanceof WithId)) {
                            EntityId entityId = ((WithId) obj).getEntityId();
                            return entityId.isDummy() ? Integer.valueOf((obj.hashCode() % intValue2) + 1) : Integer.valueOf(Math.abs(entityId.hashCode() % intValue2) + 1);
                        }
                        return 0;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
