package org.springframework.data.redis.stream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.6.3.jar:org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.class */
class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implements StreamMessageListenerContainer<K, V> {
    private final Executor taskExecutor;
    private final ErrorHandler errorHandler;
    private final StreamReadOptions readOptions;
    private final RedisTemplate<K, ?> template;
    private final StreamOperations<K, Object, Object> streamOperations;
    private final StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> containerOptions;
    private final Object lifecycleMonitor = new Object();
    private final List<Subscription> subscriptions = new ArrayList();
    private boolean running = false;

    /* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.6.3.jar:org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer$LoggingErrorHandler.class */
    enum LoggingErrorHandler implements ErrorHandler {
        INSTANCE;

        private final Log logger = LogFactory.getLog((Class<?>) LoggingErrorHandler.class);

        LoggingErrorHandler() {
        }

        @Override // org.springframework.util.ErrorHandler
        public void handleError(Throwable th) {
            if (this.logger.isErrorEnabled()) {
                this.logger.error("Unexpected error occurred in scheduled task.", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.6.3.jar:org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer$TaskSubscription.class */
    public static class TaskSubscription implements Subscription {
        private final Task task;

        protected TaskSubscription(Task task) {
            this.task = task;
        }

        Task getTask() {
            return this.task;
        }

        @Override // org.springframework.data.redis.stream.Subscription
        public boolean isActive() {
            return this.task.isActive();
        }

        @Override // org.springframework.data.redis.stream.Subscription
        public boolean await(Duration duration) throws InterruptedException {
            return this.task.awaitStart(duration);
        }

        @Override // org.springframework.data.redis.stream.Cancelable
        public void cancel() throws DataAccessResourceFailureException {
            this.task.cancel();
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return ObjectUtils.nullSafeEquals(this.task, ((TaskSubscription) obj).task);
        }

        public int hashCode() {
            return ObjectUtils.nullSafeHashCode(this.task);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultStreamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> streamMessageListenerContainerOptions) {
        Assert.notNull(redisConnectionFactory, "RedisConnectionFactory must not be null!");
        Assert.notNull(streamMessageListenerContainerOptions, "StreamMessageListenerContainerOptions must not be null!");
        this.taskExecutor = streamMessageListenerContainerOptions.getExecutor();
        this.errorHandler = streamMessageListenerContainerOptions.getErrorHandler();
        this.readOptions = getStreamReadOptions(streamMessageListenerContainerOptions);
        this.template = createRedisTemplate(redisConnectionFactory, streamMessageListenerContainerOptions);
        this.containerOptions = streamMessageListenerContainerOptions;
        if (streamMessageListenerContainerOptions.hasHashMapper()) {
            this.streamOperations = this.template.opsForStream(streamMessageListenerContainerOptions.getRequiredHashMapper());
        } else {
            this.streamOperations = this.template.opsForStream();
        }
    }

    private static StreamReadOptions getStreamReadOptions(StreamMessageListenerContainer.StreamMessageListenerContainerOptions<?, ?> streamMessageListenerContainerOptions) {
        StreamReadOptions empty = StreamReadOptions.empty();
        if (streamMessageListenerContainerOptions.getBatchSize().isPresent()) {
            empty = empty.count(streamMessageListenerContainerOptions.getBatchSize().getAsInt());
        }
        if (!streamMessageListenerContainerOptions.getPollTimeout().isZero()) {
            empty = empty.block(streamMessageListenerContainerOptions.getPollTimeout());
        }
        return empty;
    }

    private RedisTemplate<K, V> createRedisTemplate(RedisConnectionFactory redisConnectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> streamMessageListenerContainerOptions) {
        RedisTemplate<K, V> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(streamMessageListenerContainerOptions.getKeySerializer());
        redisTemplate.setValueSerializer(streamMessageListenerContainerOptions.getKeySerializer());
        redisTemplate.setHashKeySerializer(streamMessageListenerContainerOptions.getHashKeySerializer());
        redisTemplate.setHashValueSerializer(streamMessageListenerContainerOptions.getHashValueSerializer());
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Override // org.springframework.context.SmartLifecycle
    public boolean isAutoStartup() {
        return false;
    }

    @Override // org.springframework.context.SmartLifecycle
    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    @Override // org.springframework.context.Lifecycle
    public void start() {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                return;
            }
            Stream<Subscription> filter = this.subscriptions.stream().filter(subscription -> {
                return !subscription.isActive();
            }).filter(subscription2 -> {
                return subscription2 instanceof TaskSubscription;
            });
            Class<TaskSubscription> cls = TaskSubscription.class;
            TaskSubscription.class.getClass();
            Stream map = filter.map((v1) -> {
                return r1.cast(v1);
            }).map((v0) -> {
                return v0.getTask();
            });
            Executor executor = this.taskExecutor;
            executor.getClass();
            map.forEach((v1) -> {
                r1.execute(v1);
            });
            this.running = true;
        }
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                this.subscriptions.forEach((v0) -> {
                    v0.cancel();
                });
                this.running = false;
            }
        }
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        boolean z;
        synchronized (this.lifecycleMonitor) {
            z = this.running;
        }
        return z;
    }

    @Override // org.springframework.context.SmartLifecycle, org.springframework.context.Phased
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    @Override // org.springframework.data.redis.stream.StreamMessageListenerContainer
    public Subscription register(StreamMessageListenerContainer.StreamReadRequest<K> streamReadRequest, StreamListener<K, V> streamListener) {
        return doRegister(getReadTask(streamReadRequest, streamListener));
    }

    private StreamPollTask<K, V> getReadTask(StreamMessageListenerContainer.StreamReadRequest<K> streamReadRequest, StreamListener<K, V> streamListener) {
        return new StreamPollTask<>(streamReadRequest, streamListener, this.errorHandler, TypeDescriptor.valueOf(this.containerOptions.hasHashMapper() ? this.containerOptions.getTargetType() : MapRecord.class), getReadFunction(streamReadRequest), getDeserializer());
    }

    private Function<ByteRecord, V> getDeserializer() {
        StreamOperations<K, Object, Object> streamOperations = this.streamOperations;
        streamOperations.getClass();
        Function<ByteRecord, V> function = streamOperations::deserializeRecord;
        return this.containerOptions.getHashMapper() == null ? function : byteRecord -> {
            return this.streamOperations.map((MapRecord<K, Object, Object>) function.apply(byteRecord), this.containerOptions.getTargetType());
        };
    }

    private Function<ReadOffset, List<ByteRecord>> getReadFunction(StreamMessageListenerContainer.StreamReadRequest<K> streamReadRequest) {
        byte[] serialize = this.template.getKeySerializer().serialize(streamReadRequest.getStreamOffset().getKey());
        if (!(streamReadRequest instanceof StreamMessageListenerContainer.ConsumerStreamReadRequest)) {
            return readOffset -> {
                return (List) this.template.execute(redisConnection -> {
                    return redisConnection.streamCommands().xRead(this.readOptions, StreamOffset.create(serialize, readOffset));
                });
            };
        }
        StreamMessageListenerContainer.ConsumerStreamReadRequest consumerStreamReadRequest = (StreamMessageListenerContainer.ConsumerStreamReadRequest) streamReadRequest;
        StreamReadOptions autoAcknowledge = consumerStreamReadRequest.isAutoAcknowledge() ? this.readOptions.autoAcknowledge() : this.readOptions;
        Consumer consumer = consumerStreamReadRequest.getConsumer();
        return readOffset2 -> {
            return (List) this.template.execute(redisConnection -> {
                return redisConnection.streamCommands().xReadGroup(consumer, autoAcknowledge, StreamOffset.create(serialize, readOffset2));
            });
        };
    }

    private Subscription doRegister(Task task) {
        TaskSubscription taskSubscription = new TaskSubscription(task);
        synchronized (this.lifecycleMonitor) {
            this.subscriptions.add(taskSubscription);
            if (this.running) {
                this.taskExecutor.execute(task);
            }
        }
        return taskSubscription;
    }

    @Override // org.springframework.data.redis.stream.StreamMessageListenerContainer
    public void remove(Subscription subscription) {
        synchronized (this.lifecycleMonitor) {
            if (this.subscriptions.contains(subscription)) {
                if (subscription.isActive()) {
                    subscription.cancel();
                }
                this.subscriptions.remove(subscription);
            }
        }
    }
}
