package fun.pplm.framework.poplar.mqtt.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import fun.pplm.framework.poplar.mqtt.config.MqttConfig;
import java.io.IOException;
import java.util.Iterator;
import javax.annotation.PreDestroy;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:fun/pplm/framework/poplar/mqtt/service/MqttService.class */
public class MqttService implements HealthIndicator {
    private static Logger logger = LoggerFactory.getLogger(MqttService.class);

    @Autowired
    private MqttConfig config;

    @Autowired
    private MqttCallback mqttCallback;

    @Autowired
    private ObjectMapper objectMapper;
    private MqttClient client;

    @EventListener({ApplicationReadyEvent.class})
    protected void init() {
        logger.debug("mqtt服务初始化开始...");
        try {
            this.client = new MqttClient(this.config.getUri(), this.config.getClientId(), new MemoryPersistence());
            this.client.setCallback(this.mqttCallback);
            this.client.connect(this.config.getOptions());
            Iterator<String> it = this.config.getTopics().iterator();
            while (it.hasNext()) {
                this.client.subscribe(it.next(), this.config.getSubQos().intValue());
            }
            logger.debug("mqtt服务初始化完成");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void publish(String str, Object obj) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(this.config.getPubQos().intValue());
        mqttMessage.setRetained(this.config.getPubRetained().booleanValue());
        try {
            String writeValueAsString = obj instanceof String ? (String) obj : this.objectMapper.writeValueAsString(obj);
            logger.info("发布消息: {}", writeValueAsString);
            mqttMessage.setPayload(writeValueAsString.getBytes(this.config.getCharset()));
            publish(str, mqttMessage);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void publish(String str, MqttMessage mqttMessage) throws MqttException {
        this.client.publish(str, mqttMessage);
    }

    @PreDestroy
    public void destory() {
        if (this.client != null) {
            logger.debug("mqtt服务停止开始...");
            try {
                this.client.disconnect();
                this.client.close();
            } catch (MqttException e) {
                logger.error(e.getMessage(), e);
            }
            logger.debug("mqtt服务停止完成");
        }
    }

    public Health health() {
        return this.client != null ? this.client.isConnected() ? Health.up().build() : Health.outOfService().build() : Health.down().build();
    }
}
