ROS话题模式及多线程
文章介紹ROS通訊中基于發布/訂閱模型的話題模式,默認情況下的消息發送和接收處理,以及如何引入多線程來提高程序性能。
通信機制
ROS常用的通信機制包含話題發布/訂閱模型的話題模式與服務器/客戶端的服務模式,兩種模式的差異如下表
| 通信模型 | 發布/訂閱 | 客戶端/服務器 |
| 反饋機制 | 無 | 有 |
| 底層協議 | ROSTCP/ROSUDP | ROSTCP/ROSUDP |
| 緩沖區 | 有 | 無 |
| 實時性 | 弱 | 強 |
| 節點關系 | 多對多 | 一對多(一個Server) |
| 使用場景 | 弱邏輯處理,多數據傳輸 | 強邏輯處理,少數據傳輸 |
| 此處會針對話題模式做一些原理介紹與其多線程的應用。下面是ROS中一些基本的中英文技術詞匯的對照: |
| 英文 | message | topic | node | publisher | subscriber | callback |
話題模式
話題模式是有多個發布者與多個訂閱者參與的異步通信模式,他們通過節點管理器(ros master)注冊信息,建立連接和通信。
同一節點的所有發布者發布的消息被存儲在全局的消息隊列中,根據時間戳進行取舍。所有的訂閱者被所在節點調用時(e.g. ros::spin())去消息隊列中取得訂閱話題的消息并執行回調函數。
此處可以理解為生產者消費者模型,發布者負責生產消息存入消息隊列中,訂閱者負責從消息隊列取出消息并消費(回調函數),這里通過消息隊列作為數據的緩存區來解耦發布者與訂閱者。
跟經典的生產消費模型不同的是,同一節點的發布者與訂閱者都有自己專門的消息隊列去存儲消息,主要是因為發布者是基于話題發送,而不是直接向訂閱者發送,所以必須要有一個消息隊列來存放發布的消息,以供訂閱者來獲取。而且這個消息隊列的好處是在網絡差、帶寬小、延時高的時候,保證數據不容易丟失。因為發布者與訂閱者不一定在同一臺主機上,因此消息需要通過網絡來交換。但是網絡的性能時好時壞,如果訂閱者沒有消息隊列,那么每次運行回調函數前都要先通過網絡取回消息,然后才能處理。當網絡很差時,就會讓系統堵塞。而有消息隊列的話,訂閱者就可以一邊處理隊列中的消息,一邊通過網絡緩存新的消息,而不用每次處理消息前都要臨時去讀一個回來。這樣就增加了系統的可靠性。
這里會引入的一個問題,同一話題的消息在發布消息隊列發送到訂閱消息隊列的時候會產生拷貝,造成內存浪費。針對發布節點與訂閱節點在同一臺機器的情況,ROS引入了nodelet來避免這個開銷。
發布者的消息隊列
節點中的全局消息隊列會將所有發布者的消息存儲起來,此處的消息隊列可以理解為一個線程安全,根據時間戳排序的消息存儲器,如:
消息隊列的長度,即一個消息隊列中能存儲多少條消息是由創建發布者的時候定義,如
// chatter_pub為發布者對象實例,std_msgs::String為消息類型,“chatter”為話題名稱,1000為話題的消息隊列長度 ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000);此處的1000即為消息隊列中會緩存多少條信息。 當同一話題所有發布者生產的頻率大于對應訂閱者消費的頻率時,發布的新消息存入消息隊列時,會將時間戳最老的消息丟棄掉。如上圖中,如果定義的隊列長度為5,當前消息隊列中的消息數也為5,當新消息被發布到消息隊列中時,消息1(timestamp = t1)會被丟棄以便儲存新的消息。
同一節點的不同發布者有可能會定義不同的消息隊列長度,所以節點的全局消息隊列應該是取所有發布者隊列長度之和。
訂閱者的消費隊列
訂閱者的消費隊列與發布者中的一樣,只是為了儲存消息數據。這些消息被創建訂閱者注冊的回調函數所消費,如
// 收到消息時調用的函數,std_msgs::String為消息類型 void chatterCallback(const std_msgs::String::ConstPtr& msg) {ROS_INFO("I heard: [%s]", msg->data.c_str()); } // chatter_sub為訂閱者實例,“chatter”為話題名稱,1000為話題的消息隊列長度,chatterCallback為回調函數 ros::Subscriber chatter_sub = n.subscribe("chatter", 1000, chatterCallback);ROS在處理回調函數時,并不是消息傳來就立刻進行處理的,而是在程序調用spinOnce()和spin()時統一調用。
ros::spin()和ros::spinOnce()為消息回調處理函數。它倆通常會出現在ROS的主循環中,程序需要不斷調用ros::spin() 或 ros::spinOnce(),兩者區別在于前者調用后不會再返回,也就是你的主程序到這兒就不往下執行了,而后者在調用后還可以繼續執行之后的程序。
默認情況下,節點中的主程序(e.g. while循環中寫在ros::spinOnce()之前的執行程序)和所有回調函數都是串行的,當節點無法及時處理回調函數時,這些未被處理的回調函數就會按照先后的順序放入一個隊列,該回調隊列的長度就是定義訂閱器時的消息隊列長度了。節點會先處理時間輟最小的回調函數,然后依次處理隊列中所有的回調函數。但是,當發布器的頻率過快時,會出現未被處理的回調函數數量超過隊列長度,它會自動丟棄時間輟最長(最老的)的回調函數。下面的案例介紹了一個節點兩個訂閱者的例子:
上述的代碼輸出如下
在一個調用周期內,callback1和callback2方程為串行(阻塞)運行,callback1會被調用三次,callback2被調用一次 (由定義的消息隊列長度決定)。兩次調用callback2之間阻塞調用了四次callback1(次數取決與所有回調函數被放入隊列中的順序),導致其接收的數據差為40。
如果節點的不同回調方程(包括主函數中調用的方程)同時花費大量的計算時間,在單線程下很有可能造成某個回調方程被調用時所用到的數據失效。
這類問題在硬件計算平臺允許的情況下,可以引入多線程異步(非阻塞)處理回調方程來解決。
最新數據處理
在了解了發布者與訂閱者的消息隊列機制后,可以同時更改雙方消息隊列的長度為1來達到回調函數處理最新的消息數據。這樣保證發布節點的消息只有一個,訂閱節點的消息也只有一個,所以每當回調函數被調用時只能獲取到最新的數據。
這適用于一些對數據實時性要求比較高(不考慮通訊和函數調用延遲)的功能模塊。
小結
- 節點會擁有全局的發布者消息和訂閱者消息隊列。
- 發布者消息隊列用來緩存節點內所有發布的消息,并通過之前在節點管理器中注冊的信息發布到對應話題。
- 訂閱者消息隊列用來緩存節點內所有訂閱的消息,并在消息處理函數(e.g. ros::spinOnce())被調用時串行調用對應的回調函數。
- 話題模式通過消息隊列緩存消息的形式解耦了發布者與訂閱者,彼此之間不需要知道對方的設置或者執行,發布者只需要發布消息到發布隊列,而訂閱者只需要通過回調函數消費訂閱隊列中的消息。
多線程應用
上述回調函數串行調用可以通過引入多線程的方式,并發處理節點內不同的函數。
在了解具體的做法之前,下面章節先簡單介紹一下進程和線程的概念。
進程和線程
進程是對操作系統上正在運行程序的一個抽象,線程是進程中的指令執行流的最小單位,是CPU調度的基本單位。簡而言之,一個程序至少有一個進程,一個進程至少有一個線程。
進程有獨立的地址空間,線程有自己的堆棧和局部變量,但線程之間沒有單獨的地址空間,一個線程死掉就等于整個進程死掉,所以多進程的程序要比多線程的程序健壯,但在進程切換時,耗費資源較大,效率要差一些。相對進程而言,線程是一個更加接近于執行體的概念。它可以與同進程中的其他線程共享數據,但擁有自己的棧空間,擁有獨立的執行序列,對于一些要求同時進行并且又要共享某些變量的并發操作,只能用線程,不能用進程。
在ROS中可以理解為一個可運行的節點(定義了main()方程)即為一個進程,而默認的執行流程為單線程,即只有一個執行體串行執行所有定義的任務(函數)。
ROS中的多線程
ROS提供的用于處理callback的線程機制。接口包括自旋,CallbackQueue隊列處理,time callback等。因接口提供的簡單易用,下面只做簡單介紹:
ros::MultiThreadedSpinner是阻塞式的spinner(程序執行到這里不會在往下執行,無法使用在while循環中), 類似于ros::spin(), 你可以在它的構造函數中指定線程數量, 但如果不指定或者設為0, 它會根據你的CPU內核數創建線程。
ros::AsyncSpinner不是阻塞式的,類似ros::spinOnce(),擁有start()和stop()兩個函數, start()等待在那個點上的所有回調,stop()停止回調,并且在銷毀時自動停止.
ros::AsyncSpinner spinner(4); // Use 4 threads spinner.start(); while (ros::ok()) {// Do something } ros::waitForShutdown();ROS默認只有一個全局的回調隊列,每當調用ros::spinOnce()時統一調用在列的回調函數。當一個節點的訂閱者比較多,要處理的回調函數不在一個頻率上調用時,可以自定義消息隊列來獨自調用。ros::CallbackQueue可以設置自定義隊列,這個可使用所有訂閱、服務、定時器等。自定義的隊列不是roscpp的默認隊列,意味著ros::spin()和ros::spinOnce()不會處理這些回調,你需要單獨處理這些回調。
The CallbackQueue class has two ways of invoking the callbacks inside it: callAvailable() and callOne(). callAvailable() will take everything currently in the queue and invoke all of them. callOne() will simply invoke the oldest callback on the queue.`
ros::CallbackQueue callback_queue; ros::NodeHandle nh; ros::SubscribeOptions ops=ros::SubscribeOptions::create<std_msgs::String>("chatter",1, state_callback, ros::VoidPtr(),&callback_queue); //指定一個自定義隊列 ros::Subscriber listen_state= nh.subscribe(ops); // 必須添加,ros::SubscribeOptions自定義的隊列才能回調 ros::AsyncSpinner state_spinner(1,&callback_queue);或者
ros::CallbackQueue callback_queue; void callbackThread() { ros::NodeHandle n; while (n.ok()) { callback_queue.callAvailable();} } ros::NodeHandle n; ros::NodeHandle nh; nh.setCallbackQueue(&callback_queue); ros::Subscriber sub = n.subscribe("chatter", 3, callback1); ros::Subscriber sub2 = nh.subscribe("chatter", 1, callback2); // 開一個線程單獨調用callback_queue中的回調函數 std::thread t1(callbackThread); while(ros::ok()) {ROS_INFO("================Start of while loop===========================");ros::Duration(1.0).sleep();ros::spinOnce(); } t1.join();將多線程AsyncSpinner引入上述案例中,
// "chatter"話題的發布頻率為10Hz void callback1(const std_msgs::String::ConstPtr& msg) {// 延遲一秒ros::Duration(1.0).sleep();ROS_INFO("callback1 : I heard: [%s]", msg->data.c_str()); }void callback2(const std_msgs::String::ConstPtr& msg) {ROS_INFO("callback2 : I heard: [%s]", msg->data.c_str()); }int main(int argc, char **argv) {// 訂閱者sub1的消息隊列長度為3 (一個調用周期內只會處理三個消息)ros::Subscriber sub1 = n.subscribe("chatter", 3, callback1);// 訂閱者sub2的消息隊列長度為1 (一個調用周期內只會處理一個消息)ros::Subscriber sub2 = n.subscribe("chatter", 1, callback2);// 異步模式ros::AsyncSpinner spinner(2);while(ros::ok()){ROS_INFO("================Start of while loop===========================");ros::Duration(1.0).sleep();spinner.start();}return 0; }輸出如下:
可以看到每個循環中(間隔一秒),callback1被調用了一次,callback2被調用了十次。這里while循環中的ros::Duration(1.0).sleep(),callback1和callback2三個執行任務會并發執行。
線程安全
- 線程安全是多線程編程時的計算機程序代碼中的一個概念。在擁有共享數據的多條線程并行執行的程序中,線程安全的代碼會通過同步機制保證各個線程都可以正常且正確的執行,不會出現數據污染等意外情況。
- 多個線程有對同一個全局變量進行寫的操作時,會出現線程安全問題。
- 每個進程中訪問臨界資源(比如全局變量等公用資源)的那段程序(代碼)稱為臨界區(臨界資源是一次僅允許一個進程使用的共享資源,如全局變量等),也稱為臨界段。
- ROS本身的發布者與訂閱者是線程安全的,所以在引入多線程后,不需要做額外的保護措施。如在上例中的兩個回調函數中,可以用同一個發布者發布消息。
但是如果是操作用戶自定義的一些臨界資源,需要加鎖保護。如
// 臨界資源 int global_counter = 0; // 互斥鎖保護臨界資源 std::mutex mtx; // "chatter"話題的發布頻率為10Hz void callback1(const std_msgs::String::ConstPtr& msg) {// 延遲一秒ros::Duration(1.0).sleep();ROS_INFO("callback1 : I heard: [%s]", msg->data.c_str());// 需要加鎖保護global_countermtx.lock();global_counter++;mtx.unlock(); }void callback2(const std_msgs::String::ConstPtr& msg) {global_counter++ROS_INFO("callback2 : I heard: [%s]", msg->data.c_str());// 需要加鎖保護global_countermtx.lock();global_counter++;mtx.unlock(); }int main(int argc, char **argv) {// 訂閱者sub1的消息隊列長度為3 (一個調用周期內只會處理三個消息)ros::Subscriber sub1 = n.subscribe("chatter", 3, callback1);// 訂閱者sub2的消息隊列長度為1 (一個調用周期內只會處理一個消息)ros::Subscriber sub2 = n.subscribe("chatter", 1, callback2);// 異步模式ros::AsyncSpinner spinner(2);while(ros::ok()){ROS_INFO("================Start of while loop===========================");ros::Duration(1.0).sleep();spinner.start();}return 0; }上述的global_counter由于會被并發的兩個線程訪問,需要引入mtx(std::mutex)來保證線程安全。
線程的數量
雖然一個進程內使用多個線程理論上會提升程序性能,但是由于受到物理計算平臺(CPU)的限制,過多的線程會讓系統更加頻繁的切換所執行的線程,帶來大量的切換開銷。
常規的合理數量為:
CPU密集型(計算需求大)= CPU的核數*個數
IO密集型(文件讀寫操作) = 可以設置的大一些
相關資料
- Callbacks and spinning - http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning
- ROS nodelet - http://wiki.ros.org/nodelet
- 通信機制 -https://blog.csdn.net/zhi_neng_zhi_fu/article/details/100139851
- 回調函數處理與回調隊列 - https://blog.csdn.net/Azahaxia/article/details/113934774
- 多線程應用 - https://blog.csdn.net/Qm13416479599/article/details/90265676
- ROS中的多線程 - https://blog.csdn.net/tobebest_lah/article/details/103050076
- 多線程數量 - https://blog.csdn.net/eternal_yangyun/article/details/103236125
總結
以上是生活随笔為你收集整理的ROS话题模式及多线程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 企业微信报错https:// open.
- 下一篇: 量变与质变——“学而不思则罔,思而不学则