package org.eclipse.ditto.internal.utils.pekko.streaming;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.List;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.event.DiagnosticLoggingAdapter;
import org.apache.pekko.event.Logging;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.StreamRefs;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/pekko/streaming/AbstractStreamingActor.class */
public abstract class AbstractStreamingActor<C, E> extends AbstractActor {
    protected final DiagnosticLoggingAdapter log = Logging.apply(this);
    protected final Materializer materializer = Materializer.createMaterializer(this::getContext);

    protected abstract Class<C> getCommandClass();

    protected abstract int getBurst(C c);

    protected abstract Duration getInitialTimeout(C c);

    protected abstract Duration getIdleTimeout(C c);

    protected abstract Source<E, NotUsed> createSource(C c);

    protected Object batchMessages(List<E> list) {
        return list.size() == 1 ? list.get(0) : list;
    }

    @Override // org.apache.pekko.actor.AbstractActor
    public final AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(getCommandClass(), this::startStreaming).matchAny(obj -> {
            this.log.warning("Unexpected message: <{}>", obj);
        }).build();
    }

    private void startStreaming(C c) {
        this.log.debug("Starting streaming due to command: {}", c);
        int burst = getBurst(c);
        Duration initialTimeout = getInitialTimeout(c);
        getSender().tell((SourceRef) createSource(c).grouped(burst).map(this::batchMessages).initialTimeout(initialTimeout).idleTimeout(getIdleTimeout(c)).runWith(StreamRefs.sourceRef(), this.materializer), getSelf());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1285317286:
                if (implMethodName.equals("batchMessages")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/pekko/streaming/AbstractStreamingActor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;)Ljava/lang/Object;")) {
                    AbstractStreamingActor abstractStreamingActor = (AbstractStreamingActor) serializedLambda.getCapturedArg(0);
                    return abstractStreamingActor::batchMessages;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
