/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.storage.redis.history;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
import io.debezium.storage.redis.history.RedisSchemaHistoryConfig;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Loggings;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Schema history implementation that persists {@link HistoryRecord}s as entries of a Redis
 * stream and reads them back on recovery.
 *
 * <p>Each record is serialized with {@link DocumentWriter} and appended to the stream under the
 * field {@code "schema"}; recovery reads the stream and parses that field with
 * {@link DocumentReader}. Every Redis operation goes through {@link #doWithRetry(Supplier, String)},
 * which (re)connects lazily and retries with an exponential back-off.
 *
 * <p>{@link #start()} and {@link #recoverRecords(Consumer)} are {@code synchronized}; the other
 * methods are not.
 */
@ThreadSafe
public class RedisSchemaHistory extends AbstractSchemaHistory {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSchemaHistory.class);

    /**
     * First argument handed to {@code RedisConnection#getRedisClient}.
     * NOTE(review): presumably the name under which the client identifies itself to Redis - confirm
     * against RedisConnection.
     */
    private static final String SCHEMA_HISTORY_CLIENT_NAME = "debezium:schema_history";

    /** Field name under which the serialized record is stored in each stream entry. */
    private static final String SCHEMA_FIELD = "schema";

    private Duration initialRetryDelay;
    private Duration maxRetryDelay;
    private Integer maxRetryCount;
    private final DocumentWriter writer = DocumentWriter.defaultWriter();
    private final DocumentReader reader = DocumentReader.defaultReader();
    private final AtomicBoolean running = new AtomicBoolean();
    private RedisClient client;
    private RedisSchemaHistoryConfig config;

    /**
     * Opens a new Redis client from the current configuration and stores it in {@link #client}.
     * Requires {@link #configure} to have been called first.
     */
    void connect() {
        RedisConnection redisConnection = new RedisConnection(
                this.config.getAddress(),
                this.config.getDbIndex(),
                this.config.getUser(),
                this.config.getPassword(),
                this.config.getConnectionTimeout(),
                this.config.getSocketTimeout(),
                this.config.isSslEnabled());
        this.client = redisConnection.getRedisClient(
                SCHEMA_HISTORY_CLIENT_NAME,
                this.config.isWaitEnabled(),
                this.config.getWaitTimeout(),
                this.config.isWaitRetryEnabled(),
                this.config.getWaitRetryDelay());
    }

    /**
     * Reads the Redis-specific settings (retry delays and retry count) from {@code config} and
     * then delegates to the superclass configuration.
     *
     * @param config the connector configuration
     * @param comparator passed through to the superclass
     * @param listener passed through to the superclass
     * @param useCatalogBeforeSchema passed through to the superclass
     */
    public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
        this.config = new RedisSchemaHistoryConfig(config);
        this.initialRetryDelay = Duration.ofMillis(this.config.getInitialRetryDelay().intValue());
        this.maxRetryDelay = Duration.ofMillis(this.config.getMaxRetryDelay().intValue());
        this.maxRetryCount = this.config.getMaxRetryCount();
        super.configure(config, comparator, listener, useCatalogBeforeSchema);
    }

    /**
     * Starts the history and eagerly establishes the Redis connection.
     *
     * @throws SchemaHistoryException if no connection could be established within the configured
     *         number of attempts
     */
    public synchronized void start() {
        super.start();
        LOGGER.info("Starting RedisSchemaHistory");
        // The action is a no-op: the point is only to force doWithRetry to connect.
        this.doWithRetry(() -> true, "Connection to Redis");
        this.running.set(true);
    }

    /**
     * Serializes {@code record} and appends it to the Redis stream. A {@code null} record is ignored.
     *
     * @param record the record to store; may be {@code null}
     * @throws SchemaHistoryException if the record cannot be serialized, or if writing to Redis
     *         keeps failing after all retry attempts
     */
    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
        if (record == null) {
            return;
        }
        final String line;
        try {
            line = this.writer.write(record.document());
        }
        catch (IOException e) {
            Loggings.logErrorAndTraceRecord(LOGGER, record, "Failed to convert record to string", e);
            // Keep the original failure as the cause so callers can see why serialization failed.
            throw new SchemaHistoryException("Unable to write database schema history record", e);
        }
        this.doWithRetry(() -> {
            this.client.xadd(this.config.getRedisKeyName(), Collections.singletonMap(SCHEMA_FIELD, line));
            LOGGER.trace("Record written to database schema history in Redis: {}", line);
            return true;
        }, "Writing to database schema history stream");
    }

    /**
     * Disconnects from Redis (if connected) and stops the history. The client reference is
     * cleared so that a later {@link #start()} opens a fresh connection instead of reusing the
     * disconnected one, and the superclass is stopped even if disconnecting throws.
     */
    public void stop() {
        this.running.set(false);
        final RedisClient current = this.client;
        this.client = null;
        try {
            if (current != null) {
                current.disconnect();
            }
        }
        finally {
            super.stop();
        }
    }

    /**
     * Reads all entries of the Redis stream and feeds them, in the order returned by Redis, to
     * {@code records}.
     *
     * <p>If an entry cannot be parsed the error is logged and recovery stops at that entry;
     * entries already delivered to {@code records} are not rolled back.
     *
     * @param records consumer receiving each recovered record
     * @throws SchemaHistoryException if reading from Redis keeps failing after all retry attempts
     */
    protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
        final List<Map<String, String>> entries = this.doWithRetry(
                () -> this.client.xrange(this.config.getRedisKeyName()),
                "Reading from database schema history stream");
        for (Map<String, String> item : entries) {
            try {
                records.accept(new HistoryRecord(this.reader.read(item.get(SCHEMA_FIELD))));
            }
            catch (IOException e) {
                LOGGER.error("Failed to convert record to string: {}", item, e);
                return;
            }
        }
    }

    /**
     * @return always {@code true}; this implementation never reports the storage as missing
     */
    public boolean storageExists() {
        return true;
    }

    /**
     * @return {@code true} if the Redis stream currently holds at least one entry
     * @throws SchemaHistoryException if the check keeps failing after all retry attempts
     */
    public boolean exists() {
        return this.doWithRetry(
                () -> this.client != null && this.client.xlen(this.config.getRedisKeyName()) > 0L,
                "Check if previous record exists");
    }

    /**
     * Runs {@code action} against Redis, connecting first if there is no client, and retries on
     * failure with an exponential delay between attempts.
     *
     * <p>On a {@link RedisClientConnectionException} the current client is discarded so the next
     * attempt reconnects; any other exception is logged and the action is retried on the same
     * client.
     *
     * @param action the operation to execute
     * @param description human-readable name of the operation, used in log messages
     * @param <T> result type of the operation
     * @return the value returned by {@code action}
     * @throws SchemaHistoryException once the configured number of attempts is exhausted; the last
     *         failure, if any, is attached as the cause
     */
    private <T> T doWithRetry(Supplier<T> action, String description) {
        final int maxAttempts = this.maxRetryCount;
        final DelayStrategy delayStrategy = DelayStrategy.exponential(this.initialRetryDelay, this.maxRetryDelay);
        Throwable lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
            try {
                if (this.client == null) {
                    this.connect();
                }
                return action.get();
            }
            catch (RedisClientConnectionException e) {
                lastFailure = e;
                LOGGER.warn("Connection to Redis failed, will try to reconnect [attempt {} of {}]", attempt, maxAttempts);
                this.discardClient();
            }
            catch (Exception e) {
                lastFailure = e;
                LOGGER.warn("{} failed, will retry", description, e);
            }
            // No point in sleeping after the final attempt: we are about to give up anyway.
            if (attempt < maxAttempts) {
                delayStrategy.sleepWhen(true);
            }
        }
        throw new SchemaHistoryException(
                String.format("Failed to connect to Redis after %d attempts.", maxAttempts), lastFailure);
    }

    /** Best-effort disconnect of the current client; always clears the reference afterwards. */
    private void discardClient() {
        try {
            if (this.client != null) {
                this.client.disconnect();
            }
        }
        catch (Exception eDisconnect) {
            LOGGER.info("Exception while disconnecting", eDisconnect);
        }
        this.client = null;
    }
}

