package io.chrisdavenport.rediculous;

import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Ref$Make$;
import cats.syntax.OptionIdOps$;
import fs2.Chunk;
import fs2.Stream;
import fs2.Stream$;
import fs2.compat.NotGiven$;
import io.chrisdavenport.rediculous.RedisCommands;
import io.chrisdavenport.rediculous.RedisStream;
import io.chrisdavenport.rediculous.RespRaw;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxesRunTime;

/* compiled from: RedisStream.scala */
/* loaded from: input_file:io/chrisdavenport/rediculous/RedisStream$.class */
/**
 * Decompiled singleton holder for the {@code RedisStream} factory (the header comment of this
 * file states it was compiled from {@code RedisStream.scala}).
 *
 * <p>The single instance is published through {@link #MODULE$}: the static initializer invokes
 * the private constructor, which assigns {@code this} to {@code MODULE$}.
 *
 * <p>NOTE(review): this is decompiler output, not hand-written Java. Several constructs below
 * (for example {@code NotGiven$.MODULE$.default()} and lambda parameters that reuse an enclosing
 * parameter name) are not valid Java source as rendered; treat the file as a reading aid.
 */
public final class RedisStream$ {
    /** The singleton instance; assigned exactly once, from the private constructor. */
    public static RedisStream$ MODULE$;

    static {
        // Constructing the instance is what populates MODULE$ (see the private constructor).
        new RedisStream$();
    }

    /**
     * Builds a {@code RedisStream} backed by the given connection.
     *
     * @param redisConnection connection that {@code append} pipelines against and {@code read} runs against
     * @param genConcurrent   effect-type evidence; passed to every {@code pipeline}/{@code run} call and
     *                        used to create the {@code Ref} that tracks offsets in {@code read}
     * @param <F>             the effect type
     * @return an anonymous {@code RedisStream} implementation capturing both arguments
     */
    public <F> RedisStream<F> fromConnection(final RedisConnection<F> redisConnection, final GenConcurrent<F, Throwable> genConcurrent) {
        return new RedisStream<F>(redisConnection, genConcurrent) { // from class: io.chrisdavenport.rediculous.RedisStream$$anon$1
            // Curried helper: given a string, yields a function that turns a record into
            // StreamOffset.From(thatString, record.recordId()).
            private final Function1<String, Function1<RedisCommands.StreamsRecord, RedisCommands.StreamOffset>> nextOffset = str -> {
                return streamsRecord -> {
                    return new RedisCommands.StreamOffset.From(str, streamsRecord.recordId());
                };
            };
            // Groups the records by recordId() and, for every group, pairs the group key with the
            // offset derived (via nextOffset) from the LAST record of that group, if any.
            // NOTE(review): the grouping key is recordId(), while the field name and the way
            // read(...) uses the result suggest the map is meant to be keyed by stream key.
            // Confirm against the Scala source before relying on this.
            private final Function1<List<RedisCommands.StreamsRecord>, Map<String, Option<RedisCommands.StreamOffset>>> offsetsByKey = list -> {
                return (Map) list.groupBy(streamsRecord -> {
                    return streamsRecord.recordId();
                }).map(tuple2 -> {
                    // Scala pattern match on a tuple: a null tuple is a MatchError.
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    String str = (String) tuple2._1();
                    // key -> Option(last record of the group mapped through nextOffset(key))
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str), ((List) tuple2._2()).lastOption().map((Function1) this.nextOffset().apply(str)));
                }, Map$.MODULE$.canBuildFrom());
            };
            // Captured arguments of fromConnection; assigned in the instance initializer at the
            // bottom of this anonymous class.
            private final RedisConnection connection$1;
            private final GenConcurrent evidence$1$1;

            // NOTE(review): the three read$default$N methods below are rendered as calling
            // themselves. Read literally as Java this recurses without bound; it is presumably a
            // decompiler rendering of a forwarder to the default value defined on RedisStream.
            // Verify against the bytecode / Scala source.
            @Override // io.chrisdavenport.rediculous.RedisStream
            public Function1<String, RedisCommands.StreamOffset> read$default$2() {
                Function1<String, RedisCommands.StreamOffset> read$default$2;
                read$default$2 = read$default$2();
                return read$default$2;
            }

            @Override // io.chrisdavenport.rediculous.RedisStream
            public Duration read$default$3() {
                Duration read$default$3;
                read$default$3 = read$default$3();
                return read$default$3;
            }

            @Override // io.chrisdavenport.rediculous.RedisStream
            public Option<Object> read$default$4() {
                Option<Object> read$default$4;
                read$default$4 = read$default$4();
                return read$default$4;
            }

            /**
             * Issues one {@code xadd} per message in the chunk and runs them as a single pipeline
             * on the captured connection.
             *
             * <p>For each message the options are derived from {@code approxMaxlen()} via
             * {@code $anonfun$append$2} when it is defined, otherwise the default
             * {@code XAddOpts} are used.
             *
             * @param chunk messages to add; each supplies {@code stream()}, {@code body()} and
             *              {@code approxMaxlen()}
             * @return the effect produced by {@code pipeline(connection, evidence)}
             */
            @Override // io.chrisdavenport.rediculous.RedisStream
            public F append(Chunk<RedisStream.XAddMessage> chunk) {
                return (F) ((RespRaw.RawPipeline) chunk.traverse(xAddMessage -> {
                    return (RespRaw.RawPipeline) RedisCommands$.MODULE$.xadd(xAddMessage.stream(), xAddMessage.body(), (RedisCommands.XAddOpts) xAddMessage.approxMaxlen().map(obj -> {
                        return $anonfun$append$2(BoxesRunTime.unboxToLong(obj));
                    }).getOrElse(() -> {
                        return RedisCommands$XAddOpts$.MODULE$.m36default();
                    }), RespRaw$RawPipeline$.MODULE$.ctx());
                }, RespRaw$RawPipeline$.MODULE$.applicative())).pipeline(this.connection$1, this.evidence$1$1);
            }

            /** Accessor for the {@code nextOffset} field. */
            private Function1<String, Function1<RedisCommands.StreamsRecord, RedisCommands.StreamOffset>> nextOffset() {
                return this.nextOffset;
            }

            /** Accessor for the {@code offsetsByKey} field. */
            private Function1<List<RedisCommands.StreamsRecord>, Map<String, Option<RedisCommands.StreamOffset>>> offsetsByKey() {
                return this.offsetsByKey;
            }

            /**
             * Produces a repeating stream of {@code xread} responses, tracking offsets in a
             * {@code Ref} between iterations.
             *
             * <p>Each iteration: read the current offset map from the ref, call {@code xread} with
             * the map's values, drop an empty result ({@code flattenOption}), update the ref from
             * the returned records, then emit the responses. The inner stream is {@code repeat()}ed.
             *
             * @param set       keys to read; each is paired with {@code function1.apply(key)} to form
             *                  the initial offset map
             * @param function1 supplies the initial offset for a key
             * @param duration  converted with {@code toMillis()} and passed, wrapped in {@code Some},
             *                  as the first {@code XReadOpts.copy} argument (presumably the blocking
             *                  timeout; confirm against {@code XReadOpts})
             * @param option    passed through as the second {@code XReadOpts.copy} argument
             *                  (presumably a count limit; confirm against {@code XReadOpts})
             * @return the stream of responses
             */
            @Override // io.chrisdavenport.rediculous.RedisStream
            public Stream<F, RedisCommands.XReadResponse> read(Set<String> set, Function1<String, RedisCommands.StreamOffset> function1, Duration duration, Option<Object> option) {
                // Initial state: key -> function1(key) for every requested key.
                Map map = ((TraversableOnce) set.map(str -> {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str), function1.apply(str));
                }, Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
                // Default options with the first two fields overridden; the third keeps its default.
                RedisCommands.XReadOpts copy = RedisCommands$XReadOpts$.MODULE$.m38default().copy(OptionIdOps$.MODULE$.some$extension(cats.implicits$.MODULE$.catsSyntaxOptionId(BoxesRunTime.boxToLong(duration.toMillis()))), option, RedisCommands$XReadOpts$.MODULE$.m38default().copy$default$3());
                return Stream$.MODULE$.eval(cats.effect.package$.MODULE$.Ref().of(map, Ref$Make$.MODULE$.concurrentInstance(this.evidence$1$1))).flatMap(ref -> {
                    return Stream$.MODULE$.eval(ref.get()).flatMap(map2 -> {
                        // xread using the offsets currently held in the ref; flattenOption skips an empty result.
                        return ((Stream) cats.implicits$.MODULE$.toFunctorFilterOps(Stream$.MODULE$.eval(((Redis) RedisCommands$.MODULE$.xread(map2.values().toSet(), copy, RedisCtx$.MODULE$.redis(this.evidence$1$1))).run(this.connection$1, this.evidence$1$1)), Stream$.MODULE$.functorFilterInstance()).flattenOption(Predef$.MODULE$.$conforms())).flatMap(list -> {
                            // Flatten all records of all responses, compute offsetsByKey, filter/transform the
                            // entries with the partial function RedisStream$$anon$1$$anonfun$1 (defined in a
                            // class not shown here), and turn each remaining entry into a ref update.
                            return Stream$.MODULE$.eval(cats.implicits$.MODULE$.toTraverseOps(((TraversableOnce) ((TraversableLike) this.offsetsByKey().apply(list.flatMap(xReadResponse -> {
                                return xReadResponse.records();
                            }, List$.MODULE$.canBuildFrom()))).collect(new RedisStream$$anon$1$$anonfun$1(null), Map$.MODULE$.canBuildFrom())).toList().map(tuple2 -> {
                                if (tuple2 == null) {
                                    throw new MatchError(tuple2);
                                }
                                String str2 = (String) tuple2._1();
                                RedisCommands.StreamOffset streamOffset = (RedisCommands.StreamOffset) tuple2._2();
                                // Store the new offset under this entry's key.
                                return ref.update(map2 -> {
                                    return map2.updated(str2, streamOffset);
                                });
                            }, List$.MODULE$.canBuildFrom()), cats.implicits$.MODULE$.catsStdInstancesForList()).sequence(Predef$.MODULE$.$conforms(), this.evidence$1$1)).flatMap(list -> {
                                // After the sequenced updates have been evaluated, emit the list.
                                // NOTE(review): this parameter reuses the name `list` of the enclosing lambda;
                                // which value is emitted cannot be determined from this rendering. Confirm
                                // against the Scala source.
                                return Stream$.MODULE$.emits(list);
                            }, NotGiven$.MODULE$.default());
                        }, NotGiven$.MODULE$.default());
                    }, NotGiven$.MODULE$.default()).repeat();
                }, NotGiven$.MODULE$.default());
            }

            /**
             * Builds the {@code XAddOpts} used by {@code append} when a message defines
             * {@code approxMaxlen()}: a copy of the default options whose second argument is
             * {@code Some(j)} and whose third is {@code Some(Trimming.Approximate)}; arguments
             * 1, 4, 5 and 6 keep their defaults.
             *
             * @param j the unboxed {@code approxMaxlen} value
             */
            public static final /* synthetic */ RedisCommands.XAddOpts $anonfun$append$2(long j) {
                Option<Object> some$extension = OptionIdOps$.MODULE$.some$extension(cats.implicits$.MODULE$.catsSyntaxOptionId(BoxesRunTime.boxToLong(j)));
                Option<RedisCommands.Trimming> some$extension2 = OptionIdOps$.MODULE$.some$extension(cats.implicits$.MODULE$.catsSyntaxOptionId(RedisCommands$Trimming$Approximate$.MODULE$));
                return RedisCommands$XAddOpts$.MODULE$.m36default().copy(RedisCommands$XAddOpts$.MODULE$.m36default().copy$default$1(), some$extension, some$extension2, RedisCommands$XAddOpts$.MODULE$.m36default().copy$default$4(), RedisCommands$XAddOpts$.MODULE$.m36default().copy$default$5(), RedisCommands$XAddOpts$.MODULE$.m36default().copy$default$6());
            }

            {
                // Capture the fromConnection arguments for use by append/read.
                this.connection$1 = redisConnection;
                this.evidence$1$1 = genConcurrent;
            }
        };
    }

    /** Private: the only instance is created by the static initializer and published as MODULE$. */
    private RedisStream$() {
        MODULE$ = this;
    }
}
