/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messsaging.nats.jetstream.processors.publisher;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.Message;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.quarkiverse.reactive.messsaging.nats.jetstream.JetStreamIncomingMessage;
import io.quarkiverse.reactive.messsaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messsaging.nats.jetstream.client.JetStreamClient;
import io.quarkiverse.reactive.messsaging.nats.jetstream.mapper.PayloadMapper;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.MessageProcessor;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.Status;
import io.quarkiverse.reactive.messsaging.nats.jetstream.processors.publisher.MessagePublisherConfiguration;
import io.quarkiverse.reactive.messsaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.quarkiverse.reactive.messsaging.nats.jetstream.tracing.JetStreamTrace;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.mutiny.core.Context;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.jboss.logging.Logger;

public class MessagePublisherProcessor
implements MessageProcessor {
    private static final Logger logger = Logger.getLogger(MessagePublisherProcessor.class);
    static final int CONSUMER_ALREADY_IN_USE = 10013;
    private final MessagePublisherConfiguration configuration;
    private final JetStreamClient jetStreamClient;
    private final PayloadMapper payloadMapper;
    private final Instrumenter<JetStreamTrace, Void> instrumenter;
    private final AtomicReference<Status> status;

    public MessagePublisherProcessor(JetStreamClient jetStreamClient, MessagePublisherConfiguration configuration, PayloadMapper payloadMapper, JetStreamInstrumenter jetStreamInstrumenter) {
        this.configuration = configuration;
        this.jetStreamClient = jetStreamClient;
        this.payloadMapper = payloadMapper;
        this.instrumenter = jetStreamInstrumenter.receiver();
        this.status = new AtomicReference<Status>(new Status(false, "Not connected"));
    }

    @Override
    public Status getStatus() {
        return this.status.get();
    }

    @Override
    public void close() {
        this.jetStreamClient.close();
    }

    @Override
    public String getChannel() {
        return this.configuration.getChannel();
    }

    public Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> getPublisher() {
        return this.jetStreamClient.getOrEstablishConnection().onItem().transformToMulti(this::publish).onFailure().invoke(throwable -> {
            if (!this.isConsumerAlreadyInUse((Throwable)throwable)) {
                logger.errorf(throwable, "Publish failure: %s", (Object)throwable.getMessage());
            }
            this.close();
        }).onFailure().retry().withBackOff(Duration.ofSeconds(30L)).indefinitely().onCompletion().invoke(this::close);
    }

    public Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> publish(Connection connection) {
        return Multi.createFrom().deferred(() -> Multi.createFrom().emitter(emitter -> {
            try {
                JetStream jetStream = connection.jetStream();
                String subject = this.configuration.getSubject();
                Dispatcher dispatcher = connection.createDispatcher();
                PushSubscribeOptions pushOptions = this.createPushSubscribeOptions(this.configuration);
                jetStream.subscribe(subject, dispatcher, message -> emitter.emit(this.create(this.configuration, message, connection.context())), false, pushOptions);
                this.setStatus(true, "Is connected");
            }
            catch (JetStreamApiException e) {
                if (10013 == e.getApiErrorCode()) {
                    this.setStatus(true, "Consumer already in use");
                    emitter.fail((Throwable)e);
                } else {
                    logger.errorf((Throwable)e, "Failed subscribing to stream with message: %s", (Object)e.getMessage());
                    this.setStatus(false, e.getMessage());
                    emitter.fail((Throwable)e);
                }
            }
            catch (Throwable e) {
                logger.errorf(e, "Failed subscribing to stream with message: %s", (Object)e.getMessage());
                this.setStatus(false, e.getMessage());
                emitter.fail(e);
            }
        })).emitOn(runnable -> connection.context().runOnContext(runnable));
    }

    private void setStatus(boolean healthy, String message) {
        this.status.set(new Status(healthy, message));
    }

    private org.eclipse.microprofile.reactive.messaging.Message<?> create(MessagePublisherConfiguration configuration, Message message, Context context) {
        JetStreamIncomingMessage incomingMessage = configuration.getType().map(type -> new JetStreamIncomingMessage(message, this.payloadMapper.toPayload(message, (String)type), context)).orElseGet(() -> new JetStreamIncomingMessage<Object>(message, this.payloadMapper.toPayload(message).orElse(null), context));
        if (configuration.traceEnabled()) {
            return TracingUtils.traceIncoming(this.instrumenter, (org.eclipse.microprofile.reactive.messaging.Message)incomingMessage, (Object)JetStreamTrace.trace(incomingMessage));
        }
        return incomingMessage;
    }

    private boolean isConsumerAlreadyInUse(Throwable throwable) {
        if (throwable instanceof JetStreamApiException) {
            JetStreamApiException jetStreamApiException = (JetStreamApiException)throwable;
            return jetStreamApiException.getApiErrorCode() == 10013;
        }
        return false;
    }

    private PushSubscribeOptions createPushSubscribeOptions(MessagePublisherConfiguration configuration) {
        String deliverGroup = configuration.getDeliverGroup().orElse(null);
        String durable = configuration.getDurable().orElse(null);
        String[] backoff = this.getBackOff(configuration).orElse(null);
        Long maxDeliver = configuration.getMaxDeliver();
        return MessagePublisherProcessor.createPushSubscribeOptions(durable, deliverGroup, backoff, maxDeliver);
    }

    static PushSubscribeOptions createPushSubscribeOptions(String durable, String deliverGroup, String[] backoff, Long maxDeliever) {
        return ((PushSubscribeOptions.Builder)((PushSubscribeOptions.Builder)PushSubscribeOptions.builder().deliverGroup(deliverGroup).durable(durable)).configuration(ConsumerConfiguration.builder().maxDeliver(maxDeliever).backoff((Duration[])MessagePublisherProcessor.getBackOff(backoff).orElse(null)).build())).build();
    }

    private Optional<String[]> getBackOff(MessagePublisherConfiguration configuration) {
        return configuration.getBackOff().map(backoff -> backoff.split(","));
    }

    private static Optional<Duration[]> getBackOff(String[] backoff) {
        if (backoff == null || backoff.length == 0) {
            return Optional.empty();
        }
        return Optional.of(Arrays.stream(backoff).map(MessagePublisherProcessor::toDuration).collect(Collectors.toList()).toArray(new Duration[0]));
    }

    private static Duration toDuration(String value) {
        return Duration.parse(value);
    }
}

