日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)

發布時間:2025/3/19 javascript 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

場景

Windows上Mqtt服務器搭建與使用客戶端工具MqttBox進行測試:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/112305328

在上面搭建好了MQTT服務器以及客戶端工具MqttBox之后,怎樣在SpringBoot中實現訂閱主題接收消息和發布主題推送消息的功能。

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。

實現

首先搭建起一個SpringBoot項目,引入最基本的Web依賴,這里可以使用快速搭建框架進行搭建

若依前后端分離版手把手教你本地搭建環境并運行項目:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108465662

然后搭建好SpringBoot項目后,首先在項目中引入mqtt的相關依賴

??????? <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>

然后連接MQTT服務器時需要配置一些參數,比如服務器地址、用戶名、密碼等。所以需要將這些配置放在配置文件中

?

然后在Spring節點下添加如下配置

# Spring配置 spring:# MQTTmqtt:# 服務器連接地址,如果有多個,用逗號隔開host: tcp://你自己的MQTT服務器的地址:1883# 連接服務器默認客戶端IDclientId: mqtt_client_id_001# 默認的消息推送主題,實際可在調用接口時指定topic: mqtt_topic_001,mqtt_topic_002,mqtt_topic_003# 用戶名username: admin# 密碼password: admin# 連接超時timeout: 30# 心跳keepalive: 30

添加位置

?

這里的服務器連接地址就是上面博客搭建的MQTT服務器的地址,其端口號是mqtt協議連接的端口號,默認是1883,不是mqtt服務端后臺登錄的端口號。

默認的客戶端id用來作為在MQTT服務端的唯一標識,然后下面的默認消息推送的主題會在項目啟動后先發布這些主題,實際使用時需要在接口調用時指定。

然后用戶名密碼就是上面MQTT中的用戶名和密碼

在配置文件中添加了配置項之后需要在代碼中獲取到這些配置項,根據自己的項目的規范去決定是通過注解還是其他方式來獲取配置項

這里使用如下方式

創建一個PropertiesUtil類用來加載配置項的的內容

import java.io.IOException; import java.io.InputStream; import java.util.Properties;/*** 獲取配置信息**/ public class PropertiesUtil {public static String MQTT_HOST;public static String MQTT_CLIENT_ID;public static String MQTT_USER_NAME;public static String MQTT_PASSWORD;public static String MQTT_TOPIC;public static Integer MQTT_TIMEOUT;public static Integer MQTT_KEEP_ALIVE;/***? mqtt配置*/static {Properties properties = loadMqttProperties();MQTT_HOST = properties.getProperty("host");MQTT_CLIENT_ID = properties.getProperty("clientId");MQTT_USER_NAME = properties.getProperty("username");MQTT_PASSWORD = properties.getProperty("password");MQTT_TOPIC = properties.getProperty("topic");MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout"));MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive"));}private static Properties loadMqttProperties() {InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml");Properties properties = new Properties();try {properties.load(inputstream);return properties;} catch (IOException e) {throw new RuntimeException(e);} finally {try {if (inputstream != null) {inputstream.close();}} catch (IOException e) {throw new RuntimeException(e);}}} }

然后就是需要在SpringBoot項目啟動后連接到mqtt服務器并創建客戶端,然后在具體的業務需求中比如在Controller中進行訂閱和發布主題。

所以首先創建一個MqttConsumer類,并使其實現ApplicationRunne接口的run方法以實現在項目啟動后執行所需要的操作

import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component;@Component public class MqttConsumer implements ApplicationRunner {private static MqttClient client;@Overridepublic void run(ApplicationArguments args) {System.out.println("初始化并啟動mqtt......");this.connect();}/*** 連接mqtt服務器*/private void connect() {try {// 1 創建客戶端getClient();// 2 設置配置MqttConnectOptions options = getOptions();String[] topic = PropertiesUtil.MQTT_TOPIC.split(",");// 3 消息發布質量int[] qos = getQos(topic.length);// 4 最后設置create(options, topic, qos);} catch (Exception e) {System.out.println("mqtt連接異常:" + e);}}/***? 創建客戶端? --- 1 ---*/public void getClient() {try {if (null == client) {client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());}System.out.println("創建mqtt客戶端:" );} catch (Exception e) {System.out.println("創建mqtt客戶端異常:\" + e:" );}}/***? 生成配置對象,用戶名,密碼等? --- 2 ---*/public MqttConnectOptions getOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(PropertiesUtil.MQTT_USER_NAME);options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());// 設置超時時間options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);// 設置會話心跳時間options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);// 是否清除sessionoptions.setCleanSession(false);System.out.println("--生成mqtt配置對象");return options;}/***? qos?? --- 3 ---*/public int[] getQos(int length) {int[] qos = new int[length];for (int i = 0; i < length; i++) {/***? MQTT協議中有三種消息發布服務質量:** QOS0: “至多一次”,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用于如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。* QOS1: “至少一次”,確保消息到達,但消息重復可能會發生。* QOS2: “只有一次”,確保消息到達一次。這一級別可用于如下情況,在計費系統中,消息重復或丟失會導致不正確的結果,資源開銷大*/qos[i] = 1;}System.out.println("--設置消息發布質量");return qos;}/***? 裝載各種實例和訂閱主題? --- 4 ---*/public void create(MqttConnectOptions options, String[] topic, int[] qos) {try {client.setCallback(new MqttConsumerCallback(client, options, topic, qos));System.out.println("--添加回調處理類");client.connect(options);} catch (Exception e) {System.out.println("裝載實例或訂閱主題異常:" + e);}}/*** 訂閱某個主題** @param topic* @param qos*/public static void subscribe(String topic, int qos) {try {System.out.println("topic:" + topic);client.subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}/*** 發布,非持久化**? qos根據文檔設置為1** @param topic* @param msg*/public static void publish(String topic, String msg) {publish(1, false, topic, msg);}/*** 發布*/public static void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = client.getTopic(topic);if (null == mTopic) {System.out.println("topic:" + topic + " 不存在");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();if (!token.isComplete()) {System.out.println("消息發送成功");}} catch (MqttPersistenceException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}} }

在上面的啟動類中的run方法中會首先連接mqtt服務器并創建客戶端,然后加載配置文件中配置的默認主題并調用create

進行訂閱。

這其中也提供了單個的用于訂閱主題和發布消息的方法。

其中在訂閱主題后接收消息時需要一個回調方法。

?

所以需要新建一個實現了MqttCallbackExtended接口的相關方法的回調處理類MqttConsumerCallback

import org.eclipse.paho.client.mqttv3.*;import java.util.Arrays;/*** mqtt回調處理類*/public class MqttConsumerCallback implements MqttCallbackExtended {private MqttClient client;private MqttConnectOptions options;private String[] topic;private int[] qos;public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {this.client = client;this.options = options;this.topic = topic;this.qos = qos;}/*** 斷開重連*/@Overridepublic void connectionLost(Throwable cause) {System.out.println("MQTT連接斷開,發起重連......");try {if (null != client && !client.isConnected()) {client.reconnect();System.out.println("嘗試重新連接");} else {client.connect(options);System.out.println("嘗試建立新連接");}} catch (Exception e) {e.printStackTrace();}}/*** 接收到消息調用令牌中調用*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + Arrays.toString(topic));}/*** 消息處理*/@Overridepublic void messageArrived(String topic, MqttMessage message) {try {String msg = new String(message.getPayload());System.out.println("收到topic:" + topic + " 消息:" + msg);System.out.println("收到消息后執行具體的業務邏輯操作,比如將消息存儲進數據庫");} catch (Exception e) {System.out.println("處理mqtt消息異常:" + e);}}/*** mqtt連接后訂閱主題*/@Overridepublic void connectComplete(boolean b, String s) {try {if (null != topic && null != qos) {if (client.isConnected()) {client.subscribe(topic, qos);System.out.println("mqtt連接成功,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);System.out.println("--訂閱主題::" + Arrays.toString(topic));} else {System.out.println("mqtt連接失敗,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);}}} catch (Exception e) {System.out.println("mqtt訂閱主題異常:" + e);}} }

以上三個類建立成功之后就可以進行訂閱主題和發布消息的測試了。

發布指定主題的消息

新建一個測試用的Controller接口用來測試推送消息

import com.ruoyi.web.mqtt.MqttConsumer; import org.springframework.data.repository.query.Param; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController;@RestController @RequestMapping("/testmqtt") public class MqttTest {/*** 測試推送消息*/@ResponseBody@GetMapping(value = "/push")public Object push(@Param("topic") String topic,@Param("msg") String msg) {MqttConsumer.publish(topic, msg);return "測試發布主題成功";} }

并且把這個接口url放開權限驗證可以直接在postman中進行接口調用

這里直接在接口方法中調用了上面的推送消息的方法,第一個參數是指定的主題,第二個參數是消息的內容

那么此時SpringBoot就充當了發布者的角色。

在測試推送消息前需要使用MqttBox連接到同一個MQTT服務器并訂閱同一個主題,這里訂閱badao主題

我們調用一下此接口

?

然后此時查看下MqttBox中已經收到消息

?

SpringBoot訂閱主題并接收消息

在上面的接口中再添加一個接口用來訂閱某個指定主題

??? /*** 測試接收消息*/@ResponseBody@GetMapping(value = "/subscribe")public Object subscribe(@Param("topic") String topic,@Param("qus") int qus) {MqttConsumer.subscribe(topic, qus);return "訂閱主題"+topic+"成功";}

這里訂閱主題的方法第一個參數是主題,第二個是消息質量

然后再調用下此接口

?

可以看到訂閱主題成功,然后我們使用MqttBox去發布一個同樣主題的消息,那么SpringBoot這邊的回調方法就可以接收到發送的消息并進行后續的業務操作,

比如將消息存儲到數據庫等。

?

總結

以上是生活随笔為你收集整理的SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。