package org.rooftop.netx.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisBusyException;
import java.util.Map;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.TuplesKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.time.Duration;
import kotlin.time.DurationKt;
import kotlin.time.DurationUnit;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.rooftop.netx.engine.AbstractSagaDispatcher;
import org.rooftop.netx.engine.AbstractSagaListener;
import org.rooftop.netx.engine.core.Saga;
import org.rooftop.netx.engine.logging.LoggingSupportsKt;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.stream.StreamReceiver;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* compiled from: RedisStreamSagaListener.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��V\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\b��\u0018�� \u001b2\u00020\u0001:\u0001\u001bBI\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\t\u0012\u0012\u0010\u000b\u001a\u000e\u0012\u0004\u0012\u00020\t\u0012\u0004\u0012\u00020\r0\f\u0012\u0006\u0010\u000e\u001a\u00020\u000f¢\u0006\u0002\u0010\u0010J\u000e\u0010\u0017\u001a\b\u0012\u0004\u0012\u00020\t0\u0018H\u0002J\u001a\u0010\u0019\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\r\u0012\u0004\u0012\u00020\t0\u001a0\u0018H\u0014R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\n\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004¢\u0006\u0002\n��R÷\u0001\u0010\u0011\u001aê\u0001\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012`\u0012^\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t \u0013*.\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0018\u00010\u00140\u0014 \u0013*t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012`\u0012^\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t \u0013*.\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0018\u00010\u00140\u0014\u0018\u00010\u00120\u0012X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u000b\u001a\u000e\u0012\u0004\u0012\u00020\t\u0012\u0004\u0012\u00020\r0\fX\u0082\u0004¢\u0006\u0002\n��R÷\u0001\u0010\u0015\u001aê\u0001\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012`\u0012^\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t \u0013*.\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0018\u00010\u00140\u0014 \u0013*t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012`\u0012^\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t \u0013*.\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \u0013*\u0004\u0018\u00010\t0\t\u0018\u00010\u00140\u0014\u0018\u00010\u00160\u0016X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u001c"}, d2 = {"Lorg/rooftop/netx/redis/RedisStreamSagaListener;", "Lorg/rooftop/netx/engine/AbstractSagaListener;", "backpressureSize", "", "sagaDispatcher", "Lorg/rooftop/netx/engine/AbstractSagaDispatcher;", "connectionFactory", "Lorg/springframework/data/redis/connection/ReactiveRedisConnectionFactory;", "nodeGroup", "", "nodeName", "reactiveRedisTemplate", "Lorg/springframework/data/redis/core/ReactiveRedisTemplate;", "Lorg/rooftop/netx/engine/core/Saga;", "objectMapper", "Lcom/fasterxml/jackson/databind/ObjectMapper;", "(ILorg/rooftop/netx/engine/AbstractSagaDispatcher;Lorg/springframework/data/redis/connection/ReactiveRedisConnectionFactory;Ljava/lang/String;Ljava/lang/String;Lorg/springframework/data/redis/core/ReactiveRedisTemplate;Lcom/fasterxml/jackson/databind/ObjectMapper;)V", "options", "Lorg/springframework/data/redis/stream/StreamReceiver$StreamReceiverOptions;", "kotlin.jvm.PlatformType", "Lorg/springframework/data/redis/connection/stream/MapRecord;", "receiver", "Lorg/springframework/data/redis/stream/StreamReceiver;", "createGroupIfNotExists", "Lreactor/core/publisher/Flux;", "receive", "Lkotlin/Pair;", "Companion", "netx"})
/* loaded from: input_file:org/rooftop/netx/redis/RedisStreamSagaListener.class */
public final class RedisStreamSagaListener extends AbstractSagaListener {

    @NotNull
    private static final Companion Companion = new Companion(null);

    @NotNull
    private final String nodeGroup;

    @NotNull
    private final String nodeName;

    @NotNull
    private final ReactiveRedisTemplate<String, Saga> reactiveRedisTemplate;

    @NotNull
    private final ObjectMapper objectMapper;
    private final StreamReceiver.StreamReceiverOptions<String, MapRecord<String, String, String>> options;
    private final StreamReceiver<String, MapRecord<String, String, String>> receiver;

    @NotNull
    private static final String STREAM_KEY = "NETX_STREAM";

    /* compiled from: RedisStreamSagaListener.kt */
    @Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��\u0012\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010\u000e\n��\b\u0082\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T¢\u0006\u0002\n��¨\u0006\u0005"}, d2 = {"Lorg/rooftop/netx/redis/RedisStreamSagaListener$Companion;", "", "()V", "STREAM_KEY", "", "netx"})
    /* loaded from: input_file:org/rooftop/netx/redis/RedisStreamSagaListener$Companion.class */
    private static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public RedisStreamSagaListener(int i, @NotNull AbstractSagaDispatcher abstractSagaDispatcher, @NotNull ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, @NotNull String str, @NotNull String str2, @NotNull ReactiveRedisTemplate<String, Saga> reactiveRedisTemplate, @NotNull ObjectMapper objectMapper) {
        super(i, abstractSagaDispatcher);
        Intrinsics.checkNotNullParameter(abstractSagaDispatcher, "sagaDispatcher");
        Intrinsics.checkNotNullParameter(reactiveRedisConnectionFactory, "connectionFactory");
        Intrinsics.checkNotNullParameter(str, "nodeGroup");
        Intrinsics.checkNotNullParameter(str2, "nodeName");
        Intrinsics.checkNotNullParameter(reactiveRedisTemplate, "reactiveRedisTemplate");
        Intrinsics.checkNotNullParameter(objectMapper, "objectMapper");
        this.nodeGroup = str;
        this.nodeName = str2;
        this.reactiveRedisTemplate = reactiveRedisTemplate;
        this.objectMapper = objectMapper;
        StreamReceiver.StreamReceiverOptionsBuilder builder = StreamReceiver.StreamReceiverOptions.builder();
        Duration.Companion companion = Duration.Companion;
        java.time.Duration ofSeconds = java.time.Duration.ofSeconds(Duration.getInWholeSeconds-impl(DurationKt.toDuration(1, DurationUnit.HOURS)), Duration.getNanosecondsComponent-impl(r2));
        Intrinsics.checkNotNullExpressionValue(ofSeconds, "toComponents-impl(...)");
        this.options = builder.pollTimeout(ofSeconds).build();
        this.receiver = StreamReceiver.create(reactiveRedisConnectionFactory, this.options);
    }

    @Override // org.rooftop.netx.engine.AbstractSagaListener
    @NotNull
    protected Flux<Pair<Saga, String>> receive() {
        Flux<String> createGroupIfNotExists = createGroupIfNotExists();
        Function1<String, Publisher<? extends Pair<? extends Saga, ? extends String>>> function1 = new Function1<String, Publisher<? extends Pair<? extends Saga, ? extends String>>>() { // from class: org.rooftop.netx.redis.RedisStreamSagaListener$receive$1
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            public final Publisher<? extends Pair<Saga, String>> invoke(String str) {
                StreamReceiver streamReceiver;
                String str2;
                String str3;
                streamReceiver = RedisStreamSagaListener.this.receiver;
                str2 = RedisStreamSagaListener.this.nodeGroup;
                str3 = RedisStreamSagaListener.this.nodeName;
                Flux receive = streamReceiver.receive(Consumer.from(str2, str3), StreamOffset.create("NETX_STREAM", ReadOffset.from(">")));
                final RedisStreamSagaListener redisStreamSagaListener = RedisStreamSagaListener.this;
                Function1<MapRecord<String, String, String>, Pair<? extends Saga, ? extends String>> function12 = new Function1<MapRecord<String, String, String>, Pair<? extends Saga, ? extends String>>() { // from class: org.rooftop.netx.redis.RedisStreamSagaListener$receive$1.1
                    {
                        super(1);
                    }

                    public final Pair<Saga, String> invoke(MapRecord<String, String, String> mapRecord) {
                        ObjectMapper objectMapper;
                        objectMapper = RedisStreamSagaListener.this.objectMapper;
                        return TuplesKt.to(objectMapper.readValue((String) ((Map) mapRecord.getValue()).get("data"), Saga.class), mapRecord.getId().getValue());
                    }
                };
                return receive.map((v1) -> {
                    return invoke$lambda$0(r1, v1);
                });
            }

            private static final Pair invoke$lambda$0(Function1 function12, Object obj) {
                Intrinsics.checkNotNullParameter(function12, "$tmp0");
                return (Pair) function12.invoke(obj);
            }
        };
        Flux<Pair<Saga, String>> flatMap = createGroupIfNotExists.flatMap((v1) -> {
            return receive$lambda$0(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(flatMap, "flatMap(...)");
        return flatMap;
    }

    private final Flux<String> createGroupIfNotExists() {
        Mono createGroup = this.reactiveRedisTemplate.opsForStream().createGroup(STREAM_KEY, ReadOffset.from("0"), this.nodeGroup);
        Intrinsics.checkNotNullExpressionValue(createGroup, "createGroup(...)");
        Mono info = LoggingSupportsKt.info(createGroup, "Redis stream group created key \"NETX_STREAM\" group \"" + this.nodeGroup + "\"");
        RedisStreamSagaListener$createGroupIfNotExists$1 redisStreamSagaListener$createGroupIfNotExists$1 = new Function1<Throwable, Mono<? extends String>>() { // from class: org.rooftop.netx.redis.RedisStreamSagaListener$createGroupIfNotExists$1
            public final Mono<? extends String> invoke(Throwable th) {
                if (th.getCause() instanceof RedisBusyException) {
                    return Mono.just("OK");
                }
                Intrinsics.checkNotNull(th);
                throw th;
            }
        };
        Mono onErrorResume = info.onErrorResume((v1) -> {
            return createGroupIfNotExists$lambda$1(r1, v1);
        });
        RedisStreamSagaListener$createGroupIfNotExists$2 redisStreamSagaListener$createGroupIfNotExists$2 = new Function1<String, Publisher<? extends String>>() { // from class: org.rooftop.netx.redis.RedisStreamSagaListener$createGroupIfNotExists$2
            public final Publisher<? extends String> invoke(String str) {
                return Flux.just(str);
            }
        };
        Flux<String> flatMapMany = onErrorResume.flatMapMany((v1) -> {
            return createGroupIfNotExists$lambda$2(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(flatMapMany, "flatMapMany(...)");
        return flatMapMany;
    }

    private static final Publisher receive$lambda$0(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final Mono createGroupIfNotExists$lambda$1(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Mono) function1.invoke(obj);
    }

    private static final Publisher createGroupIfNotExists$lambda$2(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }
}
