反应式系统实现MQTT客户机
反應(yīng)式系統(tǒng)實(shí)現(xiàn)MQTT客戶機(jī)
Implementing an MQTT client for reactive systems
MQTT Reactive是從LiamBindle的MQTT-C庫派生的MQTT v3.1.1客戶機(jī)。MQTT-Reactive的目的是提供一個(gè)用C語言編寫的可移植、無阻塞的MQTT客戶機(jī),以便在反應(yīng)式嵌入式系統(tǒng)中使用。首先,本文解釋了什么是反應(yīng)式系統(tǒng)。然后,介紹了如何設(shè)計(jì)適合該類系統(tǒng)的軟件結(jié)構(gòu)。最后,本文展示了如何通過使用狀態(tài)機(jī)和事件驅(qū)動(dòng)范式在反應(yīng)式系統(tǒng)中使用MQTT反應(yīng)式庫。為了做到這一點(diǎn),本文以一個(gè)實(shí)際的物聯(lián)網(wǎng)設(shè)備為例,通過使用UML圖(如狀態(tài)機(jī)、交互和結(jié)構(gòu))來解釋其軟件結(jié)構(gòu)和基于狀態(tài)的行為。本文還提供了用C語言實(shí)現(xiàn)IoT設(shè)備的MQTT反應(yīng)式客戶端的指南。
許多嵌入式系統(tǒng)是被動(dòng)的,即它們對(duì)內(nèi)部或外部事件做出反應(yīng)。一旦這些反應(yīng)完成,軟件就會(huì)返回等待下一個(gè)事件。這就是為什么事件驅(qū)動(dòng)系統(tǒng)被稱為反應(yīng)系統(tǒng)。
事件驅(qū)動(dòng)編程,或稱反應(yīng)式編程,是實(shí)現(xiàn)反應(yīng)式系統(tǒng)靈活、可預(yù)測(cè)和可維護(hù)軟件的最合適的編程范式之一。在這個(gè)范例中,程序的流程是由事件決定的。通常,反應(yīng)式軟件的結(jié)構(gòu)是由多個(gè)并發(fā)單元(稱為活動(dòng)對(duì)象)組成,這些單元等待并處理不同類型的事件。每個(gè)活動(dòng)對(duì)象都擁有一個(gè)控制線程和一個(gè)事件隊(duì)列,通過它來處理傳入的事件。在反應(yīng)式系統(tǒng)中,活動(dòng)對(duì)象通常具有狀態(tài)圖中定義的基于狀態(tài)的行為。
為了探索如何在具有多個(gè)并發(fā)任務(wù)的反應(yīng)式系統(tǒng)中使用MQTT反應(yīng)庫,同時(shí)使用狀態(tài)機(jī)和事件驅(qū)動(dòng)范式,我們以物聯(lián)網(wǎng)設(shè)備為例。
使用MQTT協(xié)議的想法是在一家鐵路公司開發(fā)物聯(lián)網(wǎng)設(shè)備時(shí)產(chǎn)生的。該裝置是一個(gè)清晰的反應(yīng)系統(tǒng),能夠:
檢測(cè)并存儲(chǔ)多個(gè)數(shù)字輸入的變化
采集、濾波和存儲(chǔ)多個(gè)模擬信號(hào)
定期將存儲(chǔ)的信息發(fā)送到遠(yuǎn)程服務(wù)器
在GSM網(wǎng)絡(luò)上通過MQTT協(xié)議發(fā)送和接收信息
之所以選擇MQTT,是因?yàn)樗且环N基于發(fā)布者和訂戶的輕量級(jí)消息傳遞協(xié)議,通常用于物聯(lián)網(wǎng)和網(wǎng)絡(luò)應(yīng)用中,在這些應(yīng)用中,預(yù)期會(huì)出現(xiàn)高延遲和低數(shù)據(jù)速率鏈路,例如GSM網(wǎng)絡(luò)。
上述物聯(lián)網(wǎng)設(shè)備的MQTT功能是通過使用LiamBindle的MQTT-C的修改版本實(shí)現(xiàn)的。由于該設(shè)備的軟件設(shè)計(jì)為反應(yīng)式軟件,因此必須修改MQTT-C,以便通過交換異步事件與系統(tǒng)的其余部分進(jìn)行通信。這些事件用于通過網(wǎng)絡(luò)接收和發(fā)送流量,以及連接敏感信息并將其發(fā)布到服務(wù)器。生成的軟件庫稱為MQTT Reactive。
狀態(tài)機(jī)
MQTT Reactive是通過一個(gè)狀態(tài)機(jī)使用的,如圖1所示,該狀態(tài)機(jī)為MQTT反應(yīng)式客戶機(jī)的基本行為建模。它是一個(gè)名為MqttMgr(MQTT管理器)的活動(dòng)對(duì)象。圖1中的狀態(tài)機(jī)操作演示了如何從狀態(tài)機(jī)使用MQTT反應(yīng)庫。即使在圖1中使用C語言作為動(dòng)作語言,也可以使用任何計(jì)算機(jī)或形式語言。
Figure 1. State machine of an MQTT-Reactive client
圖1中的狀態(tài)機(jī)以WaitingForNetConnection狀態(tài)啟動(dòng)。當(dāng)網(wǎng)絡(luò)連接建立到等待狀態(tài)后,服務(wù)器將接收到等待連接狀態(tài)。只有在這種狀態(tài)下,狀態(tài)機(jī)才能分別通過CONNECT和PUBLISH事件將MQTT消息發(fā)送到代理,例如CONNECT或PUBLISH。同步狀態(tài)使用UML的特殊機(jī)制來延遲發(fā)布事件,發(fā)布事件由同步狀態(tài)的內(nèi)部分區(qū)中包含的defer關(guān)鍵字指定。如果發(fā)布事件在Sync為當(dāng)前狀態(tài)時(shí)發(fā)生,則它將被保存(延遲)以供將來處理,直到SM進(jìn)入發(fā)布事件不在其延遲事件列表(例如WaitingForSync或WaitingForNetConnection)中的狀態(tài)。進(jìn)入這些狀態(tài)后,狀態(tài)機(jī)將自動(dòng)調(diào)用任何保存的發(fā)布事件,然后根據(jù)轉(zhuǎn)換目標(biāo)狀態(tài)使用或丟棄此事件。
每同步一毫秒,狀態(tài)機(jī)就轉(zhuǎn)換到Sync composite狀態(tài),該狀態(tài)通過向網(wǎng)絡(luò)管理器發(fā)布接收和發(fā)送事件來實(shí)際發(fā)送和接收來自網(wǎng)絡(luò)的流量。它是一個(gè)處理網(wǎng)絡(luò)問題的并發(fā)實(shí)體。
盡管引入的MqttMgr只支持CONNECT和PUBLISH包,但它可以通過相當(dāng)簡(jiǎn)單的更改來支持SUBSCRIBE包。
狀態(tài)機(jī)操作使用params關(guān)鍵字訪問已消費(fèi)事件的參數(shù)。例如,在以下轉(zhuǎn)換中,Connect事件攜帶兩個(gè)參數(shù)clientId和keepAlive,它們的值用于更新相應(yīng)的MqttMgr對(duì)象的屬性:
Connect(clientId, keepAlive)/
me->clientId = params->clientId;
me->keepAlive = params->keepAlive;
me->operRes = mqtt_connect(&me->client, me->clientId, NULL, NULL, 0,
NULL, NULL, 0, me->keepAlive);
在本例中,Connect(clientId,keepAlive)事件是轉(zhuǎn)換的觸發(fā)器,mqtt_Connect()調(diào)用是作為結(jié)果執(zhí)行的操作的一部分。換句話說,當(dāng)MqttMgr對(duì)象接收到參數(shù)為“publishing_client”和“400”,Connect(“publishing_client”,400)的Connect(clientId,keepAlive)事件時(shí),MqttMgr的clientId和keepAlive屬性會(huì)相應(yīng)地更新為值“publishing\u client”和“400”。
為了創(chuàng)建和發(fā)送事件,狀態(tài)機(jī)的操作使用GEN()宏。例如,以下語句將接收事件發(fā)送到收集器對(duì)象,該對(duì)象被其收集器指針引用為MqttMgr對(duì)象的屬性:
GEN(me->itsCollector, Receive());
GEN()語句的第一個(gè)參數(shù)是接收事件的對(duì)象,而第二個(gè)參數(shù)是要發(fā)送的事件,包括事件參數(shù)(如果有)。參數(shù)必須與事件參數(shù)一致。例如,以下語句生成一個(gè)ConnRefused(code)事件,并將其發(fā)送到收集器對(duì)象,將代理返回的代碼作為事件參數(shù)傳遞:
GEN(me->itsCollector, ConRefused(code));
使用params關(guān)鍵字訪問所消費(fèi)事件的參數(shù)和GEN()宏從操作中生成事件的想法是從Rational Rhapsody Developer’s code開發(fā)人員的代碼生成器中純粹出于說明目的而采用的。
圖1中狀態(tài)機(jī)的默認(rèn)操作設(shè)置每當(dāng)從代理接收到連接接受時(shí)MQTT Reactive調(diào)用的回調(diào)。此回調(diào)應(yīng)在MqttMgr代碼中實(shí)現(xiàn)。此回調(diào)必須生成ConnAccepted or ConnRefused(code)事件,以便發(fā)送到收集器對(duì)象,如下所示。
static voidconnack_response_callback(enum MQTTConnackReturnCode return_code)
{ /…/ if (return_code == MQTT_CONNACK_ACCEPTED)
{ GEN(me->itsCollector, ConnAccepted()); }
else
{ GEN(me->itsCollector, ConnRefused(return_code)); }}
模型實(shí)施
圖1中的模型可以用C或C++實(shí)現(xiàn),或者使用您最喜歡的軟件工具,或者只使用自己的狀態(tài)機(jī)實(shí)現(xiàn)。在因特網(wǎng)上有不同的工具可以實(shí)現(xiàn)這一點(diǎn),例如RKH框架、QP框架、Yakindu Statechart工具或RationalRhapsody Developer等等。它們都支持狀態(tài)圖和C/C++語言。此外,其中一些還包括繪制狀態(tài)圖并從中生成代碼的工具。
此狀態(tài)機(jī)是從名為MqttMgr(MQTT管理器)的活動(dòng)對(duì)象執(zhí)行的,它提供了MQTT反應(yīng)式代碼的嚴(yán)格封裝,并且它是唯一允許調(diào)用任何MQTT反應(yīng)式函數(shù)或訪問MQTT反應(yīng)式數(shù)據(jù)的實(shí)體。系統(tǒng)中的其他并發(fā)實(shí)體以及任何isr只能通過與MqttMgr交換事件來間接使用MQTT Reactive。使用這種機(jī)制來同步并發(fā)實(shí)體并在它們之間共享數(shù)據(jù)避免了傳統(tǒng)阻塞機(jī)制(如信號(hào)量、互斥體、延遲或事件標(biāo)志)的危險(xiǎn)。這些機(jī)制可能會(huì)導(dǎo)致難以診斷和修復(fù)的意外故障。
MqttMgr活動(dòng)對(duì)象將其屬性封裝為一組數(shù)據(jù)項(xiàng)。數(shù)據(jù)項(xiàng)用名稱和類型指定變量,其中類型實(shí)際上是數(shù)據(jù)類型。MqttMgr對(duì)象的數(shù)據(jù)項(xiàng)映射到對(duì)象結(jié)構(gòu)的成員。成員的名稱和類型與對(duì)象數(shù)據(jù)的名稱和類型相同。例如,MqttMgr對(duì)象類型的client屬性作為數(shù)據(jù)成員嵌入到MqttMgr結(jié)構(gòu)中:
struct MqttMgr { /* … / struct mqtt_client client; / attribute client / LocalRecvAll localRecv; / attribute localRecv */};
直接訪問和修改MqttMgr對(duì)象的數(shù)據(jù),而不使用訪問器或賦值器操作。例如,客戶機(jī)和localRecv通過me指針訪問,該指針指向MqttMgr的實(shí)例。
mqtt_recvMsgError(&me->client, &me->localRecv);
MqttMgr具有表1中顯示的屬性列表。
圖2中的結(jié)構(gòu)有助于記住相關(guān)參與者之間的關(guān)系。它們是:Collector對(duì)象,它希望向代理發(fā)送信息;NetMgr對(duì)象,它處理網(wǎng)絡(luò);以及MqttMgr對(duì)象。
Figure 2. Draft of IoT system structure
圖3中的序列圖顯示了當(dāng)需要MqttMgr對(duì)象打開與MQTT服務(wù)器的會(huì)話時(shí),它是如何與系統(tǒng)其余部分交互的。在此圖中,MqttMgr狀態(tài)和交換的異步消息顯示在收集器、MqttMgr和NetMgr參與者之間。
NetMgr對(duì)象建立到代理的網(wǎng)絡(luò)連接后,從MqttMgr發(fā)送到MQTT服務(wù)器的第一個(gè)數(shù)據(jù)包必須是CONNECT數(shù)據(jù)包。因此,收集器actor向MqttMgr
actor發(fā)送一個(gè)Connect(clientId,keepAlive)事件。此事件必須攜帶客戶端標(biāo)識(shí)符和保持活動(dòng)時(shí)間間隔。如果服務(wù)器接受連接請(qǐng)求,MqttMgr actor將向收集器actor發(fā)送ConnAccepted事件以通知此情況。從那時(shí)起,收集器參與者可以向該代理發(fā)布信息消息。
如果服務(wù)器拒絕連接請(qǐng)求,MqttMgr actor將向收集器actor發(fā)送一個(gè)conndrekend事件。此事件附帶一個(gè)代碼,用于通知拒絕原因,如圖4所示。
Figure 4. Broker rejects a connection request
圖5顯示了消息發(fā)布時(shí)的交互流。為了做到這一點(diǎn),收集器參與者發(fā)送一個(gè)發(fā)布(data,size,topic,qos)事件,該事件攜帶要發(fā)布的信息(data)、信息的長(zhǎng)度(以字節(jié)為單位)、信息將被發(fā)布到的主題名稱(topic)以及傳遞該消息的保證級(jí)別(qos)。在前面提到的IoT設(shè)備中,發(fā)布的信息是使用JSON規(guī)范格式化的。它是一種開放的標(biāo)準(zhǔn)格式,在人類可讀文本中包含具有屬性值對(duì)的數(shù)據(jù)對(duì)象。這種格式是使用jWrite實(shí)現(xiàn)的,jWrite是一個(gè)用C編寫的簡(jiǎn)單而輕量級(jí)的庫。
Figure 5. Publishing data to a broker
圖6顯示了一個(gè)場(chǎng)景,其中MQTT消息的接收和發(fā)送失敗。如果網(wǎng)絡(luò)管理器無法從網(wǎng)絡(luò)接收流量,它將向MqttMgr actor發(fā)送ReceiveFail。類似地,如果網(wǎng)絡(luò)管理器不能向網(wǎng)絡(luò)發(fā)送數(shù)據(jù),它將向MqttMgr actor發(fā)送SendFail。
Figure 6. Failures in network
表2總結(jié)了所示場(chǎng)景中涉及的事件。
結(jié)論
通過避免傳統(tǒng)阻塞機(jī)制(如信號(hào)量、互斥、延遲或事件標(biāo)志)的危險(xiǎn),本文提出的MQTT反應(yīng)庫、狀態(tài)機(jī)和軟件體系結(jié)構(gòu)允許反應(yīng)式嵌入式系統(tǒng)以新穎的方式實(shí)現(xiàn)MQTT客戶機(jī)。它是通過將MQTT反應(yīng)式代碼封裝在稱為active object的并發(fā)單元中實(shí)現(xiàn)的,active object基于狀態(tài)的行為在建議的狀態(tài)機(jī)中定義。此活動(dòng)對(duì)象通過交換異步事件與系統(tǒng)的其余部分進(jìn)行通信:不僅用于在網(wǎng)絡(luò)上接收和發(fā)送流量,還用于將信息連接并發(fā)布到用于物聯(lián)網(wǎng)應(yīng)用程序的服務(wù)器。
總結(jié)
以上是生活随笔為你收集整理的反应式系统实现MQTT客户机的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 为什么您应该使用基于标准的开发实践
- 下一篇: 从C到C++过渡的3个原因