Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件
場(chǎng)景
MQTT
MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的"輕量級(jí)"通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。MQTT最大優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。作為一種低開(kāi)銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用。
MQTT是一個(gè)基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡(jiǎn)單、開(kāi)放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT)。其在,通過(guò)衛(wèi)星鏈路通信傳感器、偶爾撥號(hào)的醫(yī)療設(shè)備、智能家居、及一些小型化設(shè)備中已廣泛使用。
MQTTnet
MQTTnet 是一個(gè)基于 MQTT 通信的高性能 .NET 開(kāi)源庫(kù),它同時(shí)支持 MQTT 服務(wù)器端和客戶端。
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
關(guān)注公眾號(hào)
霸道的程序猿
獲取編程相關(guān)電子書(shū)、教程推送與免費(fèi)下載。
實(shí)現(xiàn)
打開(kāi)VS,新建mqtt的服務(wù)端項(xiàng)目
選擇將解決方案和項(xiàng)目放在同一個(gè)目錄下
然后選擇新建控制臺(tái)應(yīng)用程序,并且選擇目標(biāo)框架為.NET Core3.1
然后在此解決方案下右擊新建項(xiàng)目
新建Winform項(xiàng)目,作為Mqtt的客戶端
然后在解決方案上右擊選擇-管理解決方案的Nuget 程序包
然后在瀏覽中搜索MQTTnet,右邊勾選需要添加依賴的項(xiàng)目,這里客戶端和服務(wù)端都需要添加。然后下面選擇指定版本,這里是2.4.0,點(diǎn)擊安裝
安裝過(guò)程中會(huì)提示預(yù)覽更改,點(diǎn)擊確定,以及會(huì)提示接受協(xié)議。
安裝成功后,就可以在項(xiàng)目下看到依賴項(xiàng)
然后編寫服務(wù)端的代碼,打開(kāi)Program.cs,修改如下
using System; using MQTTnet; using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; using System.Text; using System.Threading;namespace MqttnetServer {class Program{//MQtt服務(wù)端private static MqttServer mqttServer = null;static void Main(string[] args){//MQTTnet 提供了一個(gè)靜態(tài)類 MqttNetTrace 來(lái)對(duì)消息進(jìn)行跟蹤//MqttNetTrace 的事件 TraceMessagePublished 用于跟蹤服務(wù)端和客戶端應(yīng)用的日志消息,比如啟動(dòng)、停止、心跳、消息訂閱和發(fā)布等MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;//啟動(dòng)服務(wù)端new Thread(StartMqttServer).Start();while (true){//獲取輸入字符var inputString = Console.ReadLine().ToLower().Trim();//exit則停止服務(wù)if (inputString == "exit"){mqttServer.StopAsync();Console.WriteLine("MQTT服務(wù)已停止!");break;}//clients則輸出所有客戶端else if (inputString == "clients"){foreach (var item in mqttServer.GetConnectedClients()){Console.WriteLine($"客戶端標(biāo)識(shí):{item.ClientId},協(xié)議版本:{item.ProtocolVersion}");}}else{Console.WriteLine($"命令[{inputString}]無(wú)效!");}}}//啟動(dòng)服務(wù)端private static void StartMqttServer(){if (mqttServer == null){try{//在 MqttServerOptions 選項(xiàng)中,你可以使用 ConnectionValidator 來(lái)對(duì)客戶端連接進(jìn)行驗(yàn)證。//比如客戶端ID標(biāo)識(shí) ClientId,用戶名 Username 和密碼 Password 等。var options = new MqttServerOptions{ConnectionValidator = p =>{if (p.ClientId == "c001"){if (p.Username != "u001" || p.Password != "p001"){return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;}}return MqttConnectReturnCode.ConnectionAccepted;}};//創(chuàng)建服務(wù)端最簡(jiǎn)單的方式是采用 MqttServerFactory 對(duì)象的 CreateMqttServer 方法來(lái)實(shí)現(xiàn),該方法需要一個(gè)//MqttServerOptions 參數(shù)。mqttServer = new MqttServerFactory().CreateMqttServer(options) as MqttServer;//服務(wù)端支持 ClientConnected、ClientDisconnected 和 ApplicationMessageReceived 事件,//分別用來(lái)檢查客戶端連接、客戶端斷開(kāi)以及接收客戶端發(fā)來(lái)的消息。//ApplicationMessageReceived 的事件參數(shù)包含了客戶端ID標(biāo)識(shí) ClientId 和 MQTT 應(yīng)用消息 MqttApplicationMessage 對(duì)象,//通過(guò)該對(duì)象可以獲取主題 Topic、QoS QualityOfServiceLevel 和消息內(nèi)容 Payload 等信息mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;//ClientConnected 和 ClientDisconnected 事件的事件參數(shù)一個(gè)客戶端連接對(duì)象 ConnectedMqttClient//通過(guò)該對(duì)象可以獲取客戶端ID標(biāo)識(shí) ClientId 和 MQTT 版本 ProtocolVersion。mqttServer.ClientConnected += MqttServer_ClientConnected;mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;}catch (Exception ex){Console.WriteLine(ex.Message); return;}}//創(chuàng)建了一個(gè) IMqttServer 對(duì)象后,調(diào)用其 StartAsync 方法即可啟動(dòng) MQTT 服務(wù)mqttServer.StartAsync();Console.WriteLine("MQTT服務(wù)啟動(dòng)成功!");}//ClientConnected 和 ClientDisconnected 事件的事件參數(shù)一個(gè)客戶端連接對(duì)象 ConnectedMqttClient//通過(guò)該對(duì)象可以獲取客戶端ID標(biāo)識(shí) ClientId 和 MQTT 版本 ProtocolVersion。private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e){Console.WriteLine($"客戶端[{e.Client.ClientId}]已連接,協(xié)議版本:{e.Client.ProtocolVersion}");}//ClientConnected 和 ClientDisconnected 事件的事件參數(shù)一個(gè)客戶端連接對(duì)象 ConnectedMqttClient//通過(guò)該對(duì)象可以獲取客戶端ID標(biāo)識(shí) ClientId 和 MQTT 版本 ProtocolVersion。private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e){Console.WriteLine($"客戶端[{e.Client.ClientId}]已斷開(kāi)連接!");}//ApplicationMessageReceived 的事件參數(shù)包含了客戶端ID標(biāo)識(shí) ClientId 和 MQTT 應(yīng)用消息 MqttApplicationMessage 對(duì)象,//通過(guò)該對(duì)象可以獲取主題 Topic、QoS QualityOfServiceLevel 和消息內(nèi)容 Payload 等信息private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e){Console.WriteLine($"客戶端[{e.ClientId}]>> 主題:{e.ApplicationMessage.Topic} 負(fù)荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}");}//事件參數(shù) MqttNetTraceMessagePublishedEventArgs 包含了線程ID ThreadId、來(lái)源 Source、日志級(jí)別 Level、日志消息 Message、異常信息 Exception 等。//MqttNetTrace 類還提供了4個(gè)不同消息等級(jí)的靜態(tài)方法,Verbose、Information、Warning 和 Error,//用于給出不同級(jí)別的日志消息,該消息將會(huì)在 TraceMessagePublished 事件中輸出,//你可以使用 e.Level 進(jìn)行過(guò)慮。private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e){Console.WriteLine($">> 線程ID:{e.ThreadId} 來(lái)源:{e.Source} 跟蹤級(jí)別:{e.Level} 消息: {e.Message}"); if (e.Exception != null) { Console.WriteLine(e.Exception); }}} }然后啟動(dòng)服務(wù)端
然后修改客戶端的窗體頁(yè)面,添加如下控件布局
連接按鈕的點(diǎn)擊事件代碼為
??????? //連接到Mqtt服務(wù)器private void button_Connect_Click(object sender, EventArgs e){if (string.IsNullOrEmpty(textServerAddress.Text)){MessageBox.Show("服務(wù)器地址不能為空");}else {Task.Run(async () => {await ConnectMqttServerAsync();});}}訂閱并保存到文件按鈕的點(diǎn)擊事件為
?????? //訂閱主題按鈕點(diǎn)擊事件private void btnSubscribe_Click(object sender, EventArgs e){string topic = txtSubTopic.Text.Trim();if (string.IsNullOrEmpty(topic)){MessageBox.Show("訂閱主題不能為空!");return;}if (!mqttClient.IsConnected){MessageBox.Show("MQTT客戶端尚未連接!");return;}//客戶端連接到服務(wù)端之后,可以使用 SubscribeAsync 異步方法訂閱消息,//該方法可以傳入一個(gè)可枚舉或可變參數(shù)的主題過(guò)濾器 TopicFilter 參數(shù),主題過(guò)濾器包含主題名和 QoS 等級(jí)。mqttClient.SubscribeAsync(new List<TopicFilter> {new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)});txtReceiveMessage.AppendText($"已訂閱[{topic}]主題" + Environment.NewLine);}取消訂閱按鈕的點(diǎn)擊事件為
??????? //取消訂閱按鈕點(diǎn)擊事件private void btn_cancle_sub_Click(object sender, EventArgs e){string topic = txtSubTopic.Text.Trim();mqttClient.UnsubscribeAsync(new List<String> {topic});}發(fā)布按鈕的點(diǎn)擊事件為
??????? //發(fā)布主題private void button2_Click_1(object sender, EventArgs e){string topic = txtPubTopic.Text.Trim();if (string.IsNullOrEmpty(topic)){MessageBox.Show("發(fā)布主題不能為空!"); return;}string inputString = txtSendMessage.Text.Trim();//發(fā)布消息前需要先構(gòu)建一個(gè)消息對(duì)象 MqttApplicationMessage,//最直接的方法是使用其實(shí)構(gòu)造函數(shù),傳入主題、內(nèi)容、Qos 等參數(shù)。var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);//得到 MqttApplicationMessage 消息對(duì)象后,//通過(guò)客戶端對(duì)象調(diào)用其 PublishAsync 異步方法進(jìn)行消息發(fā)布。mqttClient.PublishAsync(appMsg);}完整客戶端示例代碼為
using MQTTnet; using MQTTnet.Core; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using System; using System.Collections.Generic; using System.IO; using System.Text; using System.Threading.Tasks; using System.Windows.Forms;namespace MqttnetClient {public partial class Form1 : Form{//MQTTk客戶端private MqttClient mqttClient = null;public Form1(){InitializeComponent();}private async Task ConnectMqttServerAsync(){if (mqttClient == null){//使用 MQTTnet 創(chuàng)建 MQTT 也非常簡(jiǎn)單//只需要使用 MqttClientFactory 對(duì)象的 CreateMqttClient 方法即可mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient;//客戶端支持 Connected、Disconnected 和 ApplicationMessageReceived 事件,//用來(lái)處理客戶端與服務(wù)端連接、客戶端從服務(wù)端斷開(kāi)以及客戶端收到消息的事情。mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;mqttClient.Connected += MqttClient_Connected;mqttClient.Disconnected += MqttClient_Disconnected;}try{//調(diào)用ConnectAsync方法時(shí)需要傳遞一個(gè) MqttClientTcpOptions 對(duì)象//選項(xiàng)包含了客戶端ID標(biāo)識(shí) ClientId、服務(wù)端地址(可以使用IP地址或域名)Server、端口號(hào) Port、//用戶名 UserName、密碼 Password 等信息。var options = new MqttClientTcpOptions { };if (!string.IsNullOrEmpty(textPort.Text)){options = new MqttClientTcpOptions{Server = textServerAddress.Text,ClientId = Guid.NewGuid().ToString().Substring(0, 5),UserName = textusername.Text,Password = textpassword.Text,Port = int.Parse(textPort.Text),CleanSession = true};}else {options = new MqttClientTcpOptions{Server = textServerAddress.Text,ClientId = Guid.NewGuid().ToString().Substring(0, 5),UserName = textusername.Text,Password = textpassword.Text,CleanSession = true};}//創(chuàng)建客戶端對(duì)象后,調(diào)用其異步方法 ConnectAsync 來(lái)連接到服務(wù)端。await mqttClient.ConnectAsync(options);}catch (Exception ex){Invoke((new Action(() =>{txtReceiveMessage.AppendText($"連接到MQTT服務(wù)器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);})));}}private void MqttClient_Connected(object sender, EventArgs e){Invoke((new Action(() =>{txtReceiveMessage.AppendText("已連接到MQTT服務(wù)器!" + Environment.NewLine);})));}private void MqttClient_Disconnected(object sender, EventArgs e){Invoke((new Action(() =>{txtReceiveMessage.AppendText("已斷開(kāi)MQTT連接!" + Environment.NewLine);})));}private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e){Invoke((new Action(() =>{txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");//將收到的消息追加到相對(duì)路徑record.txt文件if (!File.Exists("record.txt")){File.Create("record.txt");}else{File.AppendAllText("record.txt", "\r\n");File.AppendAllText("record.txt", Encoding.Default.GetString(e.ApplicationMessage.Payload));}})));}//訂閱主題按鈕點(diǎn)擊事件private void btnSubscribe_Click(object sender, EventArgs e){string topic = txtSubTopic.Text.Trim();if (string.IsNullOrEmpty(topic)){MessageBox.Show("訂閱主題不能為空!");return;}if (!mqttClient.IsConnected){MessageBox.Show("MQTT客戶端尚未連接!");return;}//客戶端連接到服務(wù)端之后,可以使用 SubscribeAsync 異步方法訂閱消息,//該方法可以傳入一個(gè)可枚舉或可變參數(shù)的主題過(guò)濾器 TopicFilter 參數(shù),主題過(guò)濾器包含主題名和 QoS 等級(jí)。mqttClient.SubscribeAsync(new List<TopicFilter> {new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)});txtReceiveMessage.AppendText($"已訂閱[{topic}]主題" + Environment.NewLine);}//發(fā)布主題private void button2_Click_1(object sender, EventArgs e){string topic = txtPubTopic.Text.Trim();if (string.IsNullOrEmpty(topic)){MessageBox.Show("發(fā)布主題不能為空!"); return;}string inputString = txtSendMessage.Text.Trim();//發(fā)布消息前需要先構(gòu)建一個(gè)消息對(duì)象 MqttApplicationMessage,//最直接的方法是使用其實(shí)構(gòu)造函數(shù),傳入主題、內(nèi)容、Qos 等參數(shù)。var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);//得到 MqttApplicationMessage 消息對(duì)象后,//通過(guò)客戶端對(duì)象調(diào)用其 PublishAsync 異步方法進(jìn)行消息發(fā)布。mqttClient.PublishAsync(appMsg);}//連接到Mqtt服務(wù)器private void button_Connect_Click(object sender, EventArgs e){if (string.IsNullOrEmpty(textServerAddress.Text)){MessageBox.Show("服務(wù)器地址不能為空");}else {Task.Run(async () => {await ConnectMqttServerAsync();});}}//取消訂閱按鈕點(diǎn)擊事件private void btn_cancle_sub_Click(object sender, EventArgs e){string topic = txtSubTopic.Text.Trim();mqttClient.UnsubscribeAsync(new List<String> {topic});}} }運(yùn)行客戶端
這里服務(wù)器地址就是本機(jī)地址,端口或者用戶名和密碼根據(jù)自己需要
然后就可以進(jìn)行主題的訂閱和發(fā)布了
然后在客戶端bin目錄下找到record.txt,可以看到追加到文件成功。
示例代碼下載
https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/19431291
參考文章:
https://www.cnblogs.com/kuige/articles/7724786.html
總結(jié)
以上是生活随笔為你收集整理的Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: VS中使用NuGet安装依赖时提示:无法
- 下一篇: Node-RED订阅MQTT主题并调试数