package org.eclipse.ditto.internal.utils.pekko.actors;

import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.apache.pekko.Done;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.event.DiagnosticLoggingAdapter;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithShutdownBehavior;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/pekko/actors/AbstractActorWithShutdownBehaviorAndRequestCounting.class */
public abstract class AbstractActorWithShutdownBehaviorAndRequestCounting extends AbstractActorWithShutdownBehavior {
    private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private int ongoingRequests = 0;

    @Nullable
    private ActorRef shutdownReceiver = null;

    /* loaded from: input_file:org/eclipse/ditto/internal/utils/pekko/actors/AbstractActorWithShutdownBehaviorAndRequestCounting$ControlOp.class */
    public enum ControlOp {
        OP_COMPLETE
    }

    @Override // org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithShutdownBehavior, org.apache.pekko.actor.AbstractActor
    public final AbstractActor.Receive createReceive() {
        return shutdownBehavior(requestCountingBehavior(handleMessage()));
    }

    @Override // org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithShutdownBehavior
    public void serviceRequestsDone(AbstractActorWithShutdownBehavior.Control control) {
        if (this.ongoingRequests == 0) {
            this.log.info("{}: no ongoing requests", control);
            getSender().tell(Done.getInstance(), getSelf());
        } else {
            this.log.info("{}: waiting for {} ongoing requests", control, Integer.valueOf(this.ongoingRequests));
            this.shutdownReceiver = getSender();
        }
    }

    @Override // org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithShutdownBehavior
    protected void become(AbstractActor.Receive receive) {
        getContext().become(shutdownBehavior(requestCountingBehavior(receive)));
    }

    private AbstractActor.Receive requestCountingBehavior(AbstractActor.Receive receive) {
        ConditionChecker.checkNotNull(receive, "actor's message handler");
        return ReceiveBuilder.create().matchEquals(ControlOp.OP_COMPLETE, this::decrementRequestCounter).matchAny(obj -> {
            PartialFunction<Object, BoxedUnit> onMessage = receive.onMessage();
            if (onMessage.isDefinedAt(obj)) {
                onMessage.mo4620apply(obj);
            } else {
                unhandled(obj);
            }
        }).build();
    }

    public void withRequestCounting(CompletionStage<?> completionStage) {
        this.ongoingRequests++;
        completionStage.whenComplete(this::requestComplete);
    }

    private void requestComplete(@Nullable Object obj, Throwable th) {
        this.log.debug("Request completed - send {} to myself", ControlOp.OP_COMPLETE);
        getSelf().tell(ControlOp.OP_COMPLETE, ActorRef.noSender());
    }

    private void decrementRequestCounter(ControlOp controlOp) {
        this.ongoingRequests--;
        if (this.ongoingRequests != 0 || this.shutdownReceiver == null) {
            this.log.debug("{}: waiting for {} request(s) to complete", controlOp, Integer.valueOf(this.ongoingRequests));
        } else {
            this.log.info("{}: finished waiting for requests", controlOp);
            this.shutdownReceiver.tell(Done.done(), getSelf());
        }
    }
}
