package akka.contrib.persistence.mongodb;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorLogging;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.event.LoggingAdapter;
import akka.stream.actor.ActorPublisher;
import akka.stream.actor.ActorPublisher$Internal$LifecycleState;
import akka.stream.actor.ActorPublisherState;
import com.mongodb.DBObject;
import com.mongodb.casbah.Imports$;
import com.mongodb.casbah.MongoCollectionBase;
import com.mongodb.casbah.MongoCursorBase;
import com.mongodb.casbah.commons.MongoDBList;
import com.mongodb.casbah.commons.MongoDBObject;
import com.mongodb.casbah.commons.NotNothing$;
import com.mongodb.casbah.query.AsQueryParam$;
import org.reactivestreams.Subscriber;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.TraversableLike;
import scala.collection.immutable.Stream;
import scala.collection.immutable.Stream$;
import scala.collection.immutable.Vector;
import scala.collection.mutable.Seq;
import scala.collection.mutable.Seq$;
import scala.concurrent.duration.Duration;
import scala.reflect.ManifestFactory$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: CasbahPersistenceReadJournaller.scala */
@ScalaSignature(bytes = "\u0006\u0001a<Q!\u0001\u0002\t\u0002-\tAdQ;se\u0016tG/\u0012<f]R\u001c()\u001f)feNL7\u000f^3oG\u0016LEM\u0003\u0002\u0004\t\u00059Qn\u001c8h_\u0012\u0014'BA\u0003\u0007\u0003-\u0001XM]:jgR,gnY3\u000b\u0005\u001dA\u0011aB2p]R\u0014\u0018N\u0019\u0006\u0002\u0013\u0005!\u0011m[6b\u0007\u0001\u0001\"\u0001D\u0007\u000e\u0003\t1QA\u0004\u0002\t\u0002=\u0011AdQ;se\u0016tG/\u0012<f]R\u001c()\u001f)feNL7\u000f^3oG\u0016LEm\u0005\u0002\u000e!A\u0011\u0011\u0003F\u0007\u0002%)\t1#A\u0003tG\u0006d\u0017-\u0003\u0002\u0016%\t1\u0011I\\=SK\u001aDQaF\u0007\u0005\u0002a\ta\u0001P5oSRtD#A\u0006\t\u000biiA\u0011A\u000e\u0002\u000bA\u0014x\u000e]:\u0015\u000bq\u0011s\u0005N\u001d\u0011\u0005u\u0001S\"\u0001\u0010\u000b\u0005}A\u0011!B1di>\u0014\u0018BA\u0011\u001f\u0005\u0015\u0001&o\u001c9t\u0011\u0015\u0019\u0013\u00041\u0001%\u0003\u0019!'/\u001b<feB\u0011A\"J\u0005\u0003M\t\u0011\u0011cQ1tE\u0006DWj\u001c8h_\u0012\u0013\u0018N^3s\u0011\u0015A\u0013\u00041\u0001*\u00035\u0001XM]:jgR,gnY3JIB\u0011!&\r\b\u0003W=\u0002\"\u0001\f\n\u000e\u00035R!A\f\u0006\u0002\rq\u0012xn\u001c;?\u0013\t\u0001$#\u0001\u0004Qe\u0016$WMZ\u0005\u0003eM\u0012aa\u0015;sS:<'B\u0001\u0019\u0013\u0011\u0015)\u0014\u00041\u00017\u0003\u001d1'o\\7TKF\u0004\"!E\u001c\n\u0005a\u0012\"\u0001\u0002'p]\u001eDQAO\rA\u0002Y\nQ\u0001^8TKF4AA\u0004\u0002\u0001yM\u00191\bE\u001f\u0011\t1q\u0004iQ\u0005\u0003\u007f\t\u0011!cU=oG\u0006\u001bGo\u001c:Qk\nd\u0017n\u001d5feB\u0011A\"Q\u0005\u0003\u0005\n\u0011Q!\u0012<f]R\u00042\u0001R%A\u001d\t)uI\u0004\u0002-\r&\t1#\u0003\u0002I%\u00059\u0001/Y2lC\u001e,\u0017B\u0001&L\u0005\u0019\u0019FO]3b[*\u0011\u0001J\u0005\u0005\tGm\u0012)\u0019!C\u0001\u001bV\tA\u0005\u0003\u0005Pw\t\u0005\t\u0015!\u0003%\u0003\u001d!'/\u001b<fe\u0002B\u0001\u0002K\u001e\u0003\u0002\u0003\u0006I!\u000b\u0005\tkm\u0012\t\u0011)A\u0005m!A!h\u000fB\u0001B\u0003%a\u0007C\u0003\u0018w\u0011\u0005A\u000bF\u0003V-^C\u0016\f\u0005\u0002\rw!)1e\u0015a\u0001I!)\u0001f\u0015a\u0001S!)Qg\u0015a\u0001m!)!h\u0015a\u0001m!)1l\u000fC)9\u0006i\u0011N\\5uS\u0006d7)\u001e:t_J,\u0012a\u0011\u0005\u0006=n\"\tfX\u0001\u0005]\u0016DH\u000fF\u0002aM\"\u0004B!E1d\u0007&\u0011!M\u0005\u0002\u0007)V\u0004H.\u001a\u001a\u0011\u0007\u0011#\u0007)\u0003\u0002f\u0017\n1a+Z2u_JDQaZ/A\u0002\r\u000b\u0011a\u0019\u0005\u0006Sv\u0003\rAN\u0001\u0007CRlun\u001d;\t\u000b-\\D\u0011\u000b7\u0002\u0017%\u001c8i\\7qY\u0016$X\r\u001a\u000b\u0003[B\u0004\"!\u00058\n\u0005=\u0014\"a\u0002\"p_2,\u0017M\u001c\u0005\u0006O*\u0004\ra\u0011\u0005\u0006en\"\tf]\u0001\bI&\u001c8-\u0019:e)\t!x\u000f\u0005\u0002\u0012k&\u0011aO\u0005\u0002\u0005+:LG\u000fC\u0003hc\u0002\u00071\t")
/* loaded from: input_file:akka/contrib/persistence/mongodb/CurrentEventsByPersistenceId.class */
public class CurrentEventsByPersistenceId implements SyncActorPublisher<Event, Stream<Event>> {
    private final CasbahMongoDriver driver;
    private final String persistenceId;
    private final long fromSeq;
    private final long toSeq;
    private LoggingAdapter akka$actor$ActorLogging$$_log;
    private final ActorPublisherState akka$stream$actor$ActorPublisher$$state;
    private Subscriber<Object> akka$stream$actor$ActorPublisher$$subscriber;
    private long akka$stream$actor$ActorPublisher$$demand;
    private ActorPublisher$Internal$LifecycleState akka$stream$actor$ActorPublisher$$lifecycleState;
    private Cancellable akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout;
    private final ActorContext context;
    private final ActorRef self;

    public static Props props(CasbahMongoDriver casbahMongoDriver, String str, long j, long j2) {
        return CurrentEventsByPersistenceId$.MODULE$.props(casbahMongoDriver, str, j, j2);
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public /* synthetic */ void akka$contrib$persistence$mongodb$SyncActorPublisher$$super$preStart() {
        preStart();
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher, akka.actor.Actor
    public void preStart() {
        preStart();
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher, akka.actor.Actor
    public PartialFunction<Object, BoxedUnit> receive() {
        PartialFunction<Object, BoxedUnit> receive;
        receive = receive();
        return receive;
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public PartialFunction streaming(Stream<Event> stream, long j) {
        PartialFunction streaming;
        streaming = streaming(stream, j);
        return streaming;
    }

    @Override // akka.actor.ActorLogging
    public LoggingAdapter log() {
        LoggingAdapter log;
        log = log();
        return log;
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundReceive(PartialFunction partialFunction, Object obj) {
        aroundReceive(partialFunction, obj);
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPreStart() {
        aroundPreStart();
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPreRestart(Throwable th, Option option) {
        aroundPreRestart(th, option);
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPostRestart(Throwable th) {
        aroundPostRestart(th);
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPostStop() {
        aroundPostStop();
    }

    @Override // akka.stream.actor.ActorPublisher
    public Duration subscriptionTimeout() {
        Duration subscriptionTimeout;
        subscriptionTimeout = subscriptionTimeout();
        return subscriptionTimeout;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isActive() {
        boolean isActive;
        isActive = isActive();
        return isActive;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final long totalDemand() {
        long j;
        j = totalDemand();
        return j;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isCompleted() {
        boolean isCompleted;
        isCompleted = isCompleted();
        return isCompleted;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isErrorEmitted() {
        boolean isErrorEmitted;
        isErrorEmitted = isErrorEmitted();
        return isErrorEmitted;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isCanceled() {
        boolean isCanceled;
        isCanceled = isCanceled();
        return isCanceled;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onNext(Object obj) {
        onNext(obj);
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onComplete() {
        onComplete();
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onCompleteThenStop() {
        onCompleteThenStop();
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onError(Throwable th) {
        onError(th);
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onErrorThenStop(Throwable th) {
        onErrorThenStop(th);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        aroundReceive(partialFunction, obj);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPreStart() {
        aroundPreStart();
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPreRestart(Throwable th, Option<Object> option) {
        aroundPreRestart(th, option);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPostRestart(Throwable th) {
        aroundPostRestart(th);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPostStop() {
        aroundPostStop();
    }

    @Override // akka.actor.Actor
    public final ActorRef sender() {
        ActorRef sender;
        sender = sender();
        return sender;
    }

    @Override // akka.actor.Actor
    public SupervisorStrategy supervisorStrategy() {
        SupervisorStrategy supervisorStrategy;
        supervisorStrategy = supervisorStrategy();
        return supervisorStrategy;
    }

    @Override // akka.actor.Actor
    public void postStop() throws Exception {
        postStop();
    }

    @Override // akka.actor.Actor
    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        preRestart(th, option);
    }

    @Override // akka.actor.Actor
    public void postRestart(Throwable th) throws Exception {
        postRestart(th);
    }

    @Override // akka.actor.Actor
    public void unhandled(Object obj) {
        unhandled(obj);
    }

    @Override // akka.actor.ActorLogging
    public LoggingAdapter akka$actor$ActorLogging$$_log() {
        return this.akka$actor$ActorLogging$$_log;
    }

    @Override // akka.actor.ActorLogging
    public void akka$actor$ActorLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$actor$ActorLogging$$_log = loggingAdapter;
    }

    @Override // akka.stream.actor.ActorPublisher
    public ActorPublisherState akka$stream$actor$ActorPublisher$$state() {
        return this.akka$stream$actor$ActorPublisher$$state;
    }

    @Override // akka.stream.actor.ActorPublisher
    public Subscriber<Object> akka$stream$actor$ActorPublisher$$subscriber() {
        return this.akka$stream$actor$ActorPublisher$$subscriber;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$subscriber_$eq(Subscriber<Object> subscriber) {
        this.akka$stream$actor$ActorPublisher$$subscriber = subscriber;
    }

    @Override // akka.stream.actor.ActorPublisher
    public long akka$stream$actor$ActorPublisher$$demand() {
        return this.akka$stream$actor$ActorPublisher$$demand;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$demand_$eq(long j) {
        this.akka$stream$actor$ActorPublisher$$demand = j;
    }

    @Override // akka.stream.actor.ActorPublisher
    public ActorPublisher$Internal$LifecycleState akka$stream$actor$ActorPublisher$$lifecycleState() {
        return this.akka$stream$actor$ActorPublisher$$lifecycleState;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$lifecycleState_$eq(ActorPublisher$Internal$LifecycleState actorPublisher$Internal$LifecycleState) {
        this.akka$stream$actor$ActorPublisher$$lifecycleState = actorPublisher$Internal$LifecycleState;
    }

    @Override // akka.stream.actor.ActorPublisher
    public Cancellable akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout() {
        return this.akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout_$eq(Cancellable cancellable) {
        this.akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout = cancellable;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final void akka$stream$actor$ActorPublisher$_setter_$akka$stream$actor$ActorPublisher$$state_$eq(ActorPublisherState actorPublisherState) {
        this.akka$stream$actor$ActorPublisher$$state = actorPublisherState;
    }

    @Override // akka.actor.Actor
    public ActorContext context() {
        return this.context;
    }

    @Override // akka.actor.Actor
    public final ActorRef self() {
        return this.self;
    }

    @Override // akka.actor.Actor
    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    @Override // akka.actor.Actor
    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public CasbahMongoDriver driver() {
        return this.driver;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public Stream<Event> initialCursor() {
        return (Stream) ((Stream) ((TraversableLike) ((Stream) ((Iterator) ((MongoCursorBase) ((MongoCollectionBase) driver().getJournal(this.persistenceId)).find(Imports$.MODULE$.wrapDBObj(Imports$.MODULE$.wrapDBObj(Imports$.MODULE$.mongoQueryStatements("pid").$eq(this.persistenceId, AsQueryParam$.MODULE$.string())).$plus$plus((MongoDBObject) Imports$.MODULE$.mongoQueryStatements("from").$lte(BoxesRunTime.boxToLong(this.toSeq), AsQueryParam$.MODULE$.dateOrNumeric(Imports$.MODULE$.LongDoNOk())), (Function1<MongoDBObject, DBObject>) Predef$.MODULE$.$conforms())).$plus$plus((MongoDBObject) Imports$.MODULE$.mongoQueryStatements("to").$gte(BoxesRunTime.boxToLong(this.fromSeq), AsQueryParam$.MODULE$.dateOrNumeric(Imports$.MODULE$.LongDoNOk())), (Function1<MongoDBObject, DBObject>) Predef$.MODULE$.$conforms()), Predef$.MODULE$.$conforms())).sort(Imports$.MODULE$.MongoDBObject().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("to"), BoxesRunTime.boxToInteger(1))})), Predef$.MODULE$.$conforms())).toStream().flatMap(dBObject -> {
            return Option$.MODULE$.option2Iterable(Imports$.MODULE$.wrapDBObj(dBObject).getAs(org.eclipse.ditto.signals.events.base.Event.TYPE_QUALIFIER, NotNothing$.MODULE$.notNothing(), ManifestFactory$.MODULE$.classType(MongoDBList.class)));
        }, Stream$.MODULE$.canBuildFrom())).flatMap(mongoDBList -> {
            return (Seq) mongoDBList.collect(new CurrentEventsByPersistenceId$$anonfun$$nestedInanonfun$initialCursor$9$1(null), Seq$.MODULE$.canBuildFrom());
        }, Stream$.MODULE$.canBuildFrom())).filter(dBObject2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$initialCursor$10(this, dBObject2));
        })).map(dBObject3 -> {
            return this.driver().deserializeJournal(dBObject3, this.driver().CasbahSerializers().Deserializer());
        }, Stream$.MODULE$.canBuildFrom());
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public Tuple2<Vector<Event>, Stream<Event>> next(Stream<Event> stream, long j) {
        Tuple2<Stream<Event>, Stream<Event>> splitAt = stream.splitAt(package$NonWrappingLongToInt$.MODULE$.toIntWithoutWrapping$extension(package$.MODULE$.NonWrappingLongToInt(j)));
        if (splitAt == null) {
            throw new MatchError(splitAt);
        }
        Tuple2 tuple2 = new Tuple2(splitAt.mo5794_1(), splitAt.mo5793_2());
        Stream stream2 = (Stream) tuple2.mo5794_1();
        return new Tuple2<>(stream2.toVector(), (Stream) tuple2.mo5793_2());
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public boolean isCompleted(Stream<Event> stream) {
        return stream.isEmpty();
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public void discard(Stream<Event> stream) {
    }

    public static final /* synthetic */ boolean $anonfun$initialCursor$10(CurrentEventsByPersistenceId currentEventsByPersistenceId, DBObject dBObject) {
        return Imports$.MODULE$.wrapDBObj(dBObject).getAs("sn", NotNothing$.MODULE$.notNothing(), ManifestFactory$.MODULE$.Long()).exists(j -> {
            return j >= currentEventsByPersistenceId.fromSeq && j <= currentEventsByPersistenceId.toSeq;
        });
    }

    public CurrentEventsByPersistenceId(CasbahMongoDriver casbahMongoDriver, String str, long j, long j2) {
        this.driver = casbahMongoDriver;
        this.persistenceId = str;
        this.fromSeq = j;
        this.toSeq = j2;
        Actor.$init$(this);
        ActorPublisher.$init$((ActorPublisher) this);
        ActorLogging.$init$(this);
        SyncActorPublisher.$init$((SyncActorPublisher) this);
    }
}
