/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.redis;

import io.debezium.server.redis.RedisStreamChangeConsumerConfig;
import io.debezium.storage.redis.RedisClient;
import io.debezium.util.IoUtil;
import io.smallrye.mutiny.tuples.Tuple2;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decides whether another batch of records may be written to Redis, based on the
 * {@code used_memory} and {@code maxmemory} fields of the Redis {@code INFO memory} reply
 * and on an optional limit taken from the sink configuration.
 *
 * <p>The bookkeeping counters ({@code accumulatedMemory}, {@code previouslyUsedMemory},
 * {@code totalProcessed}) are {@code static}, so they are shared by every instance of this
 * class. No synchronization is performed on them here.
 */
public class RedisMemoryThreshold {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisMemoryThreshold.class);
    private static final String INFO_MEMORY = "memory";
    private static final String INFO_MEMORY_SECTION_MAXMEMORY = "maxmemory";
    private static final String INFO_MEMORY_SECTION_USEDMEMORY = "used_memory";
    private static final long BYTES_PER_MB = 1024L * 1024L;
    private static final String[] SIZE_UNITS = { "B", "KB", "MB", "GB" };

    // Memory assumed to have been written since the last time Redis reported a different used_memory.
    private static long accumulatedMemory = 0L;
    // The used_memory value seen the last time it differed from the previous reading.
    private static long previouslyUsedMemory = 0L;
    // Running total of the buffer sizes that were accepted by checkMemory.
    private static long totalProcessed = 0L;

    private RedisClient client;
    // Configured limit in bytes; 0 means no limit was configured.
    private final long memoryLimit;
    // Effective maximum (bytes) resolved during the most recent checkMemory call; 0 means unlimited.
    private long maximumMemory = 0L;

    /**
     * Creates a threshold checker.
     *
     * @param client the Redis client used to issue {@code INFO memory}
     * @param config the sink configuration; its memory limit (in MB) is converted to bytes
     */
    public RedisMemoryThreshold(RedisClient client, RedisStreamChangeConsumerConfig config) {
        this.client = client;
        this.memoryLimit = BYTES_PER_MB * config.getMemoryLimitMb();
    }

    /**
     * Replaces the Redis client used for subsequent memory checks.
     *
     * @param client the new client
     */
    public void setRedisClient(RedisClient client) {
        this.client = client;
    }

    /**
     * Checks whether the estimated memory usage after writing the current buffer stays below
     * the effective maximum.
     *
     * <p>The estimate is {@code used_memory + accumulatedMemory + extraMemory * bufferFillRate},
     * where {@code accumulatedMemory} grows by {@code extraMemory * bufferSize} on every accepted
     * call for as long as Redis keeps reporting the same {@code used_memory}, and restarts from
     * {@code extraMemory * bufferSize} once the reported value changes.
     *
     * @param extraMemory    memory attributed to a single record (logged as "Record Size")
     * @param bufferSize     number of records in the buffer (logged as "NumRecInBuff")
     * @param bufferFillRate multiplier applied to {@code extraMemory} to estimate the next batch
     * @return {@code true} if the write may proceed (also when no maximum is known or the memory
     *         information could not be read), {@code false} if the estimate reaches the maximum
     */
    public boolean checkMemory(long extraMemory, int bufferSize, int bufferFillRate) {
        Tuple2<Long, Long> memoryTuple = memoryTuple();

        // Compare against MAX_VALUE - bufferSize: adding first would silently wrap to a negative
        // value and the reset would never trigger.
        if (bufferSize > 0 && totalProcessed >= Long.MAX_VALUE - bufferSize) {
            LOGGER.warn("Resetting the total processed records counter as it has reached its maximum value: {}", totalProcessed);
            totalProcessed = 0L;
        }

        if (memoryTuple == null) {
            // The INFO reply could not be read (already logged by memoryTuple()). Without any
            // figures there is nothing to compare, so treat it like "no maximum known".
            totalProcessed += bufferSize;
            LOGGER.debug("Total Processed Records: {}", totalProcessed);
            return true;
        }

        this.maximumMemory = memoryTuple.getItem2();
        if (this.maximumMemory == 0L) {
            // Neither Redis nor the configuration provides a limit.
            totalProcessed += bufferSize;
            LOGGER.debug("Total Processed Records: {}", totalProcessed);
            return true;
        }

        long estimatedBatchSize = extraMemory * bufferFillRate;
        long usedMemory = memoryTuple.getItem1();
        long prevAccumulatedMemory = accumulatedMemory;
        if (usedMemory == previouslyUsedMemory) {
            // Redis has not reported a new value yet; keep adding what we believe was written.
            accumulatedMemory += extraMemory * bufferSize;
        }
        else {
            previouslyUsedMemory = usedMemory;
            accumulatedMemory = extraMemory * bufferSize;
        }

        long estimatedUsedMemory = usedMemory + accumulatedMemory + estimatedBatchSize;
        if (estimatedUsedMemory >= this.maximumMemory) {
            LOGGER.info("Sink memory threshold percentage was reached. Will retry; (estimated used memory size: {}, maxmemory: {}). Total Processed Records: {}",
                    getSizeInHumanReadableFormat(estimatedUsedMemory),
                    getSizeInHumanReadableFormat(this.maximumMemory),
                    totalProcessed);
            // The batch is rejected, so undo this call's contribution to the accumulator.
            accumulatedMemory = prevAccumulatedMemory;
            return false;
        }

        LOGGER.debug("Maximum reached: {}; Used Mem {}; Max Mem: {}; Accumulate Mem: {}; Estimated Used Mem: {}, Record Size: {}, NumRecInBuff: {}; Total Processed Records: {}",
                estimatedUsedMemory >= this.maximumMemory,
                getSizeInHumanReadableFormat(usedMemory),
                getSizeInHumanReadableFormat(this.maximumMemory),
                getSizeInHumanReadableFormat(accumulatedMemory),
                getSizeInHumanReadableFormat(estimatedUsedMemory),
                getSizeInHumanReadableFormat(extraMemory),
                bufferSize,
                totalProcessed + bufferSize);
        totalProcessed += bufferSize;
        return true;
    }

    /**
     * Queries {@code INFO memory} and extracts the used and maximum memory.
     *
     * <p>A missing or unparsable {@code used_memory} is reported as 0. The maximum is the
     * {@code maxmemory} reported by Redis, replaced by the configured limit when
     * {@code maxmemory} is missing/unparsable or when a positive configured limit is smaller.
     *
     * @return a tuple of (used memory, maximum memory) in bytes, or {@code null} if the reply
     *         could not be read
     */
    private Tuple2<Long, Long> memoryTuple() {
        String memory = this.client.info(INFO_MEMORY);
        HashMap<String, String> infoMemory = new HashMap<>();
        try {
            IoUtil.readLines(new ByteArrayInputStream(memory.getBytes(StandardCharsets.UTF_8)), line -> {
                // Only plain "key:value" lines are kept; section headers and blanks are skipped.
                String[] pair = line.split(":");
                if (pair.length == 2) {
                    infoMemory.put(pair[0], pair[1]);
                }
            });
        }
        catch (IOException e) {
            LOGGER.error("Cannot parse Redis 'info memory' result '{}'.", memory, e);
            return null;
        }

        Long usedMemory = parseLong(INFO_MEMORY_SECTION_USEDMEMORY, infoMemory.get(INFO_MEMORY_SECTION_USEDMEMORY));
        if (usedMemory == null) {
            usedMemory = 0L;
        }

        Long configuredMemory = parseLong(INFO_MEMORY_SECTION_MAXMEMORY, infoMemory.get(INFO_MEMORY_SECTION_MAXMEMORY));
        // NOTE(review): a reported maxmemory of 0 is kept as 0 (unlimited) even when a positive
        // limit is configured — confirm this is the intended precedence.
        if (configuredMemory == null || (this.memoryLimit > 0L && configuredMemory > this.memoryLimit)) {
            configuredMemory = this.memoryLimit;
            if (configuredMemory > 0L) {
                LOGGER.debug("Setting maximum memory size {}", getSizeInHumanReadableFormat(configuredMemory));
            }
        }
        return Tuple2.of(usedMemory, configuredMemory);
    }

    /**
     * Parses a field of the {@code INFO memory} reply as a {@code long}.
     *
     * @param name  the field name, used only for logging
     * @param value the raw field value, may be {@code null}
     * @return the parsed value, or {@code null} if {@code value} is {@code null} or not a number
     */
    private Long parseLong(String name, String value) {
        if (value == null) {
            return null;
        }
        try {
            return Long.valueOf(value);
        }
        catch (NumberFormatException e) {
            LOGGER.debug("Cannot parse Redis 'info memory' field '{}' with value '{}'.", name, value);
            return null;
        }
    }

    /**
     * Formats a byte count using the largest unit among B, KB, MB and GB (factor 1024) that keeps
     * the value at or above 1, with two decimal places.
     *
     * @param size the size in bytes, may be {@code null}
     * @return {@code "Not configured"} for {@code null}, {@code "0 B"} for zero, otherwise the
     *         formatted size such as {@code "1.50 MB"}
     */
    public static String getSizeInHumanReadableFormat(Long size) {
        if (size == null) {
            return "Not configured";
        }
        if (size == 0L) {
            return "0 B";
        }
        double sizeInUnit = size.doubleValue();
        int unitIndex = 0;
        // Stop at the last unit so anything beyond GB is still expressed in GB.
        while (sizeInUnit >= 1024.0 && unitIndex < SIZE_UNITS.length - 1) {
            sizeInUnit /= 1024.0;
            ++unitIndex;
        }
        return String.format("%.2f %s", sizeInUnit, SIZE_UNITS[unitIndex]);
    }
}

