/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.nats.jetstream;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.List;
import java.util.Optional;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named(value="nats-jetstream")
@Dependent
public class NatsJetStreamChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NatsJetStreamChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.nats-jetstream.";
    private static final String PROP_URL = "debezium.sink.nats-jetstream.url";
    private static final String PROP_CREATE_STREAM = "debezium.sink.nats-jetstream.create-stream";
    private static final String PROP_SUBJECTS = "debezium.sink.nats-jetstream.subjects";
    private static final String PROP_STORAGE = "debezium.sink.nats-jetstream.storage";
    private static final String PROP_AUTH_JWT = "debezium.sink.nats-jetstream.auth.jwt";
    private static final String PROP_AUTH_SEED = "debezium.sink.nats-jetstream.auth.seed";
    private static final String PROP_AUTH_USER = "debezium.sink.nats-jetstream.auth.user";
    private static final String PROP_AUTH_PASSWORD = "debezium.sink.nats-jetstream.auth.password";
    private static final String PROP_AUTH_TLS_KEYSTORE = "debezium.sink.nats-jetstream.auth.tls.keystore";
    private static final String PROP_AUTH_TLS_KEYSTORE_PASSWORD = "debezium.sink.nats-jetstream.auth.tls.keystore.password";
    private static final String PROP_AUTH_TLS_PASSWORD = "debezium.sink.nats-jetstream.auth.tls.password";
    private Connection nc;
    private JetStream js;
    @ConfigProperty(name="debezium.sink.nats-jetstream.create-stream", defaultValue="false")
    boolean createStream;
    @ConfigProperty(name="debezium.sink.nats-jetstream.auth.jwt")
    Optional<String> jwt;
    @ConfigProperty(name="debezium.sink.nats-jetstream.auth.seed")
    Optional<String> seed;
    @ConfigProperty(name="debezium.sink.nats-jetstream.auth.user")
    Optional<String> user;
    @ConfigProperty(name="debezium.sink.nats-jetstream.auth.password")
    Optional<String> password;
    @ConfigProperty(name="debezium.sink.nats-jetstream.auth.tls.keystore")
    Optional<String> tlsKeyStore;
    @ConfigProperty(name="debezium.sink.nats-jetstream.auth.tls.keystore.password")
    Optional<String> tlsKeyStorePassword;
    @ConfigProperty(name="debezium.sink.nats-jetstream.auth.tls.password")
    Optional<String> tlsPassword;
    @Inject
    @CustomConsumerBuilder
    Instance<JetStream> customStreamingConnection;

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        String url = (String)config.getValue(PROP_URL, String.class);
        if (this.customStreamingConnection.isResolvable()) {
            this.js = (JetStream)this.customStreamingConnection.get();
            LOGGER.info("Obtained custom configured JetStream '{}'", (Object)this.js);
            return;
        }
        try {
            Options.Builder natsOptionsBuilder = new Options.Builder().servers(url.split(",")).noReconnect();
            if (this.jwt.isPresent() && this.seed.isPresent()) {
                natsOptionsBuilder.authHandler(Nats.staticCredentials((char[])this.jwt.get().toCharArray(), (char[])this.seed.get().toCharArray()));
            } else if (this.user.isPresent() && this.password.isPresent()) {
                natsOptionsBuilder.userInfo(this.user.get(), this.password.get());
            } else if (this.tlsKeyStore.isPresent() && this.tlsKeyStorePassword.isPresent() && this.tlsPassword.isPresent()) {
                SSLContext ctx = NatsJetStreamChangeConsumer.sslAuthContext(this.tlsKeyStore.get(), this.tlsKeyStorePassword.get(), this.tlsPassword.get());
                natsOptionsBuilder.sslContext(ctx);
            }
            this.nc = Nats.connect((Options)natsOptionsBuilder.build());
            if (this.createStream) {
                String subjects = config.getOptionalValue(PROP_SUBJECTS, String.class).orElse("*.*.*");
                String storage = config.getOptionalValue(PROP_STORAGE, String.class).orElse("memory");
                StorageType storageType = storage.equals("file") ? StorageType.File : StorageType.Memory;
                StreamConfiguration streamConfig = StreamConfiguration.builder().name("DebeziumStream").description("The debezium stream, contains messages which are coming from debezium").subjects(subjects.split(",")).storageType(storageType).build();
                LOGGER.info("Creating stream with config: {}", (Object)streamConfig);
                JetStreamManagement jsm = this.nc.jetStreamManagement();
                jsm.addStream(streamConfig);
            }
            this.js = this.nc.jetStream();
        }
        catch (Exception e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    @PreDestroy
    void close() {
        try {
            if (this.nc != null) {
                this.nc.close();
                LOGGER.info("NATS connection closed.");
            }
        }
        catch (Exception e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        for (ChangeEvent<Object, Object> rec : records) {
            if (rec.value() != null) {
                String subject = this.streamNameMapper.map(rec.destination());
                byte[] recordBytes = this.getBytes(rec.value());
                LOGGER.trace("Received event @ {} = '{}'", (Object)subject, (Object)this.getString(rec.value()));
                try {
                    this.js.publish(subject, recordBytes);
                }
                catch (Exception e) {
                    throw new DebeziumException((Throwable)e);
                }
            }
            committer.markProcessed(rec);
        }
        committer.markBatchFinished();
    }

    private static SSLContext sslAuthContext(String keystorePath, String keystorePassword, String password) throws Exception {
        KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
        try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(keystorePath));){
            keystore.load(in, keystorePassword.toCharArray());
        }
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keystore, password.toCharArray());
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(keystore);
        SSLContext ctx = SSLContext.getInstance("TLSv1.2");
        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
        return ctx;
    }
}

