package akka.cluster.sharding.typed.delivery.internal;

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorRef$;
import akka.actor.typed.ActorRef$ActorRefOps$;
import akka.actor.typed.Behavior;
import akka.actor.typed.DispatcherSelector$;
import akka.actor.typed.delivery.ConsumerController;
import akka.actor.typed.delivery.DurableProducerQueue;
import akka.actor.typed.delivery.DurableProducerQueue$State$;
import akka.actor.typed.delivery.internal.ProducerControllerImpl$;
import akka.actor.typed.scaladsl.ActorContext;
import akka.actor.typed.scaladsl.Behaviors$;
import akka.actor.typed.scaladsl.StashBuffer;
import akka.annotation.InternalApi;
import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.delivery.ShardingProducerController;
import akka.cluster.sharding.typed.delivery.internal.ShardingProducerControllerImpl;
import akka.util.Timeout;
import akka.util.Timeout$;
import java.util.concurrent.TimeoutException;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: ShardingProducerControllerImpl.scala */
@InternalApi
/* loaded from: input_file:akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl$.class */
public final class ShardingProducerControllerImpl$ {
    /** Shared instance of this class, created when the class is initialized. */
    public static final ShardingProducerControllerImpl$ MODULE$ = new ShardingProducerControllerImpl$();

    /**
     * Creates the initial behavior of the sharding producer controller.
     *
     * <p>The behavior is wrapped in a stash holding up to {@code settings.bufferSize()} messages and
     * carries a {@code producerId} MDC entry. If a durable queue behavior is supplied it is spawned
     * and asked for its state; the controller then starts in {@code waitingForStart}.
     *
     * @param str producer id, used as the value of the {@code producerId} MDC entry
     * @param actorRef reference accepting sharding envelopes of sequenced messages; passed on unchanged
     * @param option optional behavior of the durable producer queue
     * @param settings controller settings (buffer size, durable queue settings, ...)
     * @param classTag class tag of the message type
     * @return the resulting behavior, narrowed to the public command type
     */
    public <A> Behavior<ShardingProducerController.Command<A>> apply(String str, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> actorRef, Option<Behavior<DurableProducerQueue.Command<A>>> option, ShardingProducerController.Settings settings, ClassTag<A> classTag) {
        return Behaviors$.MODULE$.withStash(settings.bufferSize(), startupStash -> {
            return Behaviors$.MODULE$.setup(ctx -> {
                ctx.setLoggerName("akka.cluster.sharding.typed.delivery.ShardingProducerController");
                // Spawns the durable queue (when one is configured) and requests its state.
                Option durableQueue = MODULE$.askLoadState(ctx, option, settings);
                // Without a durable queue the state is immediately available as an empty state.
                Behavior starting = (Behavior) MODULE$.waitingForStart(str, ctx, startupStash, actorRef, durableQueue, None$.MODULE$, MODULE$.createInitialState(durableQueue.nonEmpty()), settings, classTag);
                Tuple2[] mdcEntries = new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("producerId"), str)};
                Map<String, String> mdc = (Map) Predef$.MODULE$.Map().apply2(ScalaRunTime$.MODULE$.wrapRefArray(mdcEntries));
                return Behaviors$.MODULE$.withMdc(mdc, starting, ClassTag$.MODULE$.apply(ShardingProducerControllerImpl.InternalCommand.class));
            });
        }).narrow();
    }

    /**
     * Chooses the state the controller starts with.
     *
     * @param z flag supplied by the caller; {@code apply} passes whether a durable queue is present
     * @return {@code None} when {@code z} is true, otherwise a {@code Some} holding the empty
     *     durable-queue state
     */
    private <A> Option<DurableProducerQueue.State<A>> createInitialState(boolean z) {
        Option initial;
        if (z) {
            initial = None$.MODULE$;
        } else {
            initial = new Some(DurableProducerQueue$State$.MODULE$.empty());
        }
        return initial;
    }

    /**
     * Behavior used until both the producer reference and the durable-queue state are known.
     *
     * <p>A {@code Start} message supplies the producer, a {@code LoadStateReply} supplies the state;
     * whichever arrives second switches to the active behavior. A {@code LoadStateFailed} triggers a
     * retry until the configured number of attempts is reached. Any other message is stashed.
     *
     * @param str producer id
     * @param actorContext context of this actor
     * @param stashBuffer stash collecting messages received before the controller is active
     * @param actorRef reference accepting sharding envelopes; passed on unchanged
     * @param option reference to the durable queue, if one was spawned
     * @param option2 producer reference, once a {@code Start} has been received
     * @param option3 durable-queue state, once known
     * @param settings controller settings
     * @param classTag class tag of the message type
     * @return the waiting behavior
     */
    private <A> Behavior<ShardingProducerControllerImpl.InternalCommand> waitingForStart(String str, ActorContext<ShardingProducerControllerImpl.InternalCommand> actorContext, StashBuffer<ShardingProducerControllerImpl.InternalCommand> stashBuffer, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> actorRef, Option<ActorRef<DurableProducerQueue.Command<A>>> option, Option<ActorRef<ShardingProducerController.RequestNext<A>>> option2, Option<DurableProducerQueue.State<A>> option3, ShardingProducerController.Settings settings, ClassTag<A> classTag) {
        return (Behavior) Behaviors$.MODULE$.receiveMessage(msg -> {
            if (msg instanceof ShardingProducerController.Start) {
                ShardingProducerController.Start startMsg = (ShardingProducerController.Start) msg;
                ProducerControllerImpl$.MODULE$.enforceLocalProducer(startMsg.producer());
                if (option3 instanceof Some) {
                    // State is already known, so the producer was the last missing piece.
                    return becomeActive$1(startMsg.producer(), (DurableProducerQueue.State) ((Some) option3).value(), settings, stashBuffer, actorContext, str, actorRef, option, classTag);
                }
                if (None$.MODULE$.equals(option3)) {
                    // Remember the producer and keep waiting for the state.
                    return MODULE$.waitingForStart(str, actorContext, stashBuffer, actorRef, option, new Some(startMsg.producer()), option3, settings, classTag);
                }
                throw new MatchError(option3);
            }

            if (msg instanceof ShardingProducerControllerImpl.LoadStateReply) {
                ShardingProducerControllerImpl.LoadStateReply reply = (ShardingProducerControllerImpl.LoadStateReply) msg;
                if (option2 instanceof Some) {
                    // Producer is already known, so the state was the last missing piece.
                    return becomeActive$1((ActorRef) ((Some) option2).value(), reply.state(), settings, stashBuffer, actorContext, str, actorRef, option, classTag);
                }
                if (None$.MODULE$.equals(option2)) {
                    // Remember the state and keep waiting for the producer.
                    return MODULE$.waitingForStart(str, actorContext, stashBuffer, actorRef, option, option2, new Some(reply.state()), settings, classTag);
                }
                throw new MatchError(option2);
            }

            if (msg instanceof ShardingProducerControllerImpl.LoadStateFailed) {
                int failedAttempt = ((ShardingProducerControllerImpl.LoadStateFailed) msg).attempt();
                if (failedAttempt < settings.producerControllerSettings().durableQueueRetryAttempts()) {
                    actorContext.log().warn("LoadState failed, attempt [{}] of [{}], retrying.", BoxesRunTime.boxToInteger(failedAttempt), BoxesRunTime.boxToInteger(settings.producerControllerSettings().durableQueueRetryAttempts()));
                    MODULE$.askLoadState(actorContext, option, settings, failedAttempt + 1);
                    return Behaviors$.MODULE$.same();
                }
                // Retry budget exhausted: log and fail the actor.
                String giveUpMessage = "LoadState failed after [" + failedAttempt + "] attempts, giving up.";
                actorContext.log().error(giveUpMessage);
                throw new TimeoutException(giveUpMessage);
            }

            if (ShardingProducerControllerImpl$DurableQueueTerminated$.MODULE$.equals(msg)) {
                throw new IllegalStateException("DurableQueue was unexpectedly terminated.");
            }

            // Anything else is kept for later; a full stash is reported as an error.
            MODULE$.checkStashFull(stashBuffer);
            stashBuffer.stash(msg);
            return Behaviors$.MODULE$.same();
        });
    }

    /**
     * Verifies that the given stash can still accept a message.
     *
     * @param stashBuffer stash to inspect
     * @throws IllegalArgumentException if the stash reports that it is full
     */
    private <A> void checkStashFull(StashBuffer<ShardingProducerControllerImpl.InternalCommand> stashBuffer) {
        if (!stashBuffer.isFull()) {
            return;
        }
        throw new IllegalArgumentException("Buffer is full, size [" + stashBuffer.size() + "].");
    }

    /**
     * Spawns the durable queue, if a behavior for it is given, and sends the first load-state request.
     *
     * <p>The spawned child is named {@code "durable"}, uses the parent's dispatcher and is watched so
     * that its termination is delivered as {@code DurableQueueTerminated}.
     *
     * @param actorContext context used for spawning, watching and asking
     * @param option optional behavior of the durable producer queue
     * @param settings controller settings
     * @return the reference of the spawned durable queue, or an empty option when no behavior was given
     */
    private <A> Option<ActorRef<DurableProducerQueue.Command<A>>> askLoadState(ActorContext<ShardingProducerControllerImpl.InternalCommand> actorContext, Option<Behavior<DurableProducerQueue.Command<A>>> option, ShardingProducerController.Settings settings) {
        return (Option<ActorRef<DurableProducerQueue.Command<A>>>) option.map(queueBehavior -> {
            ActorRef durableQueue = actorContext.spawn(queueBehavior, "durable", DispatcherSelector$.MODULE$.sameAsParent());
            actorContext.watchWith(durableQueue, ShardingProducerControllerImpl$DurableQueueTerminated$.MODULE$);
            // First attempt; retries are issued from waitingForStart with an incremented counter.
            Option askTarget = new Some(durableQueue);
            MODULE$.askLoadState(actorContext, askTarget, settings, 1);
            return durableQueue;
        });
    }

    /**
     * Asks the durable queue, when present, for its state.
     *
     * <p>The ask uses the durable queue request timeout from the producer controller settings.
     *
     * @param actorContext context used to perform the ask
     * @param option reference to the durable queue; nothing is sent when empty
     * @param settings controller settings supplying the request timeout
     * @param i attempt number, reported back in {@code LoadStateFailed} if the ask fails
     */
    private <A> void askLoadState(ActorContext<ShardingProducerControllerImpl.InternalCommand> actorContext, Option<ActorRef<DurableProducerQueue.Command<A>>> option, ShardingProducerController.Settings settings, int i) {
        Timeout askTimeout = Timeout$.MODULE$.durationToTimeout(
                settings.producerControllerSettings().durableQueueRequestTimeout());
        option.foreach(durableQueue -> {
            $anonfun$askLoadState$2(actorContext, i, askTimeout, durableQueue);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Switches from the waiting phase to the active behavior.
     *
     * <p>Starts the {@code CleanupUnused} and {@code ResendFirstUnconfirmed} timers at half of their
     * configured durations, moves all pending work into a fresh stash (unconfirmed messages of the
     * loaded state first, then the messages stashed while waiting), and unstashes everything into the
     * active behavior. The producer receives an initial {@code RequestNext} only when the loaded
     * state has no unconfirmed messages.
     *
     * @param actorRef producer that receives {@code RequestNext}
     * @param state loaded durable-queue state
     * @param settings controller settings
     * @param stashBuffer stash filled while waiting for start
     * @param actorContext context of this actor
     * @param str producer id
     * @param actorRef2 reference accepting sharding envelopes; handed to the controller instance
     * @param option reference to the durable queue, if any
     * @param classTag class tag of the message type
     * @return the behavior to continue with
     */
    private static final Behavior becomeActive$1(ActorRef actorRef, DurableProducerQueue.State state, ShardingProducerController.Settings settings, StashBuffer stashBuffer, ActorContext actorContext, String str, ActorRef actorRef2, Option option, ClassTag classTag) {
        return Behaviors$.MODULE$.withTimers(timers -> {
            timers.startTimerWithFixedDelay(ShardingProducerControllerImpl$CleanupUnused$.MODULE$, settings.cleanupUnusedAfter().$div(2L));
            timers.startTimerWithFixedDelay(ShardingProducerControllerImpl$ResendFirstUnconfirmed$.MODULE$, settings.resendFirstUnconfirmedIdleTimeout().$div(2L));
            return Behaviors$.MODULE$.withStash(settings.bufferSize(), pending -> {
                return Behaviors$.MODULE$.setup(ignoredContext -> {
                    // Unconfirmed messages go in first so they sit ahead of the other stashed messages.
                    state.unconfirmed().foreach(unconfirmed -> {
                        return pending.stash(new ShardingProducerControllerImpl.Msg(new ShardingEnvelope(unconfirmed.confirmationQualifier(), unconfirmed.message()), unconfirmed.seqNr()));
                    });
                    // Then everything that was stashed while waiting for start.
                    stashBuffer.foreach(earlier -> {
                        pending.stash(earlier);
                        return BoxedUnit.UNIT;
                    });
                    ActorRef envelopeAdapter = actorContext.messageAdapter(envelope -> {
                        return new ShardingProducerControllerImpl.Msg(envelope, 0L);
                    }, ClassTag$.MODULE$.apply(ShardingEnvelope.class));
                    if (state.unconfirmed().isEmpty()) {
                        ShardingProducerController.RequestNext firstRequest = new ShardingProducerController.RequestNext(envelopeAdapter, actorContext.self(), Predef$.MODULE$.Set().empty2(), Predef$.MODULE$.Map().empty2());
                        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(actorRef), firstRequest);
                    }
                    ShardingProducerControllerImpl controller = new ShardingProducerControllerImpl(actorContext, str, envelopeAdapter, actorRef2, option, settings, classTag);
                    ShardingProducerControllerImpl.State initialState = new ShardingProducerControllerImpl.State(state.currentSeqNr(), actorRef, Predef$.MODULE$.Map().empty2(), Predef$.MODULE$.Map().empty2());
                    return pending.unstashAll(controller.akka$cluster$sharding$typed$delivery$internal$ShardingProducerControllerImpl$$active(initialState));
                });
            });
        });
    }

    /**
     * Sends a {@code LoadState} request to the durable queue and maps the outcome to an internal
     * message: {@code LoadStateReply} with the loaded state on success, {@code LoadStateFailed}
     * carrying the attempt number on failure.
     *
     * @param actorContext context used to perform the ask
     * @param i attempt number placed in {@code LoadStateFailed}
     * @param timeout timeout of the ask
     * @param actorRef durable queue to ask
     */
    public static final /* synthetic */ void $anonfun$askLoadState$2(ActorContext actorContext, int i, Timeout timeout, ActorRef actorRef) {
        actorContext.ask(actorRef, replyTo -> new DurableProducerQueue.LoadState(replyTo), outcome -> {
            if (outcome instanceof Failure) {
                return new ShardingProducerControllerImpl.LoadStateFailed(i);
            }
            if (outcome instanceof Success) {
                DurableProducerQueue.State loaded = (DurableProducerQueue.State) ((Success) outcome).value();
                return new ShardingProducerControllerImpl.LoadStateReply(loaded);
            }
            throw new MatchError(outcome);
        }, timeout, ClassTag$.MODULE$.apply(DurableProducerQueue.State.class));
    }

    /** Private no-op constructor; within this file it is invoked only by the {@code MODULE$} initializer. */
    private ShardingProducerControllerImpl$() {
    }
}
