package akka.contrib.persistence.mongodb;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorLogging;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.event.LoggingAdapter;
import akka.stream.actor.ActorPublisher;
import akka.stream.actor.ActorPublisher$Internal$LifecycleState;
import akka.stream.actor.ActorPublisherState;
import com.mongodb.casbah.Imports$;
import com.mongodb.casbah.MongoCollection;
import com.mongodb.casbah.commons.NotNothing$;
import org.reactivestreams.Subscriber;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Stream;
import scala.collection.immutable.Stream$;
import scala.collection.immutable.Vector;
import scala.concurrent.duration.Duration;
import scala.reflect.ManifestFactory$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Random$;

/* compiled from: CasbahPersistenceReadJournaller.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055q!B\u0001\u0003\u0011\u0003Y\u0011!F\"veJ,g\u000e\u001e)feNL7\u000f^3oG\u0016LEm\u001d\u0006\u0003\u0007\u0011\tq!\\8oO>$'M\u0003\u0002\u0006\r\u0005Y\u0001/\u001a:tSN$XM\\2f\u0015\t9\u0001\"A\u0004d_:$(/\u001b2\u000b\u0003%\tA!Y6lC\u000e\u0001\u0001C\u0001\u0007\u000e\u001b\u0005\u0011a!\u0002\b\u0003\u0011\u0003y!!F\"veJ,g\u000e\u001e)feNL7\u000f^3oG\u0016LEm]\n\u0003\u001bA\u0001\"!\u0005\u000b\u000e\u0003IQ\u0011aE\u0001\u0006g\u000e\fG.Y\u0005\u0003+I\u0011a!\u00118z%\u00164\u0007\"B\f\u000e\t\u0003A\u0012A\u0002\u001fj]&$h\bF\u0001\f\u0011\u0015QR\u0002\"\u0001\u001c\u0003\u0015\u0001(o\u001c9t)\ta\"\u0005\u0005\u0002\u001eA5\taD\u0003\u0002 \u0011\u0005)\u0011m\u0019;pe&\u0011\u0011E\b\u0002\u0006!J|\u0007o\u001d\u0005\u0006Ge\u0001\r\u0001J\u0001\u0007IJLg/\u001a:\u0011\u00051)\u0013B\u0001\u0014\u0003\u0005E\u0019\u0015m\u001d2bQ6{gnZ8Ee&4XM\u001d\u0004\u0005\u001d\t\u0001\u0001fE\u0002(!%\u0002B\u0001\u0004\u0016-o%\u00111F\u0001\u0002\u0013'ft7-Q2u_J\u0004VO\u00197jg\",'\u000f\u0005\u0002.i9\u0011aF\r\t\u0003_Ii\u0011\u0001\r\u0006\u0003c)\ta\u0001\u0010:p_Rt\u0014BA\u001a\u0013\u0003\u0019\u0001&/\u001a3fM&\u0011QG\u000e\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005M\u0012\u0002c\u0001\u001d>Y9\u0011\u0011h\u000f\b\u0003_iJ\u0011aE\u0005\u0003yI\tq\u0001]1dW\u0006<W-\u0003\u0002?\u007f\t11\u000b\u001e:fC6T!\u0001\u0010\n\t\u0011\r:#Q1A\u0005\u0002\u0005+\u0012\u0001\n\u0005\t\u0007\u001e\u0012\t\u0011)A\u0005I\u00059AM]5wKJ\u0004\u0003\"B\f(\t\u0003)EC\u0001$H!\taq\u0005C\u0003$\t\u0002\u0007A\u0005C\u0004JO\t\u0007I\u0011\u0001&\u0002/Q,W\u000e]8sCJL8i\u001c7mK\u000e$\u0018n\u001c8OC6,W#\u0001\u0017\t\r1;\u0003\u0015!\u0003-\u0003a!X-\u001c9pe\u0006\u0014\u0018pQ8mY\u0016\u001cG/[8o\u001d\u0006lW\r\t\u0005\b\u001d\u001e\u0012\r\u0011\"\u0001P\u0003M!X-\u001c9pe\u0006\u0014\u0018pQ8mY\u0016\u001cG/[8o+\u0005\u0001\u0006CA)`\u001d\t\u0011FL\u0004\u0002T3:\u0011Ak\u0016\b\u0003_UK\u0011AV\u0001\u0004G>l\u0017BA\u0002
Y\u0015\u00051\u0016B\u0001.\\\u0003\u0019\u0019\u0017m\u001d2bQ*\u00111\u0001W\u0005\u0003;z\u000bq!S7q_J$8O\u0003\u0002[7&\u0011\u0001-\u0019\u0002\u0010\u001b>twm\\\"pY2,7\r^5p]&\u0011!M\u0018\u0002\f)f\u0004X-S7q_J$8\u000f\u0003\u0004eO\u0001\u0006I\u0001U\u0001\u0015i\u0016l\u0007o\u001c:bef\u001cu\u000e\u001c7fGRLwN\u001c\u0011\t\u000b\u0019<C\u0011K4\u0002\u001b%t\u0017\u000e^5bY\u000e+(o]8s+\u00059\u0004\"B5(\t#R\u0017\u0001\u00028fqR$2a[9t!\u0011\tBN\\\u001c\n\u00055\u0014\"A\u0002+va2,'\u0007E\u00029_2J!\u0001] \u0003\rY+7\r^8s\u0011\u0015\u0011\b\u000e1\u00018\u0003\u0005\u0019\u0007\"\u0002;i\u0001\u0004)\u0018AB1u\u001b>\u001cH\u000f\u0005\u0002\u0012m&\u0011qO\u0005\u0002\u0005\u0019>tw\rC\u0003zO\u0011E#0A\u0006jg\u000e{W\u000e\u001d7fi\u0016$GCA>\u007f!\t\tB0\u0003\u0002~%\t9!i\\8mK\u0006t\u0007\"\u0002:y\u0001\u00049\u0004bBA\u0001O\u0011E\u00131A\u0001\bI&\u001c8-\u0019:e)\u0011\t)!a\u0003\u0011\u0007E\t9!C\u0002\u0002\nI\u0011A!\u00168ji\")!o a\u0001o\u0001")
/* loaded from: input_file:akka/contrib/persistence/mongodb/CurrentPersistenceIds.class */
/**
 * Synchronous actor publisher that emits persistence ids as {@code String}s, using a
 * {@code Stream<String>} as its cursor type.
 *
 * <p>The ids are produced by running an aggregation on every journal collection of the
 * driver, writing the result into a temporary collection (see {@link #initialCursor()}),
 * and reading the {@code _id} values back. The temporary collection is dropped in
 * {@link #discard(Stream)}.
 *
 * <p>This source is decompiler output of a Scala class. Most members are compiler-generated
 * forwarders and field accessors for the mixed-in traits; only {@code initialCursor},
 * {@code next}, {@code isCompleted(Stream)}, {@code discard} and the constructor carry
 * logic specific to this class.
 */
public class CurrentPersistenceIds implements SyncActorPublisher<String, Stream<String>> {
    private final CasbahMongoDriver driver;
    private final String temporaryCollectionName;
    private final MongoCollection temporaryCollection;
    private LoggingAdapter akka$actor$ActorLogging$$_log;
    // Not final: assigned by the trait setter
    // akka$stream$actor$ActorPublisher$_setter_$akka$stream$actor$ActorPublisher$$state_$eq,
    // not by the constructor.
    private ActorPublisherState akka$stream$actor$ActorPublisher$$state;
    private Subscriber<Object> akka$stream$actor$ActorPublisher$$subscriber;
    private long akka$stream$actor$ActorPublisher$$demand;
    private ActorPublisher$Internal$LifecycleState akka$stream$actor$ActorPublisher$$lifecycleState;
    private Cancellable akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout;
    // Not final: assigned by akka$actor$Actor$_setter_$context_$eq, not by the constructor.
    private ActorContext context;
    // Not final: assigned by akka$actor$Actor$_setter_$self_$eq, not by the constructor.
    private ActorRef self;

    /**
     * Creates the {@code Props} for this actor by delegating to the companion object.
     *
     * @param casbahMongoDriver driver handed on to the companion's {@code props}
     * @return the {@code Props} returned by {@code CurrentPersistenceIds$.MODULE$.props}
     */
    public static Props props(CasbahMongoDriver casbahMongoDriver) {
        return CurrentPersistenceIds$.MODULE$.props(casbahMongoDriver);
    }

    // ---------------------------------------------------------------------------------
    // Trait forwarders.
    //
    // NOTE(review): as rendered by the decompiler, each forwarder below invokes a method
    // with its own name and signature, i.e. it would recurse without bound if this file
    // were compiled as plain Java. Presumably the original bytecode dispatches to the
    // implementation inherited from the mixed-in trait instead - confirm against the
    // class file before relying on this source.
    // ---------------------------------------------------------------------------------

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public /* synthetic */ void akka$contrib$persistence$mongodb$SyncActorPublisher$$super$preStart() {
        preStart();
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher, akka.actor.Actor
    public void preStart() {
        preStart();
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher, akka.actor.Actor
    public PartialFunction<Object, BoxedUnit> receive() {
        PartialFunction<Object, BoxedUnit> receive;
        receive = receive();
        return receive;
    }

    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public PartialFunction streaming(Stream<String> stream, long j) {
        PartialFunction streaming;
        streaming = streaming(stream, j);
        return streaming;
    }

    @Override // akka.actor.ActorLogging
    public LoggingAdapter log() {
        LoggingAdapter log;
        log = log();
        return log;
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundReceive(PartialFunction partialFunction, Object obj) {
        aroundReceive(partialFunction, obj);
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPreStart() {
        aroundPreStart();
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPreRestart(Throwable th, Option option) {
        aroundPreRestart(th, option);
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPostRestart(Throwable th) {
        aroundPostRestart(th);
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPostStop() {
        aroundPostStop();
    }

    @Override // akka.stream.actor.ActorPublisher
    public Duration subscriptionTimeout() {
        Duration subscriptionTimeout;
        subscriptionTimeout = subscriptionTimeout();
        return subscriptionTimeout;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isActive() {
        boolean isActive;
        isActive = isActive();
        return isActive;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final long totalDemand() {
        long j;
        j = totalDemand();
        return j;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isCompleted() {
        boolean isCompleted;
        isCompleted = isCompleted();
        return isCompleted;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isErrorEmitted() {
        boolean isErrorEmitted;
        isErrorEmitted = isErrorEmitted();
        return isErrorEmitted;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isCanceled() {
        boolean isCanceled;
        isCanceled = isCanceled();
        return isCanceled;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onNext(Object obj) {
        onNext(obj);
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onComplete() {
        onComplete();
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onCompleteThenStop() {
        onCompleteThenStop();
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onError(Throwable th) {
        onError(th);
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onErrorThenStop(Throwable th) {
        onErrorThenStop(th);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        aroundReceive(partialFunction, obj);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPreStart() {
        aroundPreStart();
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPreRestart(Throwable th, Option<Object> option) {
        aroundPreRestart(th, option);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPostRestart(Throwable th) {
        aroundPostRestart(th);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPostStop() {
        aroundPostStop();
    }

    @Override // akka.actor.Actor
    public final ActorRef sender() {
        ActorRef sender;
        sender = sender();
        return sender;
    }

    @Override // akka.actor.Actor
    public SupervisorStrategy supervisorStrategy() {
        SupervisorStrategy supervisorStrategy;
        supervisorStrategy = supervisorStrategy();
        return supervisorStrategy;
    }

    @Override // akka.actor.Actor
    public void postStop() throws Exception {
        postStop();
    }

    @Override // akka.actor.Actor
    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        preRestart(th, option);
    }

    @Override // akka.actor.Actor
    public void postRestart(Throwable th) throws Exception {
        postRestart(th);
    }

    @Override // akka.actor.Actor
    public void unhandled(Object obj) {
        unhandled(obj);
    }

    // ---------------------------------------------------------------------------------
    // Accessors for the state fields that the mixed-in traits keep on this instance.
    // Getters return the backing field; "_$eq" methods overwrite it.
    // ---------------------------------------------------------------------------------

    @Override // akka.actor.ActorLogging
    public LoggingAdapter akka$actor$ActorLogging$$_log() {
        return this.akka$actor$ActorLogging$$_log;
    }

    @Override // akka.actor.ActorLogging
    public void akka$actor$ActorLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$actor$ActorLogging$$_log = loggingAdapter;
    }

    @Override // akka.stream.actor.ActorPublisher
    public ActorPublisherState akka$stream$actor$ActorPublisher$$state() {
        return this.akka$stream$actor$ActorPublisher$$state;
    }

    @Override // akka.stream.actor.ActorPublisher
    public Subscriber<Object> akka$stream$actor$ActorPublisher$$subscriber() {
        return this.akka$stream$actor$ActorPublisher$$subscriber;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$subscriber_$eq(Subscriber<Object> subscriber) {
        this.akka$stream$actor$ActorPublisher$$subscriber = subscriber;
    }

    @Override // akka.stream.actor.ActorPublisher
    public long akka$stream$actor$ActorPublisher$$demand() {
        return this.akka$stream$actor$ActorPublisher$$demand;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$demand_$eq(long j) {
        this.akka$stream$actor$ActorPublisher$$demand = j;
    }

    @Override // akka.stream.actor.ActorPublisher
    public ActorPublisher$Internal$LifecycleState akka$stream$actor$ActorPublisher$$lifecycleState() {
        return this.akka$stream$actor$ActorPublisher$$lifecycleState;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$lifecycleState_$eq(ActorPublisher$Internal$LifecycleState actorPublisher$Internal$LifecycleState) {
        this.akka$stream$actor$ActorPublisher$$lifecycleState = actorPublisher$Internal$LifecycleState;
    }

    @Override // akka.stream.actor.ActorPublisher
    public Cancellable akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout() {
        return this.akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout_$eq(Cancellable cancellable) {
        this.akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout = cancellable;
    }

    // Trait-initialisation setter: writes the (non-final) publisher state field.
    @Override // akka.stream.actor.ActorPublisher
    public final void akka$stream$actor$ActorPublisher$_setter_$akka$stream$actor$ActorPublisher$$state_$eq(ActorPublisherState actorPublisherState) {
        this.akka$stream$actor$ActorPublisher$$state = actorPublisherState;
    }

    @Override // akka.actor.Actor
    public ActorContext context() {
        return this.context;
    }

    @Override // akka.actor.Actor
    public final ActorRef self() {
        return this.self;
    }

    // Trait-initialisation setter: writes the (non-final) context field.
    @Override // akka.actor.Actor
    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    // Trait-initialisation setter: writes the (non-final) self field.
    @Override // akka.actor.Actor
    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    /** Returns the driver passed to the constructor. */
    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public CasbahMongoDriver driver() {
        return this.driver;
    }

    /** Returns the name of the temporary collection generated in the constructor. */
    public String temporaryCollectionName() {
        return this.temporaryCollectionName;
    }

    /** Returns the collection handle the driver resolved for {@link #temporaryCollectionName()}. */
    public MongoCollection temporaryCollection() {
        return this.temporaryCollection;
    }

    /**
     * Builds the stream of persistence ids.
     *
     * <p>For each collection returned by {@code driver().getJournalCollections()} an
     * aggregation is run with the stages, in order:
     * <ol>
     *   <li>{@code $project: { pid: 1 }}</li>
     *   <li>{@code $group: { _id: "$pid" }}</li>
     *   <li>{@code $out: <temporaryCollectionName>}</li>
     * </ol>
     * and the documents of the temporary collection are then read back with {@code find()}.
     * Finally the {@code _id} of every document is extracted as a {@code String}; documents
     * for which {@code getAs} yields no value contribute nothing to the result.
     *
     * @return stream of the {@code _id} strings read from the temporary collection
     */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public Stream<String> initialCursor() {
        return (Stream) ((Stream) driver().getJournalCollections().toStream().flatMap(mongoCollection -> {
            // The list is built back to front with "::", so the effective pipeline order is
            // $project, $group, $out. results() is invoked for its side effect of running it.
            mongoCollection.aggregate(Nil$.MODULE$.$colon$colon(Imports$.MODULE$.MongoDBObject().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("$out"), this.temporaryCollectionName())}))).$colon$colon(Imports$.MODULE$.MongoDBObject().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("$group"), Imports$.MODULE$.MongoDBObject().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("_id"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"$", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{"pid"})))})))}))).$colon$colon(Imports$.MODULE$.MongoDBObject().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("$project"), Imports$.MODULE$.MongoDBObject().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("pid"), BoxesRunTime.boxToInteger(1))})))}))), Predef$.MODULE$.$conforms()).results();
            return ((Iterator) this.temporaryCollection().find()).toStream();
        }, Stream$.MODULE$.canBuildFrom())).flatMap(dBObject -> {
            return Option$.MODULE$.option2Iterable(Imports$.MODULE$.wrapDBObj(dBObject).getAs("_id", NotNothing$.MODULE$.notNothing(), ManifestFactory$.MODULE$.classType(String.class)));
        }, Stream$.MODULE$.canBuildFrom());
    }

    /**
     * Takes the next batch of ids off the cursor.
     *
     * @param stream the current cursor
     * @param j      requested batch size; converted to {@code int} through
     *               {@code NonWrappingLongToInt} before splitting (presumably clamping
     *               rather than overflowing - confirm in the package object)
     * @return a pair of (the leading elements as a {@code Vector}, the remaining stream)
     * @throws MatchError if {@code splitAt} returns {@code null}
     */
    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public Tuple2<Vector<String>, Stream<String>> next(Stream<String> stream, long j) {
        Tuple2<Stream<String>, Stream<String>> splitAt = stream.splitAt(package$NonWrappingLongToInt$.MODULE$.toIntWithoutWrapping$extension(package$.MODULE$.NonWrappingLongToInt(j)));
        if (splitAt == null) {
            throw new MatchError(splitAt);
        }
        Tuple2 tuple2 = new Tuple2(splitAt.mo5600_1(), splitAt.mo5599_2());
        Stream stream2 = (Stream) tuple2.mo5600_1();
        return new Tuple2<>(stream2.toVector(), (Stream) tuple2.mo5599_2());
    }

    /**
     * Reports whether the cursor is exhausted.
     *
     * @param stream the current cursor
     * @return {@code true} when {@code stream} is empty
     */
    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public boolean isCompleted(Stream<String> stream) {
        return stream.isEmpty();
    }

    /**
     * Releases the cursor's backing storage by dropping the temporary collection.
     *
     * @param stream the cursor being discarded; not used by this implementation
     */
    @Override // akka.contrib.persistence.mongodb.SyncActorPublisher
    public void discard(Stream<String> stream) {
        temporaryCollection().drop();
    }

    /**
     * Stores the driver, runs the trait initialisers, and resolves the temporary collection.
     *
     * <p>The temporary collection name has the form
     * {@code persistenceids-<currentTimeMillis>-<n>} where {@code n} comes from
     * {@code Random.nextInt(1000)}.
     *
     * @param casbahMongoDriver driver used to list journal collections and to obtain the
     *                          temporary collection
     */
    public CurrentPersistenceIds(CasbahMongoDriver casbahMongoDriver) {
        this.driver = casbahMongoDriver;
        // Trait initialisers run before the class's own fields are set, in this fixed order.
        Actor.$init$(this);
        ActorPublisher.$init$((ActorPublisher) this);
        ActorLogging.$init$(this);
        SyncActorPublisher.$init$((SyncActorPublisher) this);
        this.temporaryCollectionName = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"persistenceids-", "-", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(System.currentTimeMillis()), BoxesRunTime.boxToInteger(Random$.MODULE$.nextInt(1000))}));
        this.temporaryCollection = casbahMongoDriver.collection(temporaryCollectionName());
    }
}
