/*
 * Decompiled with CFR 0.152.
 */
package io.eventuate.messaging.redis.spring.consumer;

import io.eventuate.messaging.redis.spring.consumer.RedisMessage;
import io.eventuate.messaging.redis.spring.consumer.RedisMessageHandler;
import io.lettuce.core.RedisCommandExecutionException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * Reads records from a single Redis stream (the "channel") as a member of a consumer
 * group and passes every record value to a {@link RedisMessageHandler}.
 *
 * <p>{@link #process()} runs the blocking consume loop on the calling thread;
 * {@link #stop()} is meant to be called from another thread and waits until
 * {@code process()} has exited. The subscriber id is used both as the consumer group
 * name and as the consumer name.
 */
public class ChannelProcessor {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final String subscriptionIdentificationInfo;
    /** Released exactly when {@link #process()} exits, normally or exceptionally. */
    private final CountDownLatch stopCountDownLatch = new CountDownLatch(1);
    /** Loop guard: set by {@link #process()}, cleared by {@link #stop()}. */
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final String subscriberId;
    private final String channel;
    private final RedisMessageHandler messageHandler;
    private final RedisTemplate<String, String> redisTemplate;
    private final long timeInMillisecondsToSleepWhenKeyDoesNotExist;
    private final long blockStreamTimeInMilliseconds;

    /**
     * @param redisTemplate template used for all stream operations
     * @param subscriberId consumer group name, also used as the consumer name
     * @param channel key of the Redis stream to consume
     * @param messageHandler callback invoked once per record value
     * @param subscriptionIdentificationInfo free-form text included in log messages
     * @param timeInMillisecondsToSleepWhenKeyDoesNotExist pause between attempts to create
     *        the consumer group while the stream key does not exist yet
     * @param blockStreamTimeInMilliseconds block timeout of each read for new records
     */
    public ChannelProcessor(RedisTemplate<String, String> redisTemplate, String subscriberId, String channel, RedisMessageHandler messageHandler, String subscriptionIdentificationInfo, long timeInMillisecondsToSleepWhenKeyDoesNotExist, long blockStreamTimeInMilliseconds) {
        this.redisTemplate = redisTemplate;
        this.subscriberId = subscriberId;
        this.channel = channel;
        this.messageHandler = messageHandler;
        this.subscriptionIdentificationInfo = subscriptionIdentificationInfo;
        this.timeInMillisecondsToSleepWhenKeyDoesNotExist = timeInMillisecondsToSleepWhenKeyDoesNotExist;
        this.blockStreamTimeInMilliseconds = blockStreamTimeInMilliseconds;
        this.logger.info("Channel processor is created (channel = {}, {})", channel, subscriptionIdentificationInfo);
    }

    /**
     * Runs the consume loop on the calling thread: ensures the consumer group exists,
     * drains this consumer's pending records, then keeps reading new records until
     * {@link #stop()} is called.
     *
     * <p>Any failure is logged and rethrown. The stop latch is released on every exit
     * path so that a concurrent {@link #stop()} can never block forever.
     */
    public void process() {
        try {
            this.logger.info("Channel processor started processing (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
            this.running.set(true);
            this.makeSureConsumerGroupExists();
            this.processPendingRecords();
            this.processRegularRecords();
            this.logger.info("Channel processor finished processing (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
        }
        catch (Throwable e) {
            this.logger.error("Channel processor got error: (channel = {}, subscriberId = {})", this.channel, this.subscriberId);
            this.logger.error("Channel processor got error", e);
            throw e;
        }
        finally {
            // Must happen on failures too (Redis errors, handler errors), otherwise stop() hangs.
            this.stopCountDownLatch.countDown();
        }
    }

    /**
     * Asks the consume loop to finish and blocks until {@link #process()} has exited.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling
     *         thread is interrupted while waiting; the interrupt flag is restored first
     */
    public void stop() {
        this.logger.info("stopping channel (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
        this.running.set(false);
        try {
            this.stopCountDownLatch.await();
            this.logger.info("Stopped channel (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.logger.error("Stopping channel failed (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
            this.logger.error("Stopping channel failed", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the consumer group at offset {@code 0}. While the stream key does not exist
     * the attempt is repeated after a pause; an "already exists" reply counts as success.
     * Returns without creating the group if the processor is stopped in the meantime.
     */
    private void makeSureConsumerGroupExists() {
        this.logger.info("Ensuring consumer group exists {} {}", this.channel, this.subscriberId);
        while (this.running.get()) {
            try {
                this.logger.info("Creating group {} {}", this.channel, this.subscriberId);
                this.redisTemplate.opsForStream().createGroup(this.channel, ReadOffset.from("0"), this.subscriberId);
                this.logger.info("Ensured consumer group exists {}", this.channel);
                return;
            }
            catch (RedisSystemException e) {
                if (this.isKeyDoesNotExist(e)) {
                    this.logger.info("Stream {} does not exist!", this.channel);
                    this.sleep(this.timeInMillisecondsToSleepWhenKeyDoesNotExist);
                    continue;
                }
                if (this.isGroupExistsAlready(e)) {
                    this.logger.info("Ensured consumer group exists {}", this.channel);
                    return;
                }
                this.logger.error("Got exception ensuring consumer group exists: {}", this.channel, e);
                throw e;
            }
        }
    }

    /** Tells whether the failure is the XGROUP "key must exist" error reply. */
    private boolean isKeyDoesNotExist(RedisSystemException e) {
        return this.isRedisCommandExceptionContainingMessage(e, "ERR The XGROUP subcommand requires the key to exist");
    }

    /** Tells whether the failure is the "group already exists" error reply. */
    private boolean isGroupExistsAlready(RedisSystemException e) {
        return this.isRedisCommandExceptionContainingMessage(e, "Consumer Group name already exists");
    }

    /**
     * Checks that the cause of {@code e} is a {@link RedisCommandExecutionException} whose
     * message contains {@code expectedMessage}. A missing cause or a cause without a
     * message yields {@code false}.
     */
    private boolean isRedisCommandExceptionContainingMessage(RedisSystemException e, String expectedMessage) {
        Throwable cause = e.getCause();
        if (!(cause instanceof RedisCommandExecutionException)) {
            // Also covers cause == null, which previously triggered a NullPointerException.
            return false;
        }
        String message = cause.getMessage();
        return message != null && message.contains(expectedMessage);
    }

    /**
     * Re-processes records read from offset {@code 0} (delivered to this consumer earlier
     * but not yet acknowledged) until a read comes back empty or the processor is stopped.
     */
    private void processPendingRecords() {
        this.logger.info("Processing pending records {}", this.channel);
        while (this.running.get()) {
            List<MapRecord<String, Object, Object>> pendingRecords = this.getPendingRecords();
            if (pendingRecords.isEmpty()) {
                break;
            }
            this.processRecords(pendingRecords);
        }
        this.logger.info("Processing pending records finished {}", this.channel);
    }

    /** Reads and processes new records in a loop until the processor is stopped. */
    private void processRegularRecords() {
        this.logger.trace("Processing regular records {}", this.channel);
        while (this.running.get()) {
            this.processRecords(this.getUnprocessedRecords());
        }
        this.logger.trace("Processing regular records finished {}", this.channel);
    }

    /** Hands every value of every record to {@link #processMessage(String, RecordId)}. */
    private void processRecords(List<MapRecord<String, Object, Object>> records) {
        for (MapRecord<String, Object, Object> record : records) {
            for (Object value : record.getValue().values()) {
                this.processMessage(value.toString(), record.getId());
            }
        }
    }

    /**
     * Invokes the message handler and, only if it returns normally, acknowledges the
     * record. A handler failure is logged and rethrown, leaving the record unacknowledged.
     */
    private void processMessage(String message, RecordId recordId) {
        this.logger.trace("Channel processor {} with channel {} got message: {}", this.subscriptionIdentificationInfo, this.channel, message);
        try {
            this.logger.trace("Invoked message handler");
            this.messageHandler.accept(new RedisMessage(message));
            this.logger.trace("Message handler invoked");
        }
        catch (Throwable t) {
            // The stop latch is released by process() once this failure has propagated.
            this.logger.error("Message processing failed", t);
            throw t;
        }
        this.redisTemplate.opsForStream().acknowledge(this.channel, this.subscriberId, recordId);
    }

    /** Non-blocking read from offset {@code 0}. */
    private List<MapRecord<String, Object, Object>> getPendingRecords() {
        return this.getRecords(ReadOffset.from("0"), StreamReadOptions.empty());
    }

    /** Read from offset {@code >} that blocks for up to the configured block time. */
    private List<MapRecord<String, Object, Object>> getUnprocessedRecords() {
        StreamReadOptions blockingOptions = StreamReadOptions.empty().block(Duration.ofMillis(this.blockStreamTimeInMilliseconds));
        return this.getRecords(ReadOffset.from(">"), blockingOptions);
    }

    /**
     * Reads records from the channel as consumer {@code subscriberId} of group
     * {@code subscriberId}.
     *
     * @return the records read; never {@code null} (a {@code null} reply is mapped to an
     *         empty list)
     */
    private List<MapRecord<String, Object, Object>> getRecords(ReadOffset readOffset, StreamReadOptions options) {
        List<MapRecord<String, Object, Object>> records = this.redisTemplate.opsForStream().read(Consumer.from(this.subscriberId, this.subscriberId), options, StreamOffset.create(this.channel, readOffset));
        if (records == null) {
            // StreamOperations.read is documented as nullable (pipeline / transaction use).
            return Collections.emptyList();
        }
        if (!records.isEmpty()) {
            this.logger.trace("getRecords {} {} found {} records", this.channel, readOffset, records.size());
        }
        return records;
    }

    /**
     * Sleeps for the given time.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *         interrupted; the interrupt flag is restored first
     */
    private void sleep(long timeoutInMilliseconds) {
        try {
            Thread.sleep(timeoutInMilliseconds);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.logger.error("Sleeping failed", e);
            throw new RuntimeException(e);
        }
    }
}

