/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.hivemqclient.smallrye.reactive;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3SubscribeBuilder;
import com.hivemq.client.rx.FlowableWithSingle;
import io.quarkiverse.hivemqclient.smallrye.reactive.HiveMQClients;
import io.quarkiverse.hivemqclient.smallrye.reactive.HiveMQMqttConnectorIncomingConfiguration;
import io.quarkiverse.hivemqclient.smallrye.reactive.HiveMQReceivingMqttMessage;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.converters.MultiConverter;
import io.smallrye.mutiny.converters.multi.MultiRxConverters;
import io.smallrye.reactive.messaging.mqtt.MqttFailStop;
import io.smallrye.reactive.messaging.mqtt.MqttFailureHandler;
import io.smallrye.reactive.messaging.mqtt.MqttIgnoreFailure;
import io.smallrye.reactive.messaging.mqtt.MqttMessage;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttExceptions;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

public class HiveMQMqttSource {
    private final PublisherBuilder<MqttMessage<?>> source;
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private final Pattern pattern;

    public HiveMQMqttSource(HiveMQMqttConnectorIncomingConfiguration config) {
        String topic = config.getTopic().orElseGet(config::getChannel);
        int qos = config.getQos();
        boolean broadcast = config.getBroadcast();
        MqttFailureHandler.Strategy strategy = MqttFailureHandler.Strategy.from((String)config.getFailureStrategy());
        MqttFailureHandler onNack = this.createFailureHandler(strategy, config.getChannel());
        if (topic.contains("#") || topic.contains("+")) {
            String replace = topic.replace("+", "[^/]+").replace("#", ".+");
            this.pattern = Pattern.compile(replace);
        } else {
            this.pattern = null;
        }
        HiveMQClients.ClientHolder holder = HiveMQClients.getHolder(config);
        this.source = ReactiveStreams.fromPublisher((Publisher)((Multi)holder.connect().onItem().transformToMulti(client -> Multi.createFrom().converter((MultiConverter)MultiRxConverters.fromFlowable(), (Object)((FlowableWithSingle)((Mqtt3SubscribeBuilder.Publishes.Start.Complete)((Mqtt3SubscribeBuilder.Publishes.Start.Complete)client.subscribePublishesWith().topicFilter(topic)).qos(MqttQos.fromCode((int)qos))).applySubscribe()).doOnSingle(subAck -> this.subscribed.set(true))).filter(m -> this.matches(topic, (Mqtt3Publish)m)).onItem().transform(x -> new HiveMQReceivingMqttMessage((Mqtt3Publish)x, onNack))).stage(multi -> {
            if (broadcast) {
                return multi.broadcast().toAllSubscribers();
            }
            return multi;
        })).onCancellation().invoke(() -> this.subscribed.set(false)).onFailure().invoke(arg_0 -> ((MqttLogging)MqttLogging.log).unableToConnectToBroker(arg_0)));
    }

    private boolean matches(String topic, Mqtt3Publish m) {
        String topicName = m.getTopic().toString();
        if (this.pattern != null) {
            return this.pattern.matcher(topicName).matches();
        }
        return topicName.equals(topic);
    }

    private MqttFailureHandler createFailureHandler(MqttFailureHandler.Strategy strategy, String channel) {
        switch (strategy) {
            case IGNORE: {
                return new MqttIgnoreFailure(channel);
            }
            case FAIL: {
                return new MqttFailStop(channel);
            }
        }
        throw MqttExceptions.ex.illegalArgumentUnknownStrategy(strategy.toString());
    }

    public PublisherBuilder<MqttMessage<?>> getSource() {
        return this.source;
    }

    public boolean isSubscribed() {
        return this.subscribed.get();
    }
}

