/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.redis;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

@Named(value="redis")
@Dependent
public class RedisStreamChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.redis.";
    private static final String PROP_ADDRESS = "debezium.sink.redis.address";
    private static final String PROP_USER = "debezium.sink.redis.user";
    private static final String PROP_PASSWORD = "debezium.sink.redis.password";
    private HostAndPort address;
    private Optional<String> user;
    private Optional<String> password;
    @ConfigProperty(name="debezium.sink.redis.null.key", defaultValue="default")
    String nullKey;
    @ConfigProperty(name="debezium.sink.redis.null.value", defaultValue="default")
    String nullValue;
    private Jedis client = null;
    @Inject
    @CustomConsumerBuilder
    Instance<Jedis> customClient;

    @PostConstruct
    void connect() {
        if (this.customClient.isResolvable()) {
            this.client = (Jedis)this.customClient.get();
            try {
                this.client.ping();
                LOGGER.info("Obtained custom configured Jedis '{}'", (Object)this.client);
                return;
            }
            catch (Exception e) {
                LOGGER.warn("Invalid custom configured Jedis '{}'", (Object)this.client);
            }
        }
        Config config = ConfigProvider.getConfig();
        this.address = HostAndPort.from((String)((String)config.getValue(PROP_ADDRESS, String.class)));
        this.user = config.getOptionalValue(PROP_USER, String.class);
        this.password = config.getOptionalValue(PROP_PASSWORD, String.class);
        this.client = new Jedis(this.address);
        if (this.user.isPresent()) {
            this.client.auth(this.user.get(), this.password.get());
        } else if (this.password.isPresent()) {
            this.client.auth(this.password.get());
        } else {
            this.client.ping();
        }
        LOGGER.info("Using default Jedis '{}'", (Object)this.client);
    }

    @PreDestroy
    void close() {
        try {
            this.client.close();
        }
        catch (Exception e) {
            LOGGER.warn("Exception while closing Jedis: {}", (Object)this.client, (Object)e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            try {
                LOGGER.trace("Received event '{}'", record);
                String destination = this.streamNameMapper.map(record.destination());
                String key = record.key() != null ? this.getString(record.key()) : this.nullKey;
                String value = record.value() != null ? this.getString(record.value()) : this.nullValue;
                this.client.xadd(destination, null, Collections.singletonMap(key, value));
                committer.markProcessed(record);
            }
            catch (Exception e) {
                throw new DebeziumException((Throwable)e);
            }
        }
        committer.markBatchFinished();
    }
}

