/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.hivemqclient.smallrye.reactive;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
import io.quarkiverse.hivemqclient.smallrye.reactive.HiveMQClients;
import io.quarkiverse.hivemqclient.smallrye.reactive.HiveMQMqttConnectorOutgoingConfiguration;
import io.reactivex.Flowable;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.mqtt.SendingMqttMessageMetadata;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import mutiny.zero.flow.adapters.AdaptersToFlow;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Publisher;

/**
 * Outgoing side of the HiveMQ MQTT connector: exposes a {@link Flow.Subscriber} that publishes
 * every {@link Message} it receives through a HiveMQ {@link Mqtt3RxClient}.
 *
 * <p>The topic, QoS and retain flag of each publication default to the channel configuration and
 * can be overridden per message with {@link SendingMqttMessageMetadata}.
 */
public class HiveMQMqttSink {
    /** Default destination topic: the configured topic, or the channel name when none is configured. */
    private final String topic;
    /** Default QoS code taken from the configuration; resolved with {@link MqttQos#fromCode(int)} on send. */
    private final int qos;
    /** Subscriber handed out by {@link #getSink()}. */
    private final Flow.Subscriber<? extends Message<?>> sink;
    /** Readiness flag reported by {@link #isReady()}. */
    private final AtomicBoolean connected = new AtomicBoolean();

    /**
     * Builds the sink pipeline. Nothing is connected here; the client is requested when the
     * pipeline is subscribed to.
     *
     * @param vertx  used to schedule the periodic connection-state poll
     * @param config outgoing channel configuration (topic, channel name, QoS, client settings)
     */
    public HiveMQMqttSink(Vertx vertx, HiveMQMqttConnectorOutgoingConfiguration config) {
        this.topic = config.getTopic().orElseGet(config::getChannel);
        this.qos = config.getQos();
        // Holds the client shared between the subscription, send and completion stages.
        AtomicReference<Mqtt3RxClient> reference = new AtomicReference<>();
        this.sink = MultiUtils.via(msg -> msg.onSubscription().call(() -> {
            Mqtt3RxClient client = reference.get();
            if (client != null) {
                if (client.getState().isConnected()) {
                    this.connected.set(true);
                } else {
                    // Poll every 100 ms until the already-stored client reports connected.
                    // NOTE(review): the timer is only cancelled once the client is connected;
                    // confirm it cannot outlive the sink when the client never connects.
                    vertx.setPeriodic(100L, id -> {
                        if (client.getState().isConnected()) {
                            vertx.cancelTimer(id);
                            this.connected.set(true);
                        }
                    });
                }
            }
            // The subscription is only passed on once this Uni has produced the client
            // (presumably already connected, going by the helper's name - TODO confirm).
            return HiveMQClients.getConnectedClient(config).onItem().invoke(c -> {
                reference.set(c);
                this.connected.set(true);
            });
        }).onItem().transformToUniAndConcatenate(m -> this.send(reference, (Message<?>) m)).onCompletion().invoke(() -> {
            // Upstream completed: release the client, if one was ever stored.
            Mqtt3RxClient c = reference.getAndSet(null);
            if (c != null) {
                c.toBlocking().disconnect();
                this.connected.set(false);
            }
        }).onFailure().invoke(e -> {
            this.connected.set(false);
            MqttLogging.log.errorWhileSendingMessageToBroker(e);
        }));
    }

    /**
     * Publishes one message and acknowledges it according to the outcome.
     *
     * @param reference holder of the client to publish with
     * @param msg       message whose payload is published
     * @return a {@link Uni} emitting {@code msg} once it has been acked (publish succeeded) or
     *         nacked (publish failed); when no topic can be determined the message is emitted
     *         as-is, without publishing and without ack/nack
     */
    private Uni<? extends Message<?>> send(AtomicReference<Mqtt3RxClient> reference, Message<?> msg) {
        Mqtt3RxClient client = reference.get();
        String actualTopicToBeUsed = this.topic;
        MqttQos actualQoS = MqttQos.fromCode(this.qos);
        boolean isRetain = false;

        // Per-message metadata overrides the channel defaults.
        Optional<SendingMqttMessageMetadata> sendingMqttMessageMetadata =
                msg.getMetadata().get(SendingMqttMessageMetadata.class);
        if (sendingMqttMessageMetadata.isPresent()) {
            SendingMqttMessageMetadata mm = sendingMqttMessageMetadata.get();
            actualTopicToBeUsed = mm.getTopic() == null ? this.topic : mm.getTopic();
            actualQoS = MqttQos.fromCode(mm.getQosLevel() == null ? actualQoS.getCode() : mm.getQosLevel().value());
            isRetain = mm.isRetain();
        }
        if (actualTopicToBeUsed == null) {
            MqttLogging.log.ignoringNoTopicSet();
            return Uni.createFrom().item(msg);
        }

        Mqtt3Publish publishMessage = Mqtt3Publish.builder()
                .topic(actualTopicToBeUsed)
                .qos(actualQoS)
                .payload(this.convert(msg.getPayload()))
                .retain(isRetain)
                .build();
        return Uni.createFrom()
                .publisher(AdaptersToFlow.publisher(client.publish(Flowable.just(publishMessage))))
                .onItemOrFailure().transformToUni((s, f) -> {
                    if (f != null) {
                        return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg));
                    }
                    return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg));
                });
    }

    /**
     * Converts a message payload to the {@link ByteBuffer} form used to build the publication.
     *
     * @param payload message payload, see {@link #toBuffer(Object)} for the supported types
     * @return a buffer wrapping the encoded payload bytes
     */
    private ByteBuffer convert(Object payload) {
        Buffer buffer = this.toBuffer(payload);
        return ByteBuffer.wrap(buffer.getBytes());
    }

    /**
     * Encodes a payload as a Vert.x buffer.
     *
     * <ul>
     * <li>{@link JsonObject} / {@link JsonArray}: their own buffer encoding;</li>
     * <li>{@link String} and boxed primitives: their {@code toString()} text;</li>
     * <li>{@code byte[]} and Vert.x buffers: the bytes as-is;</li>
     * <li>anything else: JSON-encoded with {@link Json#encodeToBuffer(Object)}.</li>
     * </ul>
     *
     * @param payload payload to encode, must not be {@code null}
     * @return the encoded payload
     * @throws NullPointerException if {@code payload} is {@code null}
     */
    private Buffer toBuffer(Object payload) {
        if (payload == null) {
            // Same exception type as the implicit dereference it replaces, but with a diagnosis.
            throw new NullPointerException("MQTT message payload must not be null");
        }
        if (payload instanceof JsonObject) {
            return new Buffer(((JsonObject) payload).toBuffer());
        }
        if (payload instanceof JsonArray) {
            return new Buffer(((JsonArray) payload).toBuffer());
        }
        if (payload instanceof String || isBoxedPrimitive(payload)) {
            return new Buffer(io.vertx.core.buffer.Buffer.buffer(payload.toString()));
        }
        if (payload instanceof byte[]) {
            return new Buffer(io.vertx.core.buffer.Buffer.buffer((byte[]) payload));
        }
        if (payload instanceof Buffer) {
            return (Buffer) payload;
        }
        if (payload instanceof io.vertx.core.buffer.Buffer) {
            return new Buffer((io.vertx.core.buffer.Buffer) payload);
        }
        return new Buffer(Json.encodeToBuffer(payload));
    }

    /**
     * Tells whether {@code value} is an instance of one of the eight primitive wrapper classes.
     * {@code value.getClass().isPrimitive()} cannot be used for this: the runtime class of an
     * object reference is never a primitive type, so that check is always {@code false}.
     */
    private static boolean isBoxedPrimitive(Object value) {
        return value instanceof Boolean
                || value instanceof Character
                || value instanceof Byte
                || value instanceof Short
                || value instanceof Integer
                || value instanceof Long
                || value instanceof Float
                || value instanceof Double;
    }

    /**
     * @return the subscriber that publishes the messages it receives
     */
    public Flow.Subscriber<? extends Message<?>> getSink() {
        return this.sink;
    }

    /**
     * @return the current value of the connection flag: set once a client has been obtained or
     *         observed connected, cleared when the stream fails or completes with a stored client
     */
    public boolean isReady() {
        return this.connected.get();
    }
}

