/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messsaging.nats.jetstream.setup;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.quarkiverse.reactive.messsaging.nats.jetstream.JetStreamBuildConfiguration;
import io.quarkiverse.reactive.messsaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messsaging.nats.jetstream.client.ConnectionException;
import io.quarkiverse.reactive.messsaging.nats.jetstream.setup.JetStreamSetupException;
import io.quarkiverse.reactive.messsaging.nats.jetstream.setup.Stream;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

public class JetStreamSetup {
    private static final Logger logger = Logger.getLogger(JetStreamSetup.class);

    public void setup(Connection connection, JetStreamBuildConfiguration jetStreamConfiguration) {
        try {
            JetStreamManagement jsm = connection.jetStreamManagement();
            this.getStreams().stream().filter(streamConfig -> !streamConfig.subjects().isEmpty()).forEach(streamConfig -> this.getStreamConfiguration(jsm, (Stream)streamConfig).ifPresentOrElse(streamConfiguration -> this.updateStreamConfiguration(jsm, (StreamConfiguration)streamConfiguration, (Stream)streamConfig, jetStreamConfiguration), () -> this.createStreamConfiguration(jsm, (Stream)streamConfig, jetStreamConfiguration)));
        }
        catch (Exception e) {
            throw new JetStreamSetupException(String.format("Unable to configure stream: %s", e.getMessage()), e);
        }
    }

    private Optional<StreamConfiguration> getStreamConfiguration(JetStreamManagement jsm, Stream stream) {
        return this.getStreamInfo(jsm, stream.name()).map(StreamInfo::getConfiguration);
    }

    private Optional<StreamInfo> getStreamInfo(JetStreamManagement jsm, String streamName) {
        try {
            return Optional.of(jsm.getStreamInfo(streamName, StreamInfoOptions.allSubjects()));
        }
        catch (IOException e) {
            throw new ConnectionException("Failed getting stream info: " + e.getMessage(), e);
        }
        catch (JetStreamApiException e) {
            return Optional.empty();
        }
    }

    private void createStreamConfiguration(JetStreamManagement jsm, Stream stream, JetStreamBuildConfiguration jetStreamConfiguration) {
        try {
            logger.infof("Creating stream: %s with subjects: %s", (Object)stream.name(), stream.subjects());
            StreamConfiguration.Builder streamConfigBuilder = StreamConfiguration.builder().name(stream.name()).storageType(StorageType.valueOf((String)jetStreamConfiguration.storageType())).retentionPolicy(RetentionPolicy.valueOf((String)jetStreamConfiguration.retentionPolicy())).replicas(jetStreamConfiguration.replicas().intValue()).subjects(stream.subjects());
            jsm.addStream(streamConfigBuilder.build());
        }
        catch (JetStreamApiException | IOException e) {
            throw new ConnectionException(String.format("Failed creating stream: %s with message: %s", stream, e.getMessage()), e);
        }
    }

    private void updateStreamConfiguration(JetStreamManagement jsm, StreamConfiguration streamConfiguration, Stream stream, JetStreamBuildConfiguration jetStreamConfiguration) {
        try {
            if (!new HashSet(streamConfiguration.getSubjects()).containsAll(stream.subjects())) {
                logger.infof("Updating stream %s with subjects %s", (Object)streamConfiguration.getName(), stream.subjects());
                jsm.updateStream(StreamConfiguration.builder((StreamConfiguration)streamConfiguration).subjects(stream.subjects()).replicas(jetStreamConfiguration.replicas().intValue()).build());
            }
        }
        catch (JetStreamApiException | IOException e) {
            throw new ConnectionException(String.format("Failed updating stream: %s with message: %s", stream, e.getMessage()), e);
        }
    }

    private Collection<Stream> getStreams() {
        HashMap configs = new HashMap();
        Config config = ConfigProvider.getConfig();
        this.getChannelPrefixes(config).forEach(channelPrefix -> {
            if (this.isNatsConnector(config, (String)channelPrefix) && this.autoConfigure(config, (String)channelPrefix)) {
                this.getStream(config, (String)channelPrefix).ifPresent(streamName -> {
                    Stream streamConfig = Optional.ofNullable((Stream)configs.get(streamName)).orElseGet(() -> new Stream((String)streamName, (Set<String>)new HashSet<String>()));
                    this.getSubject(config, (String)channelPrefix).ifPresent(subject -> streamConfig.subjects().add((String)subject));
                    configs.putIfAbsent(streamName, streamConfig);
                });
            }
        });
        return configs.values();
    }

    private Set<String> getChannelPrefixes(Config config) {
        HashSet<String> channelPrefixes = new HashSet<String>();
        config.getPropertyNames().forEach(propertyName -> {
            if (propertyName.startsWith("mp.messaging.incoming.")) {
                int index = propertyName.indexOf(".", "mp.messaging.incoming.".length());
                channelPrefixes.add(propertyName.substring(0, index));
            } else if (propertyName.startsWith("mp.messaging.outgoing.")) {
                int index = propertyName.indexOf(".", "mp.messaging.outgoing.".length());
                channelPrefixes.add(propertyName.substring(0, index));
            }
        });
        return channelPrefixes;
    }

    private boolean isNatsConnector(Config config, String channelPrefix) {
        return config.getOptionalValue(channelPrefix + ".connector", String.class).filter("quarkus-jetstream"::equals).isPresent();
    }

    private Optional<String> getStream(Config config, String channelPrefix) {
        return config.getOptionalValue(channelPrefix + ".stream", String.class);
    }

    private Optional<String> getSubject(Config config, String channelPrefix) {
        return config.getOptionalValue(channelPrefix + ".subject", String.class).map(subject -> {
            if (subject.endsWith(".>")) {
                return subject.substring(0, subject.length() - 2);
            }
            return subject;
        });
    }

    private boolean autoConfigure(Config config, String channelPrefix) {
        return config.getOptionalValue(channelPrefix + ".auto-configure", Boolean.class).orElse(true);
    }
}

