/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messsaging.nats.jetstream;

import io.quarkiverse.reactive.messsaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messsaging.nats.jetstream.JetStreamConnectorIncomingConfiguration;
import io.quarkiverse.reactive.messsaging.nats.jetstream.client.ConnectionConfiguration;
import io.quarkiverse.reactive.messsaging.nats.jetstream.client.JetStreamClient;
import io.quarkiverse.reactive.messsaging.nats.jetstream.mapper.PayloadMapper;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.publisher.MessagePublisherConfiguration;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.publisher.MessagePublisherProcessor;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.subscriber.MessageSubscriberConfiguration;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.subscriber.MessageSubscriberProcessor;
import io.quarkiverse.reactive.messsaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.vertx.mutiny.core.Vertx;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Reception;
import jakarta.inject.Inject;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(value="quarkus-jetstream")
@ConnectorAttributes(value={@ConnectorAttribute(name="stream", description="The stream to subscribe or publish messages to", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="subject", description="The subject to subscribe or publish messages to", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="String"), @ConnectorAttribute(name="trace-enabled", description="Enable traces for publisher or subscriber", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="Boolean", defaultValue="true"), @ConnectorAttribute(name="auto-configure", description="Auto configure subject on NATS", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type="Boolean", defaultValue="true"), @ConnectorAttribute(name="ordered", description="Flag indicating whether this subscription should be ordered", direction=ConnectorAttribute.Direction.INCOMING, type="Boolean"), @ConnectorAttribute(name="deliver-group", description="The optional deliver group to join", direction=ConnectorAttribute.Direction.INCOMING, type="String"), @ConnectorAttribute(name="durable", description="Sets the durable name for the consumer", direction=ConnectorAttribute.Direction.INCOMING, type="String"), @ConnectorAttribute(name="max-deliver", description="The maximum number of times a specific message delivery will be attempted", direction=ConnectorAttribute.Direction.INCOMING, type="Long", defaultValue="1"), @ConnectorAttribute(name="back-off", description="The timing of re-deliveries as a comma-separated list of durations", direction=ConnectorAttribute.Direction.INCOMING, type="String"), @ConnectorAttribute(name="payload-type", description="The payload type", direction=ConnectorAttribute.Direction.INCOMING, type="String")})
public class JetStreamConnector
implements InboundConnector,
OutboundConnector,
HealthReporter {
    public static final String CONNECTOR_NAME = "quarkus-jetstream";
    private final List<MessageProcessor> processors;
    private final ExecutionHolder executionHolder;
    private final PayloadMapper payloadMapper;
    private final JetStreamInstrumenter jetStreamInstrumenter;
    private final NatsConfiguration natsConfiguration;

    @Inject
    public JetStreamConnector(PayloadMapper payloadMapper, JetStreamInstrumenter jetStreamInstrumenter, ExecutionHolder executionHolder, NatsConfiguration natsConfiguration) {
        this.payloadMapper = payloadMapper;
        this.jetStreamInstrumenter = jetStreamInstrumenter;
        this.processors = new CopyOnWriteArrayList<MessageProcessor>();
        this.executionHolder = executionHolder;
        this.natsConfiguration = natsConfiguration;
    }

    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        JetStreamConnectorIncomingConfiguration configuration = new JetStreamConnectorIncomingConfiguration(config);
        JetStreamClient client = new JetStreamClient(ConnectionConfiguration.of(this.natsConfiguration), this.getVertx());
        MessagePublisherProcessor processor = new MessagePublisherProcessor(client, MessagePublisherConfiguration.of(configuration), this.payloadMapper, this.jetStreamInstrumenter);
        this.processors.add(processor);
        return processor.getPublisher();
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        JetStreamConnectorIncomingConfiguration configuration = new JetStreamConnectorIncomingConfiguration(config);
        JetStreamClient client = new JetStreamClient(ConnectionConfiguration.of(this.natsConfiguration), this.getVertx());
        MessageSubscriberProcessor processor = new MessageSubscriberProcessor(client, MessageSubscriberConfiguration.of(configuration), this.payloadMapper, this.jetStreamInstrumenter);
        this.processors.add(processor);
        return processor.getSubscriber();
    }

    public HealthReport getReadiness() {
        return this.getHealth();
    }

    public HealthReport getLiveness() {
        return this.getHealth();
    }

    HealthReport getHealth() {
        HealthReport.HealthReportBuilder builder = HealthReport.builder();
        this.processors.forEach(client -> builder.add(new HealthReport.ChannelInfo(client.getChannel(), client.getStatus().healthy(), client.getStatus().message())));
        return builder.build();
    }

    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=50) @BeforeDestroyed(value=ApplicationScoped.class) Object ignored) {
        this.processors.forEach(MessageProcessor::close);
    }

    public Vertx getVertx() {
        return this.executionHolder.vertx();
    }
}

