使用Boost_MPI进行并行编程
關于Boost的介紹:http://www.boost.org/doc/libs/1_53_0/doc/html/mpi.html
原文鏈接:使用 Boost 的 IPC 和 MPI 庫進行并發編程
Arpan Sen, 獨立作家簡介:?Boost C++ 庫讓并發編程變得既簡單又有趣。學習如何使用兩個 Boost 庫 —— Interprocess (IPC) 庫和 Message Passing Interface (MPI) 實現共享內存對象、同步文件鎖和分布式通信等功能。
使用非常流行的 Boost 庫進行并發編程非常有意思。Boost 有幾個用于并發編程領域的庫:Interprocess (IPC) 庫用于實現共享內存、內存映射的 I/O 和消息隊列;Thread 庫用于實現可移植的多線程;Message Passing Interface (MPI) 庫用于分布式計算中的消息傳遞;Asio 庫用于使用套接字和其他低層功能實現可移植的連網功能。本文介紹 IPC 和 MPI 庫以及它們提供的一些功能。
本文中將學習如何使用 Boost IPC 庫實現共享內存對象、消息隊列和同步文件鎖。通過使用 Boost MPI 庫,了解 environment 和communicator 類,以及如何實現分布式通信。
注意:本文中的代碼已經用 gcc-4.3.4 和 boost-1.45 包測試過了。
常用縮寫詞
- API:應用程序編程接口
- I/O:輸入/輸出
- POSIX:針對 UNIX 的便攜式操作系統接口?
- SDK:軟件開發工具包
使用 Boost IPC 庫
Boost Interprocess 是一個只由頭文件組成的庫,所以您需要做的只是在自己的源代碼中包含適當的頭文件并讓編譯器知道 include 路徑。這是一個非常好的特性;您只需下載 Boost 源代碼(見參考資料 中的鏈接),然后就可以開始使用了。例如,要想在自己的代碼中使用共享內存,就使用清單 1 所示的 include。
清單 1. Boost IPC 庫只由頭文件組成
| ? |
在把信息傳遞給編譯器時,您要求進程根據安裝相應地修改 include 路徑。然后,編譯代碼:
| bash-4.1$ g++ ipc1.cpp –I../boost_1_45_0 |
創建共享內存對象
我們先從傳統的 "Hello World!" 程序開始。有兩個進程:第一個進程把字符串 "Hello World!" 寫入內存,另一個進程讀取并顯示此字符串。像清單 2 這樣創建共享內存對象。
#include <boost/interprocess/shared_memory_object.hpp>int main(int argc, char* argv[ ]) {using namespace using boost::interprocess; try { // creating our first shared memory object.shared_memory_object sharedmem1 (create_only, "Hello", read_write);// setting the size of the shared memorysharedmem1.truncate (256);// … more code follows} catch (interprocess_exception& e) { // .. . clean up } }sharedmem1 對象的類型是 shared_memory_object(在 Boost 頭文件中聲明并定義),它的構造函數有三個參數:
- 第一個參數 — create_only — 表示要創建這個共享內存對象而且還沒有創建它。如果已經存在同名的共享對象,就會拋出異常。對于希望訪問已經創建的共享內存的進程,第一個參數應該是open_only。
- 第二個參數 — Hello — 是共享內存區域的名稱。另一個進程將使用這個名稱訪問這個共享內存。
- 第三個參數 — read_write — 是共享內存對象的訪問指示符。因為這個進程要修改共享內存對象的內容,所以使用 read_write。只從共享內存讀取數據的進程使用 read_only 指示符。
truncate 方法以字節為單位設置共享內存的大小。最好把代碼放在 try-catch 塊中。例如,如果無法創建共享內存對象,就拋出類型為boost::interprocess_exception 的異常。
使用共享內存對象寫數據
使用共享內存對象的進程必須在自己的地址空間中映射對象。使用在頭文件 mapped_region.hpp 中聲明并定義的 mapped_region 類執行映射。使用mapped_region 的另一個好處是可以對共享內存對象進行完全和部分訪問。清單 3 演示如何使用mapped_region。
就這么簡單。現在已經創建了您自己的 mapped_region 對象并使用 get_address 方法訪問了它。執行了static_cast,因為get_address 返回一個void*。
當主進程退出時共享內存會怎么樣?
當主進程退出時,并不刪除共享內存。要想刪除共享內存,需要調用 shared_memory_object::remove。第二個進程的訪問機制也很簡單:清單 4 證明了這一點。
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <cstring> #include <cstdlib> #include <iostream>int main(int argc, char *argv[ ]) {using namespace boost::interprocess; try { // opening an existing shared memory object shared_memory_object sharedmem2 (open_only, "Hello", read_only);// map shared memory object in current address spacemapped_region mmap (sharedmem2, read_only);// need to type-cast since get_address returns void* char *str1 = static_cast<char*> (mmap.get_address());std::cout << str1 << std::endl;} catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;} return 0; }在清單 4 中,使用 open_only 和 read_only 屬性創建共享內存對象。如果無法找到這個共享內存對象,就會拋出異?!,F在,構建并運行清單 3 和清單 4 中的代碼。應該會在終端上看到 "Hello World!"。
接下來,在第二個進程的代碼(清單 4)中std::cout 后面添加以下代碼并重新構建代碼:
| // std::cout code here shared_memory_object::remove("Hello"); // } catch(interprocess_exception& e) { |
連續執行代碼兩次,第二次執行會顯示 "No such file or directory",這證明共享內存已經被刪除了。
使用消息隊列實現進程間通信
現在,研究另一種流行的進程間通信機制:消息隊列。每個參與通信的進程都可以在隊列中添加消息和從隊列讀取消息。消息隊列具有以下性質:
- 它有名稱,進程使用名稱訪問它。
- 在創建隊列時,用戶必須指定隊列的最大長度和一個消息的最大大小。
- 隊列是持久的,這意味著當創建它的進程死亡之后它仍然留在內存中??梢酝ㄟ^顯式地調用 boost::interprocess::message_queue::remove 刪除隊列。
在 清單 5 所示的代碼片段中,進程創建了一個可包含 20 個整數的消息隊列。
#include <boost/interprocess/ipc/message_queue.hpp> #include <iostream> int main(int argc, char* argv[ ]) {using namespace boost::interprocess;try { // creating a message queuemessage_queue mq (create_only, // only create"mq", // name20, //max message countsizeof(int) //max message size);// … more code follows} catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;} }
注意傳遞給 message_queue 的構造函數的 create_only 屬性。與共享內存對象相似,對于以只讀方式打開消息隊列,應該把open_only 屬性傳遞給構造函數。
發送和接收數據
在發送方,使用隊列的 send 方法添加數據。send 方法有三個輸入參數:原始數據的指針 (void*)、數據的大小和優先級。目前,以相同的優先級發送所有數據。清單 6 給出代碼。
#include <boost/interprocess/ipc/message_queue.hpp> #include <iostream> int main(int argc, char* argv[ ]) {using namespace boost::interprocess;try { // creating a message queuemessage_queue mq (create_only, // only create"mq", // name20, //max message countsizeof(int) //max message size);// now send the messages to the queuefor (int i=0; i<20; ++i) mq.send(&i, sizeof(int), 0); // the 3rd argument is the priority } catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;} }在接收方,使用 open_only 屬性創建隊列。通過調用 message_queue 類的 receive 方法從隊列獲取消息。清單 7 給出receive 的方法簽名。
我們來仔細看一下。第一個參數是從隊列接收的數據將被存儲到的位置。第二個參數是接收的數據的預期大小。第三個參數是接收的數據的實際大小。第四個參數是接收的消息的優先級。顯然,如果在執行程序期間第二個和第三個參數不相等,就是出現錯誤了。清單 8 給出接收者進程的代碼。
#include <boost/interprocess/ipc/message_queue.hpp> #include <iostream> int main(int argc, char* argv[ ]) {using namespace boost::interprocess;try { // opening the message queue whose name is mqmessage_queue mq (open_only, // only open"mq" // name);size_t recvd_size; unsigned int priority; // now send the messages to the queuefor (int i=0; i<20; ++i) { int buffer; mq.receive ((void*) &buffer, sizeof(int), recvd_size, priority); if (recvd_size != sizeof(int)) ; // do the error handlingstd::cout << buffer << " " << recvd_size << " " << priority;} } catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;} }這相當簡單。注意,仍然沒有從內存中刪除消息隊列;與共享內存對象一樣,這個隊列是持久的。要想刪除隊列,應該在使用完隊列之后添加以下行:
| message_queue::remove("mq"); // remove the queue using its name |
消息優先級
在發送方,做 清單 9 所示的修改。接收方代碼不需要修改。
message_queue::remove("mq"); // remove the old queuemessage_queue mq (…); // create as beforefor (int i=0; i<20; ++i) mq.send(&i, sizeof(int), i%2); // 第 3 個參數為消息的優先級// … rest as usual
再次運行代碼時,應該會看到 清單 10 所示的輸出。
1 4 1 3 4 1 5 4 1 7 4 1 9 4 1 11 4 1 13 4 1 15 4 1 17 4 1 19 4 1 0 4 0 2 4 0 4 4 0 6 4 0 8 4 0 10 4 0 12 4 0 14 4 0 16 4 0 18 4 0清單 10 證實,第二個進程優先接收優先級高的消息。
同步對文件的訪問
共享內存和消息隊列很不錯,但是文件 I/O 也是重要的進程間通信工具。對并發進程用于通信的文件訪問進行同步并非易事,但是 Boost IPC 庫提供的文件鎖功能讓同步變得簡單了。在進一步解釋之前,來看一下清單 11,了解file_lock 對象是如何工作的。
#include <fstream> #include <iostream> #include <boost/interprocess/sync/file_lock.hpp> #include <cstdlib>int main() { using namespace boost::interprocess; std::string fileName("test"); std::fstream file;file.open(fileName.c_str(), std::ios::out | std::ios::binary | std::ios::trunc); if (!file.is_open() || file.bad()) { std::cout << "Open failed" << std::endl; exit(-1); }try { file_lock f_lock(fileName.c_str());f_lock.lock();std::cout << "Locked in Process 1" << std::endl;file.write("Process 1", 9);file.flush(); f_lock.unlock();std::cout << "Unlocked from Process 1" << std::endl;} catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;}file.close();return 0;
代碼首先打開一個文件,然后使用 file_lock 鎖定它。寫操作完成之后,它刷新文件緩沖區并解除文件鎖。使用 lock 方法獲得對文件的獨占訪問。如果另一個進程也試圖對此文件進行寫操作并已經請求了鎖,那么它會等待,直到第一個進程使用unlock 自愿地放棄鎖。file_lock 類的構造函數接受要鎖定的文件的名稱,一定要在調用lock 之前打開文件;否則會拋出異常。
現在,復制 清單 11 中的代碼并做一些修改。具體地說,讓第二個進程請求這個鎖。清單 12 給出相關修改。
// .. as in Listing 11file_lock f_lock(fileName.c_str());f_lock.lock();std::cout << "Locked in Process 2" << std::endl;system("sleep 4"); file.write("Process 2", 9);file.flush(); f_lock.unlock();std::cout << "Unlocked from Process 2" << std::endl;// file.close();現在,如果這兩個進程同時運行,有 50% 的機會看到第一個進程等待 4 秒后才獲得 file_lock,其他情況都不變。
在使用 file_lock 時,必須記住幾點。這里討論的主題是進程間通信,重點在進程 上。這意味著,不是使用 file_lock 來同步同一進程中各個線程的數據訪問。在與 POSIX 兼容的系統上,文件句柄是進程屬性,而不是 線程屬性。下面是使用文件鎖的幾條規則:
- 對于每個進程,每個文件使用一個 file_lock 對象。
- 使用相同的線程來鎖定和解鎖文件。
- 在解鎖文件之前,通過調用 C 的 flush 庫例程或 flush 方法(如果喜歡使用C++ fstream 的話),刷新寫入者進程中的數據。
結合使用 file_lock 和有范圍(scope)的鎖
在執行程序時,可能會出現拋出異常而文件沒有解鎖的情況。這種情況可能會導致意外的程序行為。為了避免這種情況,可以考慮把 file_lock 對象放在(boost/interprocess/sync/scoped_lock.hpp 中定義的)scoped_lock 中。如果使用scoped_lock,就不需要顯式地鎖定或解鎖文件;鎖定發生在構造器內,每當您退出該范圍,就會自動發生解鎖。清單 13 給出對清單 11 的修改,使之使用有范圍的鎖。
清單 13. 結合使用 scoped_lock 和 file_lock
| #include <boost/interprocess/sync/scoped_lock.hpp> #include <boost/interprocess/sync/file_lock.hpp>//… code as in Listing 11 file_lock f_lock(fileName.c_str()); scoped_lock<file_lock> s_lock(f_lock); // internally calls f_lock.lock( ); // No need to call explicit lock anymore std::cout << "Locked in Process 1" << std::endl; file.write("Process 1", 9); // … code as in Listing 11 |
注意:關于 Resource Acquisition Is Initialization (RAII) 編程習慣法的更多信息,參見參考資料 中的鏈接。
回頁首
了解 Boost MPI
如果您不熟悉 Message Passing Interface,那么在討論 Boost MPI 之前,應該先瀏覽 參考資料 中提供的 MPI 參考資料。MPI 是一個容易使用的標準,它采用通過傳遞消息實現進程間通信的模型。不需要使用套接字或其他通信原語;MPI 后端管理所有底層處理。那么,使用 Boost MPI 有什么好處?Boost MPI 的創建者提供了更高層的抽象,并在 MPI 提供的 API 之上構建了一套簡單的例程,比如MPI_Init 和MPI_Bcast。
Boost MPI 不是一個單獨的庫,不能在下載和構建之后直接使用。相反,必須安裝任意 MPI 實現(比如 MPICH 或 Open MPI)并構建 Boost Serialization 庫。關于如何構建 Boost MPI 的詳細信息參見參考資料。通常,使用以下命令構建 Boost MPI:
| bash-4.1$ bjam –with-mpi |
Windows? 用戶可以從 BoostPro 下載預先構建的 MPI 庫(見 參考資料)。這些庫與 Microsoft? HPC Pack 2008 和 2008 R2 兼容(見 參考資料),適用于帶 Service Pack 3 的 Windows XP 或更高版本的客戶機操作環境。
回頁首
用 MPI 實現 Hello World 程序
您必須了解 Boost MPI 庫中的兩個主要類:environment 類和 communicator 類。前者負責分布式環境的初始化;后者用于進程之間的通信。因為這里討論的是分布式計算,我們有四個進程,它們都在終端上輸出 "Hello World"。清單 14 給出代碼。
#include <boost/mpi.hpp> #include <iostream>int main(int argc, char* argv[]) {boost::mpi::environment env(argc, argv);boost::mpi::communicator world;std::cout << argc << std::endl;std::cout << argv[0] << std::endl;std::cout << "Hello World! from process " << world.rank() << std::endl;return 0; }現在,構建 清單 14 中的代碼并鏈接 Boost MPI 和 Serialization 庫。在 shell 提示上運行可執行程序。應該會看到 "Hello World! from process 0"。接下來,使用 MPI 分派器工具(例如,對于 Open MPI 用戶,使用mpirun;對于 Microsoft HPC Pack 2008,使用mpiexec)并運行可執行程序:
| mpirun –np 4 <executable name> ORmpiexec –n 4 <executable name> |
現在應該會看到與 清單 15 相似的輸出,其中的 mympi1 是可執行程序名稱。
清單 15. 運行 MPI 代碼的輸出
| 1 mympi1 Hello, World! from process 3 1 mympi1 1 mympi1 Hello, World! from process 1 Hello, World! from process 2 1 mympi1 Hello, World! from process 0 |
在 MPI 框架中,已經創建了相同進程的四個拷貝。在 MPI 環境中,每個進程有惟一的 ID(由 communicator 對象決定)?,F在,試試進程間通信。使用send 和receive 函數調用讓一個進程與另一個進程通信。發送消息的進程稱為主進程,接收消息的進程稱為工作者進程。主進程和接收者進程的源代碼是相同的,使用world 對象提供的等級決定功能(見清單 16)。
#include <boost/mpi.hpp> #include <iostream>int main(int argc, char* argv[]) {boost::mpi::environment env(argc, argv);boost::mpi::communicator world;if (world.rank() == 0) {world.send(1, 9, 32);world.send(2, 9, 33);} else { int data;world.recv(0, 9, data);std::cout << "In process " << world.rank( ) << "with data " << data<< std::endl;} return 0; }先看一下 send 函數。第一個參數是接收者進程的 ID;第二個是消息數據的 ID;第三個是實際數據。為什么需要消息標簽?接收者進程在執行期間的特定點上可能希望處理具有特定標簽的消息,所以這個方案會有幫助。對于進程 1 和 2,recv 函數被阻塞,這意味著程序會等待,直到從進程 0 收到標簽 ID 為 9 的消息。當收到這個消息時,把信息存儲在data 中。下面是運行代碼的輸出:
| In process 1 with data 32 In process 2 with data 33 |
如果在接收方有 world.recv(0, 1, data); 這樣的代碼,會發生什么?代碼阻塞,但實際上是,接收者進程在等待一個永遠不會到達的消息。
結論 束語
本文只討論了這兩個庫提供的功能的很小一部分。這些庫提供的其他功能包括 IPC 的內存映射 I/O 和 MPI 的廣播功能。從易用性的角度來說,IPC 更好。MPI 庫依賴于原生的 MPI 實現,而原生 MPI 庫以及預先構建的 Boost MPI 和 Serialization 庫的現成可用性仍然是個問題。但是,花點兒精力構建 MPI 實現和 Boost 的源代碼是值得的。
總結
以上是生活随笔為你收集整理的使用Boost_MPI进行并行编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于路上的风景的句子104个
- 下一篇: 3D集合图元:最小边界框/包围盒(bou