ROS话题模式及多线程
文章介紹ROS通訊中基于發(fā)布/訂閱模型的話題模式,默認(rèn)情況下的消息發(fā)送和接收處理,以及如何引入多線程來提高程序性能。
通信機(jī)制
ROS常用的通信機(jī)制包含話題發(fā)布/訂閱模型的話題模式與服務(wù)器/客戶端的服務(wù)模式,兩種模式的差異如下表
| 通信模型 | 發(fā)布/訂閱 | 客戶端/服務(wù)器 |
| 反饋機(jī)制 | 無 | 有 |
| 底層協(xié)議 | ROSTCP/ROSUDP | ROSTCP/ROSUDP |
| 緩沖區(qū) | 有 | 無 |
| 實(shí)時(shí)性 | 弱 | 強(qiáng) |
| 節(jié)點(diǎn)關(guān)系 | 多對(duì)多 | 一對(duì)多(一個(gè)Server) |
| 使用場(chǎng)景 | 弱邏輯處理,多數(shù)據(jù)傳輸 | 強(qiáng)邏輯處理,少數(shù)據(jù)傳輸 |
| 此處會(huì)針對(duì)話題模式做一些原理介紹與其多線程的應(yīng)用。下面是ROS中一些基本的中英文技術(shù)詞匯的對(duì)照: |
| 英文 | message | topic | node | publisher | subscriber | callback |
話題模式
話題模式是有多個(gè)發(fā)布者與多個(gè)訂閱者參與的異步通信模式,他們通過節(jié)點(diǎn)管理器(ros master)注冊(cè)信息,建立連接和通信。
同一節(jié)點(diǎn)的所有發(fā)布者發(fā)布的消息被存儲(chǔ)在全局的消息隊(duì)列中,根據(jù)時(shí)間戳進(jìn)行取舍。所有的訂閱者被所在節(jié)點(diǎn)調(diào)用時(shí)(e.g. ros::spin())去消息隊(duì)列中取得訂閱話題的消息并執(zhí)行回調(diào)函數(shù)。
此處可以理解為生產(chǎn)者消費(fèi)者模型,發(fā)布者負(fù)責(zé)生產(chǎn)消息存入消息隊(duì)列中,訂閱者負(fù)責(zé)從消息隊(duì)列取出消息并消費(fèi)(回調(diào)函數(shù)),這里通過消息隊(duì)列作為數(shù)據(jù)的緩存區(qū)來解耦發(fā)布者與訂閱者。
跟經(jīng)典的生產(chǎn)消費(fèi)模型不同的是,同一節(jié)點(diǎn)的發(fā)布者與訂閱者都有自己專門的消息隊(duì)列去存儲(chǔ)消息,主要是因?yàn)榘l(fā)布者是基于話題發(fā)送,而不是直接向訂閱者發(fā)送,所以必須要有一個(gè)消息隊(duì)列來存放發(fā)布的消息,以供訂閱者來獲取。而且這個(gè)消息隊(duì)列的好處是在網(wǎng)絡(luò)差、帶寬小、延時(shí)高的時(shí)候,保證數(shù)據(jù)不容易丟失。因?yàn)榘l(fā)布者與訂閱者不一定在同一臺(tái)主機(jī)上,因此消息需要通過網(wǎng)絡(luò)來交換。但是網(wǎng)絡(luò)的性能時(shí)好時(shí)壞,如果訂閱者沒有消息隊(duì)列,那么每次運(yùn)行回調(diào)函數(shù)前都要先通過網(wǎng)絡(luò)取回消息,然后才能處理。當(dāng)網(wǎng)絡(luò)很差時(shí),就會(huì)讓系統(tǒng)堵塞。而有消息隊(duì)列的話,訂閱者就可以一邊處理隊(duì)列中的消息,一邊通過網(wǎng)絡(luò)緩存新的消息,而不用每次處理消息前都要臨時(shí)去讀一個(gè)回來。這樣就增加了系統(tǒng)的可靠性。
這里會(huì)引入的一個(gè)問題,同一話題的消息在發(fā)布消息隊(duì)列發(fā)送到訂閱消息隊(duì)列的時(shí)候會(huì)產(chǎn)生拷貝,造成內(nèi)存浪費(fèi)。針對(duì)發(fā)布節(jié)點(diǎn)與訂閱節(jié)點(diǎn)在同一臺(tái)機(jī)器的情況,ROS引入了nodelet來避免這個(gè)開銷。
發(fā)布者的消息隊(duì)列
節(jié)點(diǎn)中的全局消息隊(duì)列會(huì)將所有發(fā)布者的消息存儲(chǔ)起來,此處的消息隊(duì)列可以理解為一個(gè)線程安全,根據(jù)時(shí)間戳排序的消息存儲(chǔ)器,如:
消息隊(duì)列的長(zhǎng)度,即一個(gè)消息隊(duì)列中能存儲(chǔ)多少條消息是由創(chuàng)建發(fā)布者的時(shí)候定義,如
// chatter_pub為發(fā)布者對(duì)象實(shí)例,std_msgs::String為消息類型,“chatter”為話題名稱,1000為話題的消息隊(duì)列長(zhǎng)度 ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000);此處的1000即為消息隊(duì)列中會(huì)緩存多少條信息。 當(dāng)同一話題所有發(fā)布者生產(chǎn)的頻率大于對(duì)應(yīng)訂閱者消費(fèi)的頻率時(shí),發(fā)布的新消息存入消息隊(duì)列時(shí),會(huì)將時(shí)間戳最老的消息丟棄掉。如上圖中,如果定義的隊(duì)列長(zhǎng)度為5,當(dāng)前消息隊(duì)列中的消息數(shù)也為5,當(dāng)新消息被發(fā)布到消息隊(duì)列中時(shí),消息1(timestamp = t1)會(huì)被丟棄以便儲(chǔ)存新的消息。
同一節(jié)點(diǎn)的不同發(fā)布者有可能會(huì)定義不同的消息隊(duì)列長(zhǎng)度,所以節(jié)點(diǎn)的全局消息隊(duì)列應(yīng)該是取所有發(fā)布者隊(duì)列長(zhǎng)度之和。
訂閱者的消費(fèi)隊(duì)列
訂閱者的消費(fèi)隊(duì)列與發(fā)布者中的一樣,只是為了儲(chǔ)存消息數(shù)據(jù)。這些消息被創(chuàng)建訂閱者注冊(cè)的回調(diào)函數(shù)所消費(fèi),如
// 收到消息時(shí)調(diào)用的函數(shù),std_msgs::String為消息類型 void chatterCallback(const std_msgs::String::ConstPtr& msg) {ROS_INFO("I heard: [%s]", msg->data.c_str()); } // chatter_sub為訂閱者實(shí)例,“chatter”為話題名稱,1000為話題的消息隊(duì)列長(zhǎng)度,chatterCallback為回調(diào)函數(shù) ros::Subscriber chatter_sub = n.subscribe("chatter", 1000, chatterCallback);ROS在處理回調(diào)函數(shù)時(shí),并不是消息傳來就立刻進(jìn)行處理的,而是在程序調(diào)用spinOnce()和spin()時(shí)統(tǒng)一調(diào)用。
ros::spin()和ros::spinOnce()為消息回調(diào)處理函數(shù)。它倆通常會(huì)出現(xiàn)在ROS的主循環(huán)中,程序需要不斷調(diào)用ros::spin() 或 ros::spinOnce(),兩者區(qū)別在于前者調(diào)用后不會(huì)再返回,也就是你的主程序到這兒就不往下執(zhí)行了,而后者在調(diào)用后還可以繼續(xù)執(zhí)行之后的程序。
默認(rèn)情況下,節(jié)點(diǎn)中的主程序(e.g. while循環(huán)中寫在ros::spinOnce()之前的執(zhí)行程序)和所有回調(diào)函數(shù)都是串行的,當(dāng)節(jié)點(diǎn)無法及時(shí)處理回調(diào)函數(shù)時(shí),這些未被處理的回調(diào)函數(shù)就會(huì)按照先后的順序放入一個(gè)隊(duì)列,該回調(diào)隊(duì)列的長(zhǎng)度就是定義訂閱器時(shí)的消息隊(duì)列長(zhǎng)度了。節(jié)點(diǎn)會(huì)先處理時(shí)間輟最小的回調(diào)函數(shù),然后依次處理隊(duì)列中所有的回調(diào)函數(shù)。但是,當(dāng)發(fā)布器的頻率過快時(shí),會(huì)出現(xiàn)未被處理的回調(diào)函數(shù)數(shù)量超過隊(duì)列長(zhǎng)度,它會(huì)自動(dòng)丟棄時(shí)間輟最長(zhǎng)(最老的)的回調(diào)函數(shù)。下面的案例介紹了一個(gè)節(jié)點(diǎn)兩個(gè)訂閱者的例子:
上述的代碼輸出如下
在一個(gè)調(diào)用周期內(nèi),callback1和callback2方程為串行(阻塞)運(yùn)行,callback1會(huì)被調(diào)用三次,callback2被調(diào)用一次 (由定義的消息隊(duì)列長(zhǎng)度決定)。兩次調(diào)用callback2之間阻塞調(diào)用了四次callback1(次數(shù)取決與所有回調(diào)函數(shù)被放入隊(duì)列中的順序),導(dǎo)致其接收的數(shù)據(jù)差為40。
如果節(jié)點(diǎn)的不同回調(diào)方程(包括主函數(shù)中調(diào)用的方程)同時(shí)花費(fèi)大量的計(jì)算時(shí)間,在單線程下很有可能造成某個(gè)回調(diào)方程被調(diào)用時(shí)所用到的數(shù)據(jù)失效。
這類問題在硬件計(jì)算平臺(tái)允許的情況下,可以引入多線程異步(非阻塞)處理回調(diào)方程來解決。
最新數(shù)據(jù)處理
在了解了發(fā)布者與訂閱者的消息隊(duì)列機(jī)制后,可以同時(shí)更改雙方消息隊(duì)列的長(zhǎng)度為1來達(dá)到回調(diào)函數(shù)處理最新的消息數(shù)據(jù)。這樣保證發(fā)布節(jié)點(diǎn)的消息只有一個(gè),訂閱節(jié)點(diǎn)的消息也只有一個(gè),所以每當(dāng)回調(diào)函數(shù)被調(diào)用時(shí)只能獲取到最新的數(shù)據(jù)。
這適用于一些對(duì)數(shù)據(jù)實(shí)時(shí)性要求比較高(不考慮通訊和函數(shù)調(diào)用延遲)的功能模塊。
小結(jié)
- 節(jié)點(diǎn)會(huì)擁有全局的發(fā)布者消息和訂閱者消息隊(duì)列。
- 發(fā)布者消息隊(duì)列用來緩存節(jié)點(diǎn)內(nèi)所有發(fā)布的消息,并通過之前在節(jié)點(diǎn)管理器中注冊(cè)的信息發(fā)布到對(duì)應(yīng)話題。
- 訂閱者消息隊(duì)列用來緩存節(jié)點(diǎn)內(nèi)所有訂閱的消息,并在消息處理函數(shù)(e.g. ros::spinOnce())被調(diào)用時(shí)串行調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)。
- 話題模式通過消息隊(duì)列緩存消息的形式解耦了發(fā)布者與訂閱者,彼此之間不需要知道對(duì)方的設(shè)置或者執(zhí)行,發(fā)布者只需要發(fā)布消息到發(fā)布隊(duì)列,而訂閱者只需要通過回調(diào)函數(shù)消費(fèi)訂閱隊(duì)列中的消息。
多線程應(yīng)用
上述回調(diào)函數(shù)串行調(diào)用可以通過引入多線程的方式,并發(fā)處理節(jié)點(diǎn)內(nèi)不同的函數(shù)。
在了解具體的做法之前,下面章節(jié)先簡(jiǎn)單介紹一下進(jìn)程和線程的概念。
進(jìn)程和線程
進(jìn)程是對(duì)操作系統(tǒng)上正在運(yùn)行程序的一個(gè)抽象,線程是進(jìn)程中的指令執(zhí)行流的最小單位,是CPU調(diào)度的基本單位。簡(jiǎn)而言之,一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程。
進(jìn)程有獨(dú)立的地址空間,線程有自己的堆棧和局部變量,但線程之間沒有單獨(dú)的地址空間,一個(gè)線程死掉就等于整個(gè)進(jìn)程死掉,所以多進(jìn)程的程序要比多線程的程序健壯,但在進(jìn)程切換時(shí),耗費(fèi)資源較大,效率要差一些。相對(duì)進(jìn)程而言,線程是一個(gè)更加接近于執(zhí)行體的概念。它可以與同進(jìn)程中的其他線程共享數(shù)據(jù),但擁有自己的棧空間,擁有獨(dú)立的執(zhí)行序列,對(duì)于一些要求同時(shí)進(jìn)行并且又要共享某些變量的并發(fā)操作,只能用線程,不能用進(jìn)程。
在ROS中可以理解為一個(gè)可運(yùn)行的節(jié)點(diǎn)(定義了main()方程)即為一個(gè)進(jìn)程,而默認(rèn)的執(zhí)行流程為單線程,即只有一個(gè)執(zhí)行體串行執(zhí)行所有定義的任務(wù)(函數(shù))。
ROS中的多線程
ROS提供的用于處理callback的線程機(jī)制。接口包括自旋,CallbackQueue隊(duì)列處理,time callback等。因接口提供的簡(jiǎn)單易用,下面只做簡(jiǎn)單介紹:
ros::MultiThreadedSpinner是阻塞式的spinner(程序執(zhí)行到這里不會(huì)在往下執(zhí)行,無法使用在while循環(huán)中), 類似于ros::spin(), 你可以在它的構(gòu)造函數(shù)中指定線程數(shù)量, 但如果不指定或者設(shè)為0, 它會(huì)根據(jù)你的CPU內(nèi)核數(shù)創(chuàng)建線程。
ros::AsyncSpinner不是阻塞式的,類似ros::spinOnce(),擁有start()和stop()兩個(gè)函數(shù), start()等待在那個(gè)點(diǎn)上的所有回調(diào),stop()停止回調(diào),并且在銷毀時(shí)自動(dòng)停止.
ros::AsyncSpinner spinner(4); // Use 4 threads spinner.start(); while (ros::ok()) {// Do something } ros::waitForShutdown();ROS默認(rèn)只有一個(gè)全局的回調(diào)隊(duì)列,每當(dāng)調(diào)用ros::spinOnce()時(shí)統(tǒng)一調(diào)用在列的回調(diào)函數(shù)。當(dāng)一個(gè)節(jié)點(diǎn)的訂閱者比較多,要處理的回調(diào)函數(shù)不在一個(gè)頻率上調(diào)用時(shí),可以自定義消息隊(duì)列來獨(dú)自調(diào)用。ros::CallbackQueue可以設(shè)置自定義隊(duì)列,這個(gè)可使用所有訂閱、服務(wù)、定時(shí)器等。自定義的隊(duì)列不是roscpp的默認(rèn)隊(duì)列,意味著ros::spin()和ros::spinOnce()不會(huì)處理這些回調(diào),你需要單獨(dú)處理這些回調(diào)。
The CallbackQueue class has two ways of invoking the callbacks inside it: callAvailable() and callOne(). callAvailable() will take everything currently in the queue and invoke all of them. callOne() will simply invoke the oldest callback on the queue.`
ros::CallbackQueue callback_queue; ros::NodeHandle nh; ros::SubscribeOptions ops=ros::SubscribeOptions::create<std_msgs::String>("chatter",1, state_callback, ros::VoidPtr(),&callback_queue); //指定一個(gè)自定義隊(duì)列 ros::Subscriber listen_state= nh.subscribe(ops); // 必須添加,ros::SubscribeOptions自定義的隊(duì)列才能回調(diào) ros::AsyncSpinner state_spinner(1,&callback_queue);或者
ros::CallbackQueue callback_queue; void callbackThread() { ros::NodeHandle n; while (n.ok()) { callback_queue.callAvailable();} } ros::NodeHandle n; ros::NodeHandle nh; nh.setCallbackQueue(&callback_queue); ros::Subscriber sub = n.subscribe("chatter", 3, callback1); ros::Subscriber sub2 = nh.subscribe("chatter", 1, callback2); // 開一個(gè)線程單獨(dú)調(diào)用callback_queue中的回調(diào)函數(shù) std::thread t1(callbackThread); while(ros::ok()) {ROS_INFO("================Start of while loop===========================");ros::Duration(1.0).sleep();ros::spinOnce(); } t1.join();將多線程AsyncSpinner引入上述案例中,
// "chatter"話題的發(fā)布頻率為10Hz void callback1(const std_msgs::String::ConstPtr& msg) {// 延遲一秒ros::Duration(1.0).sleep();ROS_INFO("callback1 : I heard: [%s]", msg->data.c_str()); }void callback2(const std_msgs::String::ConstPtr& msg) {ROS_INFO("callback2 : I heard: [%s]", msg->data.c_str()); }int main(int argc, char **argv) {// 訂閱者sub1的消息隊(duì)列長(zhǎng)度為3 (一個(gè)調(diào)用周期內(nèi)只會(huì)處理三個(gè)消息)ros::Subscriber sub1 = n.subscribe("chatter", 3, callback1);// 訂閱者sub2的消息隊(duì)列長(zhǎng)度為1 (一個(gè)調(diào)用周期內(nèi)只會(huì)處理一個(gè)消息)ros::Subscriber sub2 = n.subscribe("chatter", 1, callback2);// 異步模式ros::AsyncSpinner spinner(2);while(ros::ok()){ROS_INFO("================Start of while loop===========================");ros::Duration(1.0).sleep();spinner.start();}return 0; }輸出如下:
可以看到每個(gè)循環(huán)中(間隔一秒),callback1被調(diào)用了一次,callback2被調(diào)用了十次。這里while循環(huán)中的ros::Duration(1.0).sleep(),callback1和callback2三個(gè)執(zhí)行任務(wù)會(huì)并發(fā)執(zhí)行。
線程安全
- 線程安全是多線程編程時(shí)的計(jì)算機(jī)程序代碼中的一個(gè)概念。在擁有共享數(shù)據(jù)的多條線程并行執(zhí)行的程序中,線程安全的代碼會(huì)通過同步機(jī)制保證各個(gè)線程都可以正常且正確的執(zhí)行,不會(huì)出現(xiàn)數(shù)據(jù)污染等意外情況。
- 多個(gè)線程有對(duì)同一個(gè)全局變量進(jìn)行寫的操作時(shí),會(huì)出現(xiàn)線程安全問題。
- 每個(gè)進(jìn)程中訪問臨界資源(比如全局變量等公用資源)的那段程序(代碼)稱為臨界區(qū)(臨界資源是一次僅允許一個(gè)進(jìn)程使用的共享資源,如全局變量等),也稱為臨界段。
- ROS本身的發(fā)布者與訂閱者是線程安全的,所以在引入多線程后,不需要做額外的保護(hù)措施。如在上例中的兩個(gè)回調(diào)函數(shù)中,可以用同一個(gè)發(fā)布者發(fā)布消息。
但是如果是操作用戶自定義的一些臨界資源,需要加鎖保護(hù)。如
// 臨界資源 int global_counter = 0; // 互斥鎖保護(hù)臨界資源 std::mutex mtx; // "chatter"話題的發(fā)布頻率為10Hz void callback1(const std_msgs::String::ConstPtr& msg) {// 延遲一秒ros::Duration(1.0).sleep();ROS_INFO("callback1 : I heard: [%s]", msg->data.c_str());// 需要加鎖保護(hù)global_countermtx.lock();global_counter++;mtx.unlock(); }void callback2(const std_msgs::String::ConstPtr& msg) {global_counter++ROS_INFO("callback2 : I heard: [%s]", msg->data.c_str());// 需要加鎖保護(hù)global_countermtx.lock();global_counter++;mtx.unlock(); }int main(int argc, char **argv) {// 訂閱者sub1的消息隊(duì)列長(zhǎng)度為3 (一個(gè)調(diào)用周期內(nèi)只會(huì)處理三個(gè)消息)ros::Subscriber sub1 = n.subscribe("chatter", 3, callback1);// 訂閱者sub2的消息隊(duì)列長(zhǎng)度為1 (一個(gè)調(diào)用周期內(nèi)只會(huì)處理一個(gè)消息)ros::Subscriber sub2 = n.subscribe("chatter", 1, callback2);// 異步模式ros::AsyncSpinner spinner(2);while(ros::ok()){ROS_INFO("================Start of while loop===========================");ros::Duration(1.0).sleep();spinner.start();}return 0; }上述的global_counter由于會(huì)被并發(fā)的兩個(gè)線程訪問,需要引入mtx(std::mutex)來保證線程安全。
線程的數(shù)量
雖然一個(gè)進(jìn)程內(nèi)使用多個(gè)線程理論上會(huì)提升程序性能,但是由于受到物理計(jì)算平臺(tái)(CPU)的限制,過多的線程會(huì)讓系統(tǒng)更加頻繁的切換所執(zhí)行的線程,帶來大量的切換開銷。
常規(guī)的合理數(shù)量為:
CPU密集型(計(jì)算需求大)= CPU的核數(shù)*個(gè)數(shù)
IO密集型(文件讀寫操作) = 可以設(shè)置的大一些
相關(guān)資料
- Callbacks and spinning - http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning
- ROS nodelet - http://wiki.ros.org/nodelet
- 通信機(jī)制 -https://blog.csdn.net/zhi_neng_zhi_fu/article/details/100139851
- 回調(diào)函數(shù)處理與回調(diào)隊(duì)列 - https://blog.csdn.net/Azahaxia/article/details/113934774
- 多線程應(yīng)用 - https://blog.csdn.net/Qm13416479599/article/details/90265676
- ROS中的多線程 - https://blog.csdn.net/tobebest_lah/article/details/103050076
- 多線程數(shù)量 - https://blog.csdn.net/eternal_yangyun/article/details/103236125
總結(jié)
以上是生活随笔為你收集整理的ROS话题模式及多线程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 企业微信报错https:// open.
- 下一篇: 量变与质变——“学而不思则罔,思而不学则