package org.springframework.data.redis.core;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import org.hibernate.validator.internal.metadata.core.ConstraintHelper;
import org.reactivestreams.Publisher;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.6.3.jar:org/springframework/data/redis/core/DefaultReactiveStreamOperations.class */
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
    private final ReactiveRedisTemplate<?, ?> template;
    private final RedisSerializationContext<K, ?> serializationContext;
    private final StreamObjectMapper objectMapper;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultReactiveStreamOperations(ReactiveRedisTemplate<?, ?> reactiveRedisTemplate, final RedisSerializationContext<K, ?> redisSerializationContext, @Nullable HashMapper<? super K, ? super HK, ? super HV> hashMapper) {
        this.template = reactiveRedisTemplate;
        this.serializationContext = redisSerializationContext;
        this.objectMapper = new StreamObjectMapper(hashMapper) { // from class: org.springframework.data.redis.core.DefaultReactiveStreamOperations.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.springframework.data.redis.core.StreamObjectMapper
            public HashMapper<?, ?, ?> doGetHashMapper(final ConversionService conversionService, final Class<?> cls) {
                return (DefaultReactiveStreamOperations.this.objectMapper.isSimpleType(cls) || ClassUtils.isAssignable(ByteBuffer.class, cls)) ? new HashMapper<Object, Object, Object>() { // from class: org.springframework.data.redis.core.DefaultReactiveStreamOperations.1.1
                    @Override // org.springframework.data.redis.hash.HashMapper
                    public Map<Object, Object> toHash(Object obj) {
                        Object obj2 = ConstraintHelper.PAYLOAD;
                        Object obj3 = obj;
                        if (redisSerializationContext.getHashKeySerializationPair() == null) {
                            obj2 = obj2.toString().getBytes(StandardCharsets.UTF_8);
                        }
                        if (redisSerializationContext.getHashValueSerializationPair() == null) {
                            obj3 = conversionService.convert(obj3, byte[].class);
                        }
                        return Collections.singletonMap(obj2, obj3);
                    }

                    @Override // org.springframework.data.redis.hash.HashMapper
                    public Object fromHash(Map<Object, Object> map) {
                        Object next = map.values().iterator().next();
                        if (ClassUtils.isAssignableValue(cls, next)) {
                            return next;
                        }
                        Object deserializeHashValue = DefaultReactiveStreamOperations.this.deserializeHashValue((ByteBuffer) next);
                        return ClassUtils.isAssignableValue(cls, deserializeHashValue) ? next : conversionService.convert(deserializeHashValue, cls);
                    }
                } : super.doGetHashMapper(conversionService, cls);
            }
        };
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<Long> acknowledge(K k, String str, RecordId... recordIdArr) {
        Assert.notNull(k, "Key must not be null!");
        Assert.hasText(str, "Group must not be null or empty!");
        Assert.notNull(recordIdArr, "MessageIds must not be null!");
        Assert.notEmpty(recordIdArr, "MessageIds must not be empty!");
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xAck(rawKey(k), str, recordIdArr);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<RecordId> add(Record<K, ?> record) {
        Assert.notNull(record.getStream(), "Key must not be null!");
        Assert.notNull(record.getValue(), "Body must not be null!");
        MapRecord mapRecord = StreamObjectMapper.toMapRecord(this, record);
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xAdd(serializeRecord(mapRecord));
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<Long> delete(K k, RecordId... recordIdArr) {
        Assert.notNull(k, "Key must not be null!");
        Assert.notNull(recordIdArr, "MessageIds must not be null!");
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xDel(rawKey(k), recordIdArr);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<String> createGroup(K k, ReadOffset readOffset, String str) {
        Assert.notNull(k, "Key must not be null!");
        Assert.notNull(readOffset, "ReadOffset must not be null!");
        Assert.notNull(str, "Group must not be null!");
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xGroupCreate(rawKey(k), str, readOffset, true);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<String> deleteConsumer(K k, Consumer consumer) {
        Assert.notNull(k, "Key must not be null!");
        Assert.notNull(consumer, "Consumer must not be null!");
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xGroupDelConsumer(rawKey(k), consumer);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<String> destroyGroup(K k, String str) {
        Assert.notNull(k, "Key must not be null!");
        Assert.notNull(str, "Group must not be null!");
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xGroupDestroy(rawKey(k), str);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Flux<StreamInfo.XInfoConsumer> consumers(K k, String str) {
        Assert.notNull(k, "Key must not be null!");
        Assert.notNull(str, "Group must not be null!");
        return createFlux(reactiveStreamCommands -> {
            return reactiveStreamCommands.xInfoConsumers(rawKey(k), str);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<StreamInfo.XInfoStream> info(K k) {
        Assert.notNull(k, "Key must not be null!");
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xInfo(rawKey(k));
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Flux<StreamInfo.XInfoGroup> groups(K k) {
        Assert.notNull(k, "Key must not be null!");
        return createFlux(reactiveStreamCommands -> {
            return reactiveStreamCommands.xInfoGroups(rawKey(k));
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<PendingMessages> pending(K k, String str, Range<?> range, long j) {
        ByteBuffer rawKey = rawKey(k);
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xPending(rawKey, str, (Range<?>) range, Long.valueOf(j));
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<PendingMessages> pending(K k, Consumer consumer, Range<?> range, long j) {
        ByteBuffer rawKey = rawKey(k);
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xPending(rawKey, consumer, (Range<?>) range, Long.valueOf(j));
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<PendingMessagesSummary> pending(K k, String str) {
        ByteBuffer rawKey = rawKey(k);
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xPending(rawKey, str);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<Long> size(K k) {
        Assert.notNull(k, "Key must not be null!");
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xLen(rawKey(k));
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Flux<MapRecord<K, HK, HV>> range(K k, Range<String> range, RedisZSetCommands.Limit limit) {
        Assert.notNull(k, "Key must not be null!");
        Assert.notNull(range, "Range must not be null!");
        Assert.notNull(limit, "Limit must not be null!");
        return (Flux<MapRecord<K, HK, HV>>) createFlux(reactiveStreamCommands -> {
            return reactiveStreamCommands.xRange(rawKey(k), range, limit).map(this::deserializeRecord);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Flux<MapRecord<K, HK, HV>> read(StreamReadOptions streamReadOptions, StreamOffset<K>... streamOffsetArr) {
        Assert.notNull(streamReadOptions, "StreamReadOptions must not be null!");
        Assert.notNull(streamOffsetArr, "Streams must not be null!");
        return (Flux<MapRecord<K, HK, HV>>) createFlux(reactiveStreamCommands -> {
            return reactiveStreamCommands.xRead(streamReadOptions, rawStreamOffsets(streamOffsetArr)).map(this::deserializeRecord);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Flux<MapRecord<K, HK, HV>> read(Consumer consumer, StreamReadOptions streamReadOptions, StreamOffset<K>... streamOffsetArr) {
        Assert.notNull(consumer, "Consumer must not be null!");
        Assert.notNull(streamReadOptions, "StreamReadOptions must not be null!");
        Assert.notNull(streamOffsetArr, "Streams must not be null!");
        return (Flux<MapRecord<K, HK, HV>>) createFlux(reactiveStreamCommands -> {
            return reactiveStreamCommands.xReadGroup(consumer, streamReadOptions, rawStreamOffsets(streamOffsetArr)).map(this::deserializeRecord);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Flux<MapRecord<K, HK, HV>> reverseRange(K k, Range<String> range, RedisZSetCommands.Limit limit) {
        Assert.notNull(k, "Key must not be null!");
        Assert.notNull(range, "Range must not be null!");
        Assert.notNull(limit, "Limit must not be null!");
        return (Flux<MapRecord<K, HK, HV>>) createFlux(reactiveStreamCommands -> {
            return reactiveStreamCommands.xRevRange(rawKey(k), range, limit).map(this::deserializeRecord);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<Long> trim(K k, long j) {
        return trim(k, j, false);
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public Mono<Long> trim(K k, long j, boolean z) {
        Assert.notNull(k, "Key must not be null!");
        return createMono(reactiveStreamCommands -> {
            return reactiveStreamCommands.xTrim(rawKey(k), j, z);
        });
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations, org.springframework.data.redis.core.HashMapperProvider
    public <V> HashMapper<V, HK, HV> getHashMapper(Class<V> cls) {
        return this.objectMapper.getHashMapper(cls);
    }

    private StreamOffset<ByteBuffer>[] rawStreamOffsets(StreamOffset<K>[] streamOffsetArr) {
        return (StreamOffset[]) Arrays.stream(streamOffsetArr).map(streamOffset -> {
            return StreamOffset.create(rawKey(streamOffset.getKey()), streamOffset.getOffset());
        }).toArray(i -> {
            return new StreamOffset[i];
        });
    }

    private <T> Mono<T> createMono(Function<ReactiveStreamCommands, Publisher<T>> function) {
        Assert.notNull(function, "Function must not be null!");
        return this.template.doCreateMono(reactiveRedisConnection -> {
            return (Publisher) function.apply(reactiveRedisConnection.streamCommands());
        });
    }

    private <T> Flux<T> createFlux(Function<ReactiveStreamCommands, Publisher<T>> function) {
        Assert.notNull(function, "Function must not be null!");
        return this.template.doCreateFlux(reactiveRedisConnection -> {
            return (Publisher) function.apply(reactiveRedisConnection.streamCommands());
        });
    }

    private ByteBuffer rawKey(K k) {
        return this.serializationContext.getKeySerializationPair().write(k);
    }

    private ByteBuffer rawHashKey(HK hk) {
        try {
            return this.serializationContext.getHashKeySerializationPair().write(hk);
        } catch (IllegalStateException e) {
            return ByteBuffer.wrap((byte[]) this.objectMapper.getConversionService().convert(hk, byte[].class));
        }
    }

    private ByteBuffer rawValue(HV hv) {
        try {
            return this.serializationContext.getHashValueSerializationPair().write(hv);
        } catch (IllegalStateException e) {
            return ByteBuffer.wrap((byte[]) this.objectMapper.getConversionService().convert(hv, byte[].class));
        }
    }

    private HK readHashKey(ByteBuffer byteBuffer) {
        return this.serializationContext.getHashKeySerializationPair().getReader().read(byteBuffer);
    }

    private K readKey(ByteBuffer byteBuffer) {
        return this.serializationContext.getKeySerializationPair().read(byteBuffer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public HV deserializeHashValue(ByteBuffer byteBuffer) {
        return this.serializationContext.getHashValueSerializationPair().read(byteBuffer);
    }

    @Override // org.springframework.data.redis.core.ReactiveStreamOperations
    public MapRecord<K, HK, HV> deserializeRecord(ByteBufferRecord byteBufferRecord) {
        return (MapRecord<K, HK, HV>) byteBufferRecord.map(mapRecord -> {
            return mapRecord.mapEntries(this::deserializeRecordFields).withStreamKey((MapRecord) readKey(byteBufferRecord.getStream()));
        });
    }

    private Map.Entry<HK, HV> deserializeRecordFields(Map.Entry<ByteBuffer, ByteBuffer> entry) {
        return Converters.entryOf(readHashKey(entry.getKey()), deserializeHashValue(entry.getValue()));
    }

    private ByteBufferRecord serializeRecord(MapRecord<K, ? extends HK, ? extends HV> mapRecord) {
        return ByteBufferRecord.of((MapRecord<ByteBuffer, ByteBuffer, ByteBuffer>) mapRecord.map(mapRecord2 -> {
            return mapRecord2.mapEntries(this::serializeRecordFields).withStreamKey((MapRecord) rawKey(mapRecord.getStream()));
        }));
    }

    private Map.Entry<ByteBuffer, ByteBuffer> serializeRecordFields(Map.Entry<? extends HK, ? extends HV> entry) {
        return Converters.entryOf(rawHashKey(entry.getKey()), rawValue(entry.getValue()));
    }
}
