/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.redis;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.redis.RedisConnection;
import io.debezium.util.DelayStrategy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;

/**
 * Debezium Server sink that appends change events to Redis streams.
 *
 * <p>Incoming records are split into sub-batches of at most {@code batchSize} records. Each
 * sub-batch is written with one Jedis {@link Pipeline} of {@code XADD} commands and is retried
 * until every record in it has been accepted. Retries happen on connection errors, on Redis
 * "OOM" replies and while Redis reports that it is still loading its dataset; any other failure
 * is rethrown as a {@link DebeziumException}.
 */
@Named(value="redis")
@Dependent
public class RedisStreamChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.redis.";
    private static final String PROP_ADDRESS = "debezium.sink.redis.address";
    private static final String PROP_USER = "debezium.sink.redis.user";
    private static final String PROP_PASSWORD = "debezium.sink.redis.password";

    /** Fragment of the per-command reply that marks a command as rejected because Redis is out of memory. */
    private static final String OOM_RESPONSE_FRAGMENT = "OOM command not allowed when used memory > 'maxmemory'";
    /** Exact exception message that is treated as "Redis is still starting" and therefore retried. */
    private static final String LOADING_MESSAGE = "LOADING Redis is loading the dataset in memory";

    private String address;
    private String user;
    private String password;
    /** Passed to {@link RedisConnection} when the client is created. */
    @ConfigProperty(name="debezium.sink.redis.ssl.enabled", defaultValue="false")
    boolean sslEnabled;
    /** Maximum number of records sent in one pipeline. */
    @ConfigProperty(name="debezium.sink.redis.batch.size", defaultValue="500")
    Integer batchSize;
    /** First argument handed to {@code DelayStrategy.exponential} for the retry delay. */
    @ConfigProperty(name="debezium.sink.redis.retry.initial.delay.ms", defaultValue="300")
    Integer initialRetryDelay;
    /** Second argument handed to {@code DelayStrategy.exponential} for the retry delay. */
    @ConfigProperty(name="debezium.sink.redis.retry.max.delay.ms", defaultValue="10000")
    Integer maxRetryDelay;
    /** Field name used in the stream entry when a record has no key. */
    @ConfigProperty(name="debezium.sink.redis.null.key", defaultValue="default")
    String nullKey;
    /** Field value used in the stream entry when a record has no value. */
    @ConfigProperty(name="debezium.sink.redis.null.value", defaultValue="default")
    String nullValue;
    /** Current Redis client; {@code null} whenever there is no usable connection. */
    private Jedis client = null;

    /**
     * Reads the address (required) and the optional user/password from the configuration and
     * obtains a Redis client from a new {@link RedisConnection}.
     */
    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        this.address = config.getValue(PROP_ADDRESS, String.class);
        this.user = config.getOptionalValue(PROP_USER, String.class).orElse(null);
        this.password = config.getOptionalValue(PROP_PASSWORD, String.class).orElse(null);
        RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.sslEnabled);
        this.client = redisConnection.getRedisClient("debezium:redis:sink");
    }

    /**
     * Closes the current client, if any. Failures while closing are only logged, and the client
     * reference is always cleared so that the next write attempt reconnects.
     */
    @PreDestroy
    void close() {
        try {
            if (this.client != null) {
                this.client.close();
            }
        }
        catch (Exception e) {
            LOGGER.warn("Exception while closing Jedis: {}", this.client, e);
        }
        finally {
            this.client = null;
        }
    }

    /**
     * Splits {@code source} into consecutive sub-list views of at most {@code length} elements.
     *
     * @param source the list to split
     * @param length the maximum size of each chunk; must be positive when {@code source} is non-empty
     * @return a stream of sub-lists in order; empty when {@code source} is empty
     * @throws IllegalArgumentException if {@code source} is non-empty and {@code length} is not positive
     */
    private <T> Stream<List<T>> batches(List<T> source, int length) {
        if (source.isEmpty()) {
            return Stream.empty();
        }
        // A zero length would divide by zero below and a negative one would produce a wrong
        // (possibly empty) set of chunks, silently skipping records.
        if (length <= 0) {
            throw new IllegalArgumentException("Batch size must be positive but was " + length);
        }
        int size = source.size();
        int fullChunks = (size - 1) / length;
        return IntStream.range(0, fullChunks + 1)
                .mapToObj(n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
    }

    /**
     * Writes all records to Redis in sub-batches and then marks the whole batch as finished.
     *
     * @param records the change events to write
     * @param committer used to mark each written record, and finally the batch, as processed
     * @throws InterruptedException declared by the consumer contract
     * @throws DebeziumException if writing fails for a reason that is not retried
     */
    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        DelayStrategy delayStrategy = DelayStrategy.exponential(this.initialRetryDelay.longValue(), this.maxRetryDelay.longValue());
        LOGGER.trace("Handling a batch of {} records", records.size());
        this.batches(records, this.batchSize).forEach(batch -> this.writeWithRetry(batch, committer, delayStrategy));
        committer.markBatchFinished();
    }

    /**
     * Writes one sub-batch, retrying until every record in it has been accepted by Redis.
     *
     * @param batch the sub-batch to write; it is copied and not modified
     * @param committer used to mark each accepted record as processed
     * @param delayStrategy consulted after every unsuccessful attempt before retrying
     * @throws DebeziumException on any failure other than a connection error or the LOADING error
     */
    private void writeWithRetry(List<ChangeEvent<Object, Object>> batch, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer, DelayStrategy delayStrategy) {
        // Work on a copy: accepted records are removed from it, and the sub-batch is done once it is empty.
        List<ChangeEvent<Object, Object>> pending = new ArrayList<>(batch);
        boolean completedSuccessfully = false;
        while (!completedSuccessfully) {
            if (this.client == null) {
                try {
                    this.connect();
                    // Connected again: go straight to the write attempt without delaying.
                    continue;
                }
                catch (Exception e) {
                    this.close();
                    LOGGER.error("Can't connect to Redis", e);
                }
            }
            else {
                try {
                    LOGGER.trace("Preparing a Redis Pipeline of {} records", pending.size());
                    // NOTE(review): presumably a liveness probe so a dead connection fails before the pipeline is built.
                    this.client.ping();
                    Pipeline pipeline = this.client.pipelined();
                    for (ChangeEvent<Object, Object> record : pending) {
                        String destination = this.streamNameMapper.map(record.destination());
                        String key = record.key() != null ? this.getString(record.key()) : this.nullKey;
                        String value = record.value() != null ? this.getString(record.value()) : this.nullValue;
                        pipeline.xadd(destination, StreamEntryID.NEW_ENTRY, Collections.singletonMap(key, value));
                    }

                    // Responses are matched to the pending records by position.
                    List<Object> responses = pipeline.syncAndReturnAll();
                    List<ChangeEvent<Object, Object>> processedRecords = new ArrayList<>();
                    int index = 0;
                    int totalOOMResponses = 0;
                    for (Object response : responses) {
                        String message = response.toString();
                        if (message.contains(OOM_RESPONSE_FRAGMENT)) {
                            // Rejected for lack of memory: leave the record pending so it is retried.
                            ++totalOOMResponses;
                        }
                        else {
                            ChangeEvent<Object, Object> currentRecord = pending.get(index);
                            committer.markProcessed(currentRecord);
                            processedRecords.add(currentRecord);
                        }
                        ++index;
                    }
                    pending.removeAll(processedRecords);
                    if (totalOOMResponses > 0) {
                        LOGGER.warn("Redis runs OOM, {} command(s) failed", totalOOMResponses);
                    }
                    if (pending.isEmpty()) {
                        completedSuccessfully = true;
                    }
                }
                catch (JedisConnectionException jce) {
                    LOGGER.error("Connection error", jce);
                    // Dropping the client makes the next iteration reconnect.
                    this.close();
                }
                catch (JedisDataException jde) {
                    // Constant-first comparison: the exception message may be null.
                    if (LOADING_MESSAGE.equals(jde.getMessage())) {
                        // Redis is still loading its dataset: keep the records pending and retry after the delay.
                        LOGGER.error("Redis is starting", jde);
                    }
                    else {
                        LOGGER.error("Unexpected JedisDataException", jde);
                        throw new DebeziumException(jde);
                    }
                }
                catch (Exception e) {
                    LOGGER.error("Unexpected Exception", e);
                    throw new DebeziumException(e);
                }
            }
            delayStrategy.sleepWhen(!completedSuccessfully);
        }
    }
}

