/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messsaging.nats.jetstream.processors.subscriber;

import io.nats.client.JetStream;
import io.nats.client.PublishOptions;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.quarkiverse.reactive.messsaging.nats.jetstream.JetStreamOutgoingMessageMetadata;
import io.quarkiverse.reactive.messsaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messsaging.nats.jetstream.client.JetStreamClient;
import io.quarkiverse.reactive.messsaging.nats.jetstream.mapper.HeaderMapper;
import io.quarkiverse.reactive.messsaging.nats.jetstream.mapper.PayloadMapper;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.Status;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.subscriber.MessageSubscriberConfiguration;
import io.quarkiverse.reactive.messsaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.quarkiverse.reactive.messsaging.nats.jetstream.tracing.JetStreamTrace;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Flow;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

public class MessageSubscriberProcessor
implements MessageProcessor {
    private static final Logger logger = Logger.getLogger(MessageSubscriberProcessor.class);
    private final MessageSubscriberConfiguration configuration;
    private final JetStreamClient jetStreamClient;
    private final PayloadMapper payloadMapper;
    private final Instrumenter<JetStreamTrace, Void> instrumenter;
    private final String streamName;
    private final String subject;

    public MessageSubscriberProcessor(JetStreamClient jetStreamClient, MessageSubscriberConfiguration configuration, PayloadMapper payloadMapper, JetStreamInstrumenter jetStreamInstrumenter) {
        this.jetStreamClient = jetStreamClient;
        this.configuration = configuration;
        this.payloadMapper = payloadMapper;
        this.instrumenter = jetStreamInstrumenter.publisher();
        this.streamName = this.getStreamName(configuration);
        this.subject = this.getSubject(configuration);
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber() {
        return MultiUtils.via(m -> m.onSubscription().call(this::getOrEstablishConnection).onItem().transformToUniAndConcatenate(this::send).onCompletion().invoke(this::close).onFailure().invoke(throwable -> {
            logger.errorf(throwable, "Failed to publish: %s", (Object)throwable.getMessage());
            this.close();
        }));
    }

    public Message<?> publish(Connection connection, Message<?> message) {
        try {
            Optional metadata = message.getMetadata(JetStreamOutgoingMessageMetadata.class);
            String messageId = metadata.map(JetStreamOutgoingMessageMetadata::messageId).orElseGet(() -> UUID.randomUUID().toString());
            byte[] payload = this.payloadMapper.toByteArray(message.getPayload());
            HashMap<String, List<String>> headers = new HashMap<String, List<String>>();
            metadata.ifPresent(m -> headers.putAll(m.headers()));
            headers.putIfAbsent("message.type", List.of(message.getPayload().getClass().getTypeName()));
            if (this.configuration.traceEnabled()) {
                TracingUtils.traceOutgoing(this.instrumenter, message, (Object)new JetStreamTrace(this.streamName, this.subject, messageId, headers, new String(payload)));
            }
            JetStream jetStream = connection.jetStream();
            PublishOptions options = this.createPublishOptions(messageId, this.streamName);
            jetStream.publish(this.subject, HeaderMapper.toJetStreamHeaders(headers), payload, options);
            return message;
        }
        catch (Exception e) {
            logger.errorf((Throwable)e, "Failed to publish message: %s", (Object)e.getMessage());
            throw new RuntimeException(e);
        }
    }

    @Override
    public Status getStatus() {
        return this.jetStreamClient.getConnection().map(c -> new Status(c.isConnected(), "Is connected")).orElseGet(() -> new Status(true, "Not connected"));
    }

    @Override
    public void close() {
        this.jetStreamClient.close();
    }

    @Override
    public String getChannel() {
        return this.configuration.getChannel();
    }

    private Uni<? extends Message<?>> send(Message<?> message) {
        return this.getOrEstablishConnection().onItem().transformToUni(connection -> this.send(message, (Connection)connection));
    }

    private Uni<? extends Message<?>> send(Message<?> message, Connection connection) {
        return Uni.createFrom().emitter(em -> {
            try {
                em.complete(this.publish(connection, message));
            }
            catch (Throwable e) {
                logger.errorf(e, "Failed sending message: %s", (Object)e.getMessage());
                em.fail(e);
            }
        }).emitOn(runnable -> connection.context().runOnContext(runnable));
    }

    private Uni<Connection> getOrEstablishConnection() {
        return this.jetStreamClient.getOrEstablishConnection();
    }

    private String getStreamName(MessageSubscriberConfiguration configuration) {
        return configuration.getStream().orElseThrow(() -> new RuntimeException("Stream not configured for channel = " + configuration.getChannel()));
    }

    private String getSubject(MessageSubscriberConfiguration configuration) {
        return configuration.getSubject().orElseThrow(() -> new RuntimeException("Subject not configured for channel = " + configuration.getChannel()));
    }

    private PublishOptions createPublishOptions(String messageId, String streamName) {
        return PublishOptions.builder().messageId(messageId).stream(streamName).build();
    }
}

