package akka.contrib.persistence.mongodb;

import akka.NotUsed;
import akka.persistence.query.Offset;
import akka.stream.KillSwitches$;
import akka.stream.Materializer;
import akka.stream.SharedKillSwitch;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import com.mongodb.CursorType;
import org.bson.BsonDocument;
import org.bson.BsonObjectId;
import org.bson.types.ObjectId;
import org.mongodb.scala.FindObservable;
import org.mongodb.scala.bson.DefaultHelper$DefaultsTo$;
import org.mongodb.scala.model.Filters$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Cpackage;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: ScalaDriverPersistenceReadJournaller.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015a\u0001B\u0005\u000b\u0001MA\u0001b\u000e\u0001\u0003\u0002\u0003\u0006I\u0001\u000f\u0005\tw\u0001\u0011\t\u0011)A\u0006y!)\u0001\t\u0001C\u0001\u0003\"9a\t\u0001b\u0001\n\u00079\u0005B\u0002(\u0001A\u0003%\u0001\nC\u0004P\u0001\t\u0007I\u0011\u0002)\t\rA\u0004\u0001\u0015!\u0003R\u0011\u0015\t\b\u0001\"\u0001s\u0005a\u00196-\u00197b\tJLg/\u001a:K_V\u0014h.\u00197TiJ,\u0017-\u001c\u0006\u0003\u00171\tq!\\8oO>$'M\u0003\u0002\u000e\u001d\u0005Y\u0001/\u001a:tSN$XM\\2f\u0015\ty\u0001#A\u0004d_:$(/\u001b2\u000b\u0003E\tA!Y6lC\u000e\u00011c\u0001\u0001\u00155A\u0011Q\u0003G\u0007\u0002-)\tq#A\u0003tG\u0006d\u0017-\u0003\u0002\u001a-\t1\u0011I\\=SK\u001a\u00042a\u0007\u000f\u001f\u001b\u0005Q\u0011BA\u000f\u000b\u00055Qu.\u001e:oC2\u001cFO]3b[B!q\u0004\n\u00144\u001b\u0005\u0001#BA\u0011#\u0003!\u00198-\u00197bINd'BA\u0012\u0011\u0003\u0019\u0019HO]3b[&\u0011Q\u0005\t\u0002\u0007'>,(oY3\u0011\tU9\u0013\u0006L\u0005\u0003QY\u0011a\u0001V;qY\u0016\u0014\u0004CA\u000e+\u0013\tY#BA\u0003Fm\u0016tG\u000f\u0005\u0002.c5\taF\u0003\u00020a\u0005)\u0011/^3ss*\u0011Q\u0002E\u0005\u0003e9\u0012aa\u00144gg\u0016$\bC\u0001\u001b6\u001b\u0005\u0001\u0012B\u0001\u001c\u0011\u0005\u001dqu\u000e^+tK\u0012\fa\u0001\u001a:jm\u0016\u0014\bCA\u000e:\u0013\tQ$B\u0001\tTG\u0006d\u0017-T8oO>$%/\u001b<fe\u0006\tQ\u000e\u0005\u0002>}5\t!%\u0003\u0002@E\taQ*\u0019;fe&\fG.\u001b>fe\u00061A(\u001b8jiz\"\"AQ#\u0015\u0005\r#\u0005CA\u000e\u0001\u0011\u0015Y4\u0001q\u0001=\u0011\u001594\u00011\u00019\u0003\t)7-F\u0001I!\tIE*D\u0001K\u0015\tYe#\u0001\u0006d_:\u001cWO\u001d:f]RL!!\u0014&\u0003!\u0015CXmY;uS>t7i\u001c8uKb$\u0018aA3dA\u0005i1-\u001e:t_J\u0014U/\u001b7eKJ,\u0012!\u0015\t\u0005+I#F+\u0003\u0002T-\tIa)\u001e8di&|g.\r\t\u0004+jcV\"\u0001,\u000b\u0005]9&BA\u0006Y\u0015\u0005I\u0016aA8sO&\u00111L\u0016\u0002\u000f\r&tGm\u00142tKJ4\u0018M\u00197f!\tiVN\u0004\u0002_U:\u0011q\f\u001b\b\u0003A\u001et!!\u00194\u000f\u0005\t,W\"A2\u000b\u0005\u0011\u0014\u0012A\u0002\u001fs_>$h(C\u0001Z\u0013\tY\u0001,\u0003\u0002\u0018/&\u0011\u0011NV\u0001\u0005EN|g.\u0003\u0002lY\u00069\u0001/Y2lC\u001e,'BA5W\u0013\tqwN\u0001\u0007Cg>tGi\\2v[\u0016tGO\u0003\u0002lY\u0006q1-\u001e:t_J\u0014U/\u001b7eKJ\u0004\u0013AB2veN|'\u000f\u0006\u0002\u001fg\")q\u0006\u0003a\u0001iB\u0019Q#^<\n\u0005Y4\"AB(qi&|g\u000e\u0005\u0002y\u007f:\u0011\u00110 \b\u0003unl\u0011\u0001\\\u0005\u0003y2\f1bY8om\u0016\u00148/[8og&\u00111N \u0006\u0003y2LA!!\u0001\u0002\u0004\t!!i]8o\u0015\tYg\u0010")
/* loaded from: input_file:akka/contrib/persistence/mongodb/ScalaDriverJournalStream.class */
public class ScalaDriverJournalStream implements JournalStream<Source<Tuple2<Event, Offset>, NotUsed>> {
    public final ScalaMongoDriver akka$contrib$persistence$mongodb$ScalaDriverJournalStream$$driver;
    private final ExecutionContext ec;
    private final Function1<FindObservable<BsonDocument>, FindObservable<BsonDocument>> cursorBuilder;
    private final SharedKillSwitch killSwitch;

    @Override // akka.contrib.persistence.mongodb.JournalStream
    public void stopAllStreams() {
        stopAllStreams();
    }

    @Override // akka.contrib.persistence.mongodb.JournalStream
    public SharedKillSwitch killSwitch() {
        return this.killSwitch;
    }

    @Override // akka.contrib.persistence.mongodb.JournalStream
    public void akka$contrib$persistence$mongodb$JournalStream$_setter_$killSwitch_$eq(SharedKillSwitch sharedKillSwitch) {
        this.killSwitch = sharedKillSwitch;
    }

    public ExecutionContext ec() {
        return this.ec;
    }

    private Function1<FindObservable<BsonDocument>, FindObservable<BsonDocument>> cursorBuilder() {
        return this.cursorBuilder;
    }

    public Source<Tuple2<Event, Offset>, NotUsed> cursor(Option<org.bson.conversions.Bson> option) {
        return this.akka$contrib$persistence$mongodb$ScalaDriverJournalStream$$driver.realtimeEnablePersistence() ? ((Source) Source$.MODULE$.fromFuture((Future) this.akka$contrib$persistence$mongodb$ScalaDriverJournalStream$$driver.realtime(ec())).flatMapConcat(mongoCollection -> {
            return (Source) Source$.MODULE$.fromGraph(new ScalaDriverRealtimeGraphStage(this.akka$contrib$persistence$mongodb$ScalaDriverJournalStream$$driver, ScalaDriverRealtimeGraphStage$.MODULE$.$lessinit$greater$default$2(), option2 -> {
                FindObservable<BsonDocument> mo12apply;
                Tuple2 tuple2 = new Tuple2(option, option2);
                if (tuple2 != null) {
                    Option option2 = (Option) tuple2.mo5777_1();
                    Option option3 = (Option) tuple2.mo5776_2();
                    if (option2 instanceof Some) {
                        org.bson.conversions.Bson bson = (org.bson.conversions.Bson) ((Some) option2).value();
                        if (None$.MODULE$.equals(option3)) {
                            mo12apply = this.cursorBuilder().mo12apply(mongoCollection.find(bson, DefaultHelper$DefaultsTo$.MODULE$.m5530default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                            return mo12apply;
                        }
                    }
                }
                if (tuple2 != null) {
                    Option option4 = (Option) tuple2.mo5777_1();
                    Option option5 = (Option) tuple2.mo5776_2();
                    if (option4 instanceof Some) {
                        org.bson.conversions.Bson bson2 = (org.bson.conversions.Bson) ((Some) option4).value();
                        if (option5 instanceof Some) {
                            mo12apply = this.cursorBuilder().mo12apply(mongoCollection.find(Filters$.MODULE$.and(Predef$.MODULE$.wrapRefArray(new org.bson.conversions.Bson[]{bson2, Filters$.MODULE$.gte("_id", (BsonObjectId) ((Some) option5).value())})), DefaultHelper$DefaultsTo$.MODULE$.m5530default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                            return mo12apply;
                        }
                    }
                }
                if (tuple2 != null) {
                    Option option6 = (Option) tuple2.mo5777_1();
                    Option option7 = (Option) tuple2.mo5776_2();
                    if (None$.MODULE$.equals(option6) && None$.MODULE$.equals(option7)) {
                        mo12apply = this.cursorBuilder().mo12apply(mongoCollection.find(DefaultHelper$DefaultsTo$.MODULE$.m5530default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                        return mo12apply;
                    }
                }
                if (tuple2 != null) {
                    Option option8 = (Option) tuple2.mo5777_1();
                    Option option9 = (Option) tuple2.mo5776_2();
                    if (None$.MODULE$.equals(option8) && (option9 instanceof Some)) {
                        mo12apply = this.cursorBuilder().mo12apply(mongoCollection.find(Filters$.MODULE$.gte("_id", (BsonObjectId) ((Some) option9).value()), DefaultHelper$DefaultsTo$.MODULE$.m5530default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                        return mo12apply;
                    }
                }
                throw new MatchError(tuple2);
            }).mo3229named("rt-graph-stage").mo3228async()).via(this.killSwitch().flow()).mapConcat(bsonDocument -> {
                ObjectId value = bsonDocument.getObjectId("_id").getValue();
                return (List) Option$.MODULE$.apply(bsonDocument.get(org.eclipse.ditto.signals.events.base.Event.TYPE_QUALIFIER)).filter(bsonValue -> {
                    return BoxesRunTime.boxToBoolean(bsonValue.isArray());
                }).map(bsonValue2 -> {
                    return bsonValue2.asArray();
                }).map(bsonArray -> {
                    return ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(bsonArray.getValues()).asScala()).collect(new ScalaDriverJournalStream$$anonfun$$nestedInanonfun$cursor$6$1(this, value), Buffer$.MODULE$.canBuildFrom())).toList();
                }).getOrElse(() -> {
                    return Nil$.MODULE$;
                });
            });
        })).mo3229named("rt-cursor-source") : Source$.MODULE$.empty();
    }

    public ScalaDriverJournalStream(ScalaMongoDriver scalaMongoDriver, Materializer materializer) {
        this.akka$contrib$persistence$mongodb$ScalaDriverJournalStream$$driver = scalaMongoDriver;
        akka$contrib$persistence$mongodb$JournalStream$_setter_$killSwitch_$eq(KillSwitches$.MODULE$.shared("realtimeKillSwitch"));
        this.ec = scalaMongoDriver.querySideDispatcher();
        this.cursorBuilder = findObservable -> {
            return findObservable.cursorType(CursorType.TailableAwait).maxAwaitTime(new Cpackage.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
        };
    }
}
