package org.eclipse.ditto.concierge.service.actors.cleanup.credits;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskTimeoutException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/concierge/service/actors/cleanup/credits/MessageAggregator.class */
/**
 * Actor that forwards one request to a receiver and aggregates the replies.
 *
 * <p>Life cycle, as implemented below:
 * <ol>
 * <li>On construction a single timer is started that delivers {@link #TIMEOUT} to this actor after the
 * configured duration.</li>
 * <li>The first message that is not {@link #TIMEOUT} is forwarded to the initial receiver with this actor as
 * sender; the sender of that first message is remembered as the recipient of the final report.</li>
 * <li>Afterwards every message that is an instance of the configured message class is collected. All other
 * messages are logged as warnings and dropped.</li>
 * <li>Once the expected number of messages has been collected, or {@link #TIMEOUT} arrives, the list of
 * collected messages is sent to the remembered sender and this actor stops itself.</li>
 * </ol>
 *
 * @param <T> type of the messages to collect.
 */
final class MessageAggregator<T> extends AbstractActorWithTimers {

    /**
     * Timeout message; also used as the key of the timer that delivers it. Package-private.
     */
    static final Object TIMEOUT = new AskTimeoutException("MessageAggregator.TIMEOUT");

    private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    private final ActorRef initialReceiver;
    private final Class<T> messageClass;
    private final int expectedMessages;

    // Messages collected so far; this very list instance is what gets reported to the sender.
    private final List<T> messages = new ArrayList<>();

    // Sender of the first message; stays null until a first message was forwarded.
    @Nullable
    private ActorRef sender = null;

    /**
     * Invoked reflectively through {@link #props(ActorRef, Class, int, Duration)}.
     *
     * @param initialReceiver actor the first incoming message is forwarded to.
     * @param messageClass class of the messages to collect.
     * @param expectedMessages number of collected messages after which the result is reported.
     * @param timeout delay after which {@link #TIMEOUT} is delivered to this actor.
     */
    private MessageAggregator(final ActorRef initialReceiver, final Class<T> messageClass,
            final int expectedMessages, final Duration timeout) {
        this.initialReceiver = initialReceiver;
        this.expectedMessages = expectedMessages;
        this.messageClass = messageClass;
        // TIMEOUT serves both as the timer key and as the message the timer delivers.
        getTimers().startSingleTimer(TIMEOUT, TIMEOUT, timeout);
    }

    /**
     * Creates the {@code Props} of this actor.
     *
     * @param actorRef actor the first incoming message is forwarded to.
     * @param cls class of the messages to collect.
     * @param i number of messages to collect before reporting; if not positive, an empty list is reported
     * right after the first message has been forwarded.
     * @param duration delay after which the messages collected so far are reported.
     * @param <T> type of the messages to collect.
     * @return the Props.
     */
    public static <T> Props props(ActorRef actorRef, Class<T> cls, int i, Duration duration) {
        return Props.create(MessageAggregator.class, actorRef, cls, i, duration);
    }

    /**
     * Initial behavior: handles the timeout, otherwise treats any message as the request to forward.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .matchEquals(TIMEOUT, this::handleTimeout)
                .matchAny(this::forwardFirstMessage)
                .build();
    }

    /**
     * Remembers the sender, forwards the message to the initial receiver and either starts collecting replies
     * or, if no replies are expected, reports immediately.
     */
    private void forwardFirstMessage(final Object firstMessage) {
        sender = getSender();
        log.debug("MessageAggregator: Forwarding <{}> to <{}> on behalf of <{}>", firstMessage, initialReceiver,
                sender);
        // This actor is the sender of the forwarded message, so replies addressed to the sender arrive here.
        initialReceiver.tell(firstMessage, getSelf());
        if (expectedMessages > 0) {
            getContext().become(listeningBehavior());
        } else {
            reportAndStop();
        }
    }

    /**
     * Behavior after the first message was forwarded: collect messages of the configured class, handle the
     * timeout, and warn about everything else.
     */
    private AbstractActor.Receive listeningBehavior() {
        return ReceiveBuilder.create()
                .match(messageClass, this::collect)
                .matchEquals(TIMEOUT, this::handleTimeout)
                .matchAny(unexpected -> log.warning("MessageAggregator: not handled: <{}>", unexpected))
                .build();
    }

    /**
     * Stores one message and reports once the expected number of messages is reached.
     */
    private void collect(final T message) {
        log.debug("MessageAggregator: Received {} <{}>", messageClass, message);
        messages.add(message);
        if (messages.size() >= expectedMessages) {
            reportAndStop();
        }
    }

    /**
     * Logs the timeout as an error and reports whatever has been collected so far.
     */
    private void handleTimeout(final Object timeout) {
        log.error("MessageAggregator: Timeout. Received {}/{} sender=<{}> messages=<{}>", messages.size(),
                expectedMessages, sender, messages);
        reportAndStop();
    }

    /**
     * Sends the collected messages to the remembered sender, if there is one, and stops this actor.
     */
    private void reportAndStop() {
        if (sender != null) {
            log.debug("MessageAggregator: reporting to <{}> the collected messages <{}>", sender, messages);
            sender.tell(messages, getSelf());
        } else {
            // Reached when TIMEOUT arrives before any first message was forwarded.
            log.error("MessageAggregator: This should not happen: sender==null. messages=<{}>", messages);
        }
        getContext().stop(getSelf());
    }
}
