SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)
場景
Windows上Mqtt服務(wù)器搭建與使用客戶端工具MqttBox進行測試:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/112305328
在上面搭建好了MQTT服務(wù)器以及客戶端工具MqttBox之后,怎樣在SpringBoot中實現(xiàn)訂閱主題接收消息和發(fā)布主題推送消息的功能。
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
關(guān)注公眾號
霸道的程序猿
獲取編程相關(guān)電子書、教程推送與免費下載。
實現(xiàn)
首先搭建起一個SpringBoot項目,引入最基本的Web依賴,這里可以使用快速搭建框架進行搭建
若依前后端分離版手把手教你本地搭建環(huán)境并運行項目:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108465662
然后搭建好SpringBoot項目后,首先在項目中引入mqtt的相關(guān)依賴
??????? <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>然后連接MQTT服務(wù)器時需要配置一些參數(shù),比如服務(wù)器地址、用戶名、密碼等。所以需要將這些配置放在配置文件中
?
然后在Spring節(jié)點下添加如下配置
# Spring配置 spring:# MQTTmqtt:# 服務(wù)器連接地址,如果有多個,用逗號隔開host: tcp://你自己的MQTT服務(wù)器的地址:1883# 連接服務(wù)器默認客戶端IDclientId: mqtt_client_id_001# 默認的消息推送主題,實際可在調(diào)用接口時指定topic: mqtt_topic_001,mqtt_topic_002,mqtt_topic_003# 用戶名username: admin# 密碼password: admin# 連接超時timeout: 30# 心跳keepalive: 30添加位置
?
這里的服務(wù)器連接地址就是上面博客搭建的MQTT服務(wù)器的地址,其端口號是mqtt協(xié)議連接的端口號,默認是1883,不是mqtt服務(wù)端后臺登錄的端口號。
默認的客戶端id用來作為在MQTT服務(wù)端的唯一標識,然后下面的默認消息推送的主題會在項目啟動后先發(fā)布這些主題,實際使用時需要在接口調(diào)用時指定。
然后用戶名密碼就是上面MQTT中的用戶名和密碼
在配置文件中添加了配置項之后需要在代碼中獲取到這些配置項,根據(jù)自己的項目的規(guī)范去決定是通過注解還是其他方式來獲取配置項
這里使用如下方式
創(chuàng)建一個PropertiesUtil類用來加載配置項的的內(nèi)容
import java.io.IOException; import java.io.InputStream; import java.util.Properties;/*** 獲取配置信息**/ public class PropertiesUtil {public static String MQTT_HOST;public static String MQTT_CLIENT_ID;public static String MQTT_USER_NAME;public static String MQTT_PASSWORD;public static String MQTT_TOPIC;public static Integer MQTT_TIMEOUT;public static Integer MQTT_KEEP_ALIVE;/***? mqtt配置*/static {Properties properties = loadMqttProperties();MQTT_HOST = properties.getProperty("host");MQTT_CLIENT_ID = properties.getProperty("clientId");MQTT_USER_NAME = properties.getProperty("username");MQTT_PASSWORD = properties.getProperty("password");MQTT_TOPIC = properties.getProperty("topic");MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout"));MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive"));}private static Properties loadMqttProperties() {InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml");Properties properties = new Properties();try {properties.load(inputstream);return properties;} catch (IOException e) {throw new RuntimeException(e);} finally {try {if (inputstream != null) {inputstream.close();}} catch (IOException e) {throw new RuntimeException(e);}}} }然后就是需要在SpringBoot項目啟動后連接到mqtt服務(wù)器并創(chuàng)建客戶端,然后在具體的業(yè)務(wù)需求中比如在Controller中進行訂閱和發(fā)布主題。
所以首先創(chuàng)建一個MqttConsumer類,并使其實現(xiàn)ApplicationRunne接口的run方法以實現(xiàn)在項目啟動后執(zhí)行所需要的操作
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component;@Component public class MqttConsumer implements ApplicationRunner {private static MqttClient client;@Overridepublic void run(ApplicationArguments args) {System.out.println("初始化并啟動mqtt......");this.connect();}/*** 連接mqtt服務(wù)器*/private void connect() {try {// 1 創(chuàng)建客戶端getClient();// 2 設(shè)置配置MqttConnectOptions options = getOptions();String[] topic = PropertiesUtil.MQTT_TOPIC.split(",");// 3 消息發(fā)布質(zhì)量int[] qos = getQos(topic.length);// 4 最后設(shè)置create(options, topic, qos);} catch (Exception e) {System.out.println("mqtt連接異常:" + e);}}/***? 創(chuàng)建客戶端? --- 1 ---*/public void getClient() {try {if (null == client) {client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());}System.out.println("創(chuàng)建mqtt客戶端:" );} catch (Exception e) {System.out.println("創(chuàng)建mqtt客戶端異常:\" + e:" );}}/***? 生成配置對象,用戶名,密碼等? --- 2 ---*/public MqttConnectOptions getOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(PropertiesUtil.MQTT_USER_NAME);options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());// 設(shè)置超時時間options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);// 設(shè)置會話心跳時間options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);// 是否清除sessionoptions.setCleanSession(false);System.out.println("--生成mqtt配置對象");return options;}/***? qos?? --- 3 ---*/public int[] getQos(int length) {int[] qos = new int[length];for (int i = 0; i < length; i++) {/***? MQTT協(xié)議中有三種消息發(fā)布服務(wù)質(zhì)量:** QOS0: “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。* QOS1: “至少一次”,確保消息到達,但消息重復(fù)可能會發(fā)生。* QOS2: “只有一次”,確保消息到達一次。這一級別可用于如下情況,在計費系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果,資源開銷大*/qos[i] = 1;}System.out.println("--設(shè)置消息發(fā)布質(zhì)量");return qos;}/***? 裝載各種實例和訂閱主題? --- 4 ---*/public void create(MqttConnectOptions options, String[] topic, int[] qos) {try {client.setCallback(new MqttConsumerCallback(client, options, topic, qos));System.out.println("--添加回調(diào)處理類");client.connect(options);} catch (Exception e) {System.out.println("裝載實例或訂閱主題異常:" + e);}}/*** 訂閱某個主題** @param topic* @param qos*/public static void subscribe(String topic, int qos) {try {System.out.println("topic:" + topic);client.subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}/*** 發(fā)布,非持久化**? qos根據(jù)文檔設(shè)置為1** @param topic* @param msg*/public static void publish(String topic, String msg) {publish(1, false, topic, msg);}/*** 發(fā)布*/public static void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = client.getTopic(topic);if (null == mTopic) {System.out.println("topic:" + topic + " 不存在");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();if (!token.isComplete()) {System.out.println("消息發(fā)送成功");}} catch (MqttPersistenceException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}} }在上面的啟動類中的run方法中會首先連接mqtt服務(wù)器并創(chuàng)建客戶端,然后加載配置文件中配置的默認主題并調(diào)用create
進行訂閱。
這其中也提供了單個的用于訂閱主題和發(fā)布消息的方法。
其中在訂閱主題后接收消息時需要一個回調(diào)方法。
?
所以需要新建一個實現(xiàn)了MqttCallbackExtended接口的相關(guān)方法的回調(diào)處理類MqttConsumerCallback
import org.eclipse.paho.client.mqttv3.*;import java.util.Arrays;/*** mqtt回調(diào)處理類*/public class MqttConsumerCallback implements MqttCallbackExtended {private MqttClient client;private MqttConnectOptions options;private String[] topic;private int[] qos;public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {this.client = client;this.options = options;this.topic = topic;this.qos = qos;}/*** 斷開重連*/@Overridepublic void connectionLost(Throwable cause) {System.out.println("MQTT連接斷開,發(fā)起重連......");try {if (null != client && !client.isConnected()) {client.reconnect();System.out.println("嘗試重新連接");} else {client.connect(options);System.out.println("嘗試建立新連接");}} catch (Exception e) {e.printStackTrace();}}/*** 接收到消息調(diào)用令牌中調(diào)用*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + Arrays.toString(topic));}/*** 消息處理*/@Overridepublic void messageArrived(String topic, MqttMessage message) {try {String msg = new String(message.getPayload());System.out.println("收到topic:" + topic + " 消息:" + msg);System.out.println("收到消息后執(zhí)行具體的業(yè)務(wù)邏輯操作,比如將消息存儲進數(shù)據(jù)庫");} catch (Exception e) {System.out.println("處理mqtt消息異常:" + e);}}/*** mqtt連接后訂閱主題*/@Overridepublic void connectComplete(boolean b, String s) {try {if (null != topic && null != qos) {if (client.isConnected()) {client.subscribe(topic, qos);System.out.println("mqtt連接成功,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);System.out.println("--訂閱主題::" + Arrays.toString(topic));} else {System.out.println("mqtt連接失敗,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);}}} catch (Exception e) {System.out.println("mqtt訂閱主題異常:" + e);}} }以上三個類建立成功之后就可以進行訂閱主題和發(fā)布消息的測試了。
發(fā)布指定主題的消息
新建一個測試用的Controller接口用來測試推送消息
import com.ruoyi.web.mqtt.MqttConsumer; import org.springframework.data.repository.query.Param; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController;@RestController @RequestMapping("/testmqtt") public class MqttTest {/*** 測試推送消息*/@ResponseBody@GetMapping(value = "/push")public Object push(@Param("topic") String topic,@Param("msg") String msg) {MqttConsumer.publish(topic, msg);return "測試發(fā)布主題成功";} }并且把這個接口url放開權(quán)限驗證可以直接在postman中進行接口調(diào)用
這里直接在接口方法中調(diào)用了上面的推送消息的方法,第一個參數(shù)是指定的主題,第二個參數(shù)是消息的內(nèi)容
那么此時SpringBoot就充當了發(fā)布者的角色。
在測試推送消息前需要使用MqttBox連接到同一個MQTT服務(wù)器并訂閱同一個主題,這里訂閱badao主題
我們調(diào)用一下此接口
?
然后此時查看下MqttBox中已經(jīng)收到消息
?
SpringBoot訂閱主題并接收消息
在上面的接口中再添加一個接口用來訂閱某個指定主題
??? /*** 測試接收消息*/@ResponseBody@GetMapping(value = "/subscribe")public Object subscribe(@Param("topic") String topic,@Param("qus") int qus) {MqttConsumer.subscribe(topic, qus);return "訂閱主題"+topic+"成功";}這里訂閱主題的方法第一個參數(shù)是主題,第二個是消息質(zhì)量
然后再調(diào)用下此接口
?
可以看到訂閱主題成功,然后我們使用MqttBox去發(fā)布一個同樣主題的消息,那么SpringBoot這邊的回調(diào)方法就可以接收到發(fā)送的消息并進行后續(xù)的業(yè)務(wù)操作,
比如將消息存儲到數(shù)據(jù)庫等。
?
總結(jié)
以上是生活随笔為你收集整理的SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 信息系统项目管理师-知识、变更、战略管理
- 下一篇: SpringBoot连接MQTT服务器时