ACE入门
ACE編譯
1. 設(shè)置環(huán)境變量
2. 在 ACE_wrappers\ace 目錄下創(chuàng)建 config.h 文件,寫入:
#include "ace/config-win32.h"?
3.?如果你希望使用標(biāo)準(zhǔn)的?C++?頭文件(例如?iostream、cstdio?等)在?#include?"ace/config-win32.h"?前加入:?
#define?ACE_HAS_STANDARD_CPP_LIBRARY?1?
4.?如果你希望使用?MFC?庫,那么?config.h?中加入:?
#define?ACE_HAS_MFC?1?
如果你希望使用?MFC?靜態(tài)庫,那么加入:?
#define?ACE_USES_STATIC_MFC?
5.?如果你希望編譯靜態(tài)版本的?ACE?庫,那么在?config.h?中加入:?
#define?ACE_AS_STATIC_LIBS?
6.?如果你希望減少靜態(tài)庫的大小,可以禁止使用?inline,在?config.h?的?#include?"ace/config-win32.h"?前加入:?
#define?ACE_NO_INLINE?
?
ACE結(jié)構(gòu)簡介
1)ACE OS adaptation 層:封裝了 OS API,對上層提供 OS 平臺無關(guān)的接口。
2)C++ wrapper facades 層:位于 OS adaptation 之上,提供了與之相似的功能,這些功能使用 C++ 的類封裝起來,而不是 C 語言 API。每個 wrapper facade 都包含一個或者一個以上的類。我們可以有選擇的繼承、聚合這些 wrapper facade。
3)框架層(Framework layer)
框架層在 C++ wrapper facades 層之上,它集成和擴(kuò)充了 wrapper facade 類。
<1> 事件多路分離和分發(fā)框架
ACE Reactor 和 ACE Proactor 實現(xiàn)了 Reactor 模式和 Proactor 模式。
<2> 連接建立和服務(wù)初始化框架
ACE Acceptor-Connector 框架實現(xiàn)了 Acceptor-Connector 模式。
<3> 并發(fā)框架
ACE 提供了 Task 框架實現(xiàn)了并發(fā)模式。
<4> 服務(wù)配置框架
ACE 的服務(wù)配置框架實現(xiàn)了 Component Configurator 模式。
<5> 流框架
ACE 的流框架實現(xiàn)了 Pipes and Fiters 模式。
4)ACE 網(wǎng)絡(luò)組件層
組件(component)就是軟件系統(tǒng)中被封裝的一個部分,ACE 發(fā)行包中的組件用于提供以下功能:
<1> 演示 ACE
<2> 提供常見網(wǎng)絡(luò)服務(wù)的可復(fù)用實現(xiàn)。如提供日志記錄、時間同步等服務(wù)的可復(fù)用實現(xiàn)。
?
線程的創(chuàng)建與管理
一. 線程入口函數(shù)
所有線程必須從一個指定的函數(shù)開始執(zhí)行,該函數(shù)稱為線程函數(shù),它必須具有下列原型:
void* worker(void *arg) {}
該函數(shù)輸入一個void *型的參數(shù),可以在創(chuàng)建線程時傳入。
注意:所有的線程啟動函數(shù)(方法)必須是靜態(tài)的或全局的(就如同直接使用OS線程API時所要求的一樣)。
二.線程基本操作
1.創(chuàng)建一個線程
一個進(jìn)程的主線程是由操作系統(tǒng)自動生成,如果你要讓一個主線程創(chuàng)建額外的線程,可以通過ACE_Thread::spawn()實現(xiàn),該函數(shù)一般的使用方式如下:
ACE_thread_t threadId; ACE_hthread_t threadHandle; ACE_Thread::spawn( (ACE_THR_FUNC)worker, //線程執(zhí)行函數(shù) NULL, //執(zhí)行函數(shù)參數(shù) THR_JOINABLE | THR_NEW_LWP, &threadId, &threadHandle );ACE_Thread::spawn((ACE_THR_FUNC)worker) 使用其默認(rèn)參數(shù),來創(chuàng)建一個worker的線程。
ACE_Thread::spawn_n函數(shù)來創(chuàng)建多個線程。
2.終止線程
在線程函數(shù)體中ACE_Thread::exit()調(diào)用即可終止線程執(zhí)行。
3.設(shè)定線程的相對優(yōu)先級
當(dāng)一個線程被首次創(chuàng)建時,它的優(yōu)先級等同于它所屬進(jìn)程的優(yōu)先級。一個線程的優(yōu)先級是相對于其所屬的進(jìn)程的優(yōu)先級而言的。可以通過調(diào)用ACE_Thread::setprio函數(shù)改變線程的相對優(yōu)先級,該函數(shù)的調(diào)用方式如下:
ACE_Thread::setprio(threadHandle,ACE_DEFAULT_THREAD_PRIORITY)
4.掛起及恢復(fù)線程
掛起線程可以通過來實現(xiàn),它能暫停一個線程的執(zhí)行,其調(diào)用方式如下ACE_Thread::suspend(threadHandle) 。
相應(yīng)的,可以通過ACE_Thread::resume(threadHandle) 恢復(fù)被掛起的線程的執(zhí)行。
5.等待線程結(jié)束
在主函數(shù)中調(diào)用ACE_Thread::join(threadHandle)可阻塞主函數(shù),直道線程結(jié)束才能繼續(xù)執(zhí)行。
6.停止線程
在主函數(shù)中調(diào)用ACE_Thread::cancel (threadHandle)可停止線程的執(zhí)行(在Unix底下可以,而在windows下好像不起作用,有待檢驗)。
三.程序示例
下面例子演示了如何用ace創(chuàng)建一個線程。
#include "ace/Thread.h" #include "ace/Synch.h"#pragma comment(lib, "ACEd.lib")#include <iostream> using namespace std; void* worker(void *arg) { for(int i=0;i<10;i++){ACE_OS::sleep(1);cout<<endl<<"hello world"<<endl;} return NULL; } int main(int argc, char *argv[]) { ACE_thread_t threadId;ACE_hthread_t threadHandle;ACE_Thread::spawn((ACE_THR_FUNC)worker, //線程執(zhí)行函數(shù)NULL, //執(zhí)行函數(shù)參數(shù)THR_JOINABLE | THR_NEW_LWP,&threadId,&threadHandle);ACE_Thread::join(threadHandle); return 0; }在這個簡單的例子中,創(chuàng)建了1個工作者線程,執(zhí)行程序中定義的worker()函數(shù)。然后阻塞主函數(shù),待線程結(jié)束后退出程序。
ACE Lock類屬
鎖類屬包含的類包裝簡單的鎖定機(jī)制,比如互斥體、信號量、讀/寫互斥體和令牌等。這里我就以互斥體為例簡單的介紹一下其使用方法,對其它的鎖類進(jìn)行一些簡單的說明。
1.互斥體 ACE_Thread_Mutex
互斥體用于保護(hù)共享的易變代碼,也就是全局或靜態(tài)數(shù)據(jù)。這樣的數(shù)據(jù)必須通過互斥體進(jìn)行保護(hù),以防止它們在多個線程同時訪問時損壞。??
#include "ace/Thread.h" #include "ace/Synch.h" #include <iostream> using namespace std; ACE_Thread_Mutex mutex; void* Thread1(void *arg) {mutex.acquire();ACE_OS::sleep(3);cout<<endl<<"hello thread1"<<endl;mutex.release(); return NULL; } void* Thread2(void *arg) {mutex.acquire();cout<<endl<<"hello thread2"<<endl;mutex.release(); return NULL; } int main(int argc, char *argv[]) { ACE_Thread::spawn((ACE_THR_FUNC)Thread1); //Thread2 比Thread1晚創(chuàng)建1秒鐘,故后嘗試獲取互斥體ACE_OS::sleep(1);ACE_Thread::spawn((ACE_THR_FUNC)Thread2); while(true)ACE_OS::sleep(10); return 0; }ACE_Thread_Mutex主要有兩個方法:
當(dāng)線程要訪問共享資源時,首先調(diào)用acquire()方法獲取互斥體,從而獲取對改互斥體所保護(hù)的共享資源的唯一訪問權(quán)限,訪問結(jié)束時調(diào)用釋放互斥體,使得其它線程能獲取共享資源的訪問權(quán)限。
2.ACE Lock類屬簡介。
ACE Lock類屬列表如下:
ACE_Mutex
封裝互斥機(jī)制(根據(jù)平臺,可以是mutex_t、pthread_mutex_t等等)的包裝類,用于提供簡單而有效的機(jī)制來使對共享資源的訪問序列化。它與二元信號量(binary semaphore)的功能相類似。可被用于線程和進(jìn)程間的互斥。
ACE_Thread_Mutex
可用于替換ACE_Mutex,專用于線程同步。
ACE_Process_Mutex
可用于替換ACE_Mutex,專用于進(jìn)程同步。
ACE_NULL_Mutex
提供了ACE_Mutex接口的"無為"(do-nothing)實現(xiàn),可在不需要同步時用作替換。
ACE_RW_Mutex
封裝讀者/作者鎖的包裝類。它們是分別為讀和寫進(jìn)行獲取的鎖,在沒有作者在寫的時候,多個讀者可以同時進(jìn)行讀取。
ACE_RW_Thread_Mutex
可用于替換ACE_RW_Mutex,專用于線程同步。
ACE_RW_Process_Mutex
可用于替換ACE_RW_Mutex,專用于進(jìn)程同步。
ACE_Semaphore
這些類實現(xiàn)計數(shù)信號量,在有固定數(shù)量的線程可以同時訪問一個資源時很有用。在OS不提供這種同步機(jī)制的情況下,可通過互斥體來進(jìn)行模擬。
ACE_Thread_Semaphore
應(yīng)被用于替換ACE_Semaphore,專用于線程同步。
ACE_Process_Semaphore
應(yīng)被用于替換ACE_Semaphore,專用于進(jìn)程同步。
ACE_Token
提供"遞歸互斥體"(recursive mutex),也就是,當(dāng)前持有某令牌的線程可以多次重新獲取它,而不會阻塞。而且,當(dāng)令牌被釋放時,它確保下一個正阻塞并等待此令牌的線程就是下一個被放行的線程。
ACE_Null_Token
令牌接口的"無為"(do-nothing)實現(xiàn),在你知道不會出現(xiàn)多個線程時使用。
ACE_Lock
定義鎖定接口的接口類。一個純虛類,如果使用的話,必須承受虛函數(shù)調(diào)用開銷。
ACE_Lock_Adapter
基于模板的適配器,允許將前面提到的任意一種鎖定機(jī)制適配到ACE_Lock接口。
可以簡單的分為以下幾類:
互斥鎖(通常稱為"互斥體"或"二元信號量")用于保護(hù)多線程控制并發(fā)訪問的共享資源的完整性。互斥體通過定義臨界區(qū)來序列化多線程控制的執(zhí)行,在臨界區(qū)中每一時刻只有一個線程在執(zhí)行它的代碼。互斥體簡單而高效(時間和空間)。
ACE線程庫提供了Mutex式的類(是一組互斥體對象,擁有類似的接口),他是一種簡單而高效的類型是"非遞歸"互斥體。非遞歸互斥體不允許當(dāng)前擁有互斥體的線程在釋放它之前重新獲取它。否則,將會立即發(fā)生死鎖。遞歸互斥體在ACE Recursive_Thread_Mutex類中可移植地實現(xiàn)。
讀者/作者鎖與互斥體相類似。例如,獲取讀者/作者鎖的線程也必須釋放它。多個線程可同時獲取一個讀者/作者鎖用于讀,但只有一個線程可以獲取該鎖用于寫。當(dāng)互斥體保護(hù)的資源用于讀遠(yuǎn)比用于寫要頻繁時,讀者/作者互斥體有助于改善并發(fā)的執(zhí)行。
ACE線程庫提供了一個叫作RW_Mutex的類,在C++封裝類中可移植地實現(xiàn)了讀者/作者鎖的語義。讀者/作者鎖將優(yōu)先選擇權(quán)給作者。因而,如果有多個讀者和一個作者在鎖上等待,作者將會首先獲取它。
計數(shù)信號量
在概念上,計數(shù)信號量是可以原子地增減的整數(shù)。如果線程試圖減少一個值為零的信號量的值,它就會阻塞,直到另一個線程增加該信號量的值。
計數(shù)信號量用于追蹤共享程序狀態(tài)的變化。它們記錄某種特定事件的發(fā)生。因為信號量維護(hù)狀態(tài),它們允許線程根據(jù)該狀態(tài)來作決定,即使事件是發(fā)生在過去。
信號量比互斥體效率要低,但是,它們要更為通用,因為它們無需被最初獲取它們的同一線程獲取和釋放。這使得它們能夠用于異步的執(zhí)行上下文中(比如信號處理器)。ACE線程庫提供一個叫作Semaphore的類來可移植地在C++包裝類中實現(xiàn)信號量語義。
ACE Guard類屬
與C一級的互斥體API相比較,Mutex包裝為同步多線程控制提供了一種優(yōu)雅的接口。但是,Mutex潛在地容易出錯,因為程序員有可能忘記調(diào)用release方法(當(dāng)然,C級的互斥體API更容易出錯)。這可能由于程序員的疏忽或是C++異常的發(fā)生而發(fā)生,然而,其導(dǎo)致及其嚴(yán)重的后果--死鎖。
因此,為改善應(yīng)用的健壯性,ACE同步機(jī)制有效地利用C++類構(gòu)造器和析構(gòu)器的語義來確保Mutex鎖被自動獲取和釋放。
ACE提供了一個稱為Guard、Write_Guard和Read_Guard的類族,確保在進(jìn)入和退出C++代碼塊時分別自動獲取和釋放鎖。
Guard類是最基本的守衛(wèi)機(jī)制,定義可以簡化如下(實際定義比這相對要復(fù)雜而完善一點):
template <class LOCK>
class Guard
{
public:
??? Guard (LOCK &l): lock_ (&l){ lock_.acquire (); }
??? ?Guard (void) {??? lock_.release (); }
private:
??? LOCK lock_;
}
Guard類的對象定義一"塊"代碼,在其上鎖被自動獲取,并在退出塊時自動釋放,即使是程序拋異常也能保證自動解鎖。這種機(jī)制也能為Mutex、RW_Mutex和Semaphore同步封裝工作。
對于讀寫鎖,由于加鎖接口不一樣,ace也提供了相應(yīng)的Read_Guard和Write_Guard類,Read_Guard和Write_Guard類有著與Guard類相同的接口。但是,它們的acquire方法分別對鎖進(jìn)行讀和寫。
缺省地, Guard類構(gòu)造器將會阻塞程序,直到鎖被獲取。會有這樣的情況,程序必須使用非阻塞的acquire調(diào)用(例如,防止死鎖)。因此,可以傳給ACE Guard的構(gòu)造器第二個參數(shù)(請參看原始代碼,而不是我這里的簡化代碼),指示它使用鎖的try_acquire方法,而不是acquire。隨后調(diào)用者可以使用Guard的locked方法來原子地測試實際上鎖是否已被獲取。
用Guard重寫上一節(jié)的Thread1方法如下(注釋了的部分是原有代碼):
void* Thread1(void *arg)
{
??? ACE_Guard<ACE_Thread_Mutex> guard(mutex);
//mutex.acquire();
??? ACE_OS::sleep(3);
??? cout<<endl<<"hello thread1"<<endl;
//mutex.release();
return NULL;
}
相比較而言,使用Guard更加簡潔,并且會自動解鎖,免除了一部分后顧之憂。
注意:
Guard是在Guard變量析構(gòu)時解鎖,如果在同一函數(shù)中兩次對同一互斥體變量使用Guard要注意其對象生命周期,否則容易造成死鎖。
?
ACE Condition類屬
ACE Condition類屬(條件變量)提供風(fēng)格與互斥體、讀者/作者鎖和計數(shù)信號量不同的鎖定機(jī)制。當(dāng)持有鎖的線程在臨界區(qū)執(zhí)行代碼時,這三種機(jī)制讓協(xié)作線程進(jìn)行等待。相反,條件變量通常被一個線程用于使自己等待,直到一個涉及共享數(shù)據(jù)的條件表達(dá)式到達(dá)特定的狀態(tài)。當(dāng)另外的協(xié)作線程指示共享數(shù)據(jù)的狀態(tài)已發(fā)生變化,調(diào)度器就喚醒一個在該條件變量上掛起的線程。于是新喚醒的線程重新對它的條件表達(dá)式進(jìn)行求值,如果共享數(shù)據(jù)已到達(dá)合適狀態(tài),就恢復(fù)處理。
ACE線程庫提供一個叫作Condition的類來可移植地在C++包裝類中實現(xiàn)條件變量語義。定義方式如下:
ACE_Thread_Mutex mutex;
ACE_Condition<ACE_Thread_Mutex> cond(mutex);
該對象有兩個常用方法。
向使用該條件變量的其它線程發(fā)送滿足條件信號。
查詢是否滿足條件,如果滿足,則繼續(xù)往下執(zhí)行;如果不滿足條件,主線程就等待在此條件變量上。條件變量隨即自動釋放互斥體,并使主線程進(jìn)入睡眠。
條件變量總是與互斥體一起使用。這是一種可如下描述的一般模式:
while( expression NOT TRUE ) wait on condition variable;
條件變量不是用于互斥,往往用于線程間的協(xié)作,下面例子演示了通過條件變量實現(xiàn)線程協(xié)作。
#include "ace/Thread.h" #include "ace/Synch.h" #include <iostream> using namespace std; ACE_Thread_Mutex mutex; ACE_Condition<ACE_Thread_Mutex> cond(mutex); void* worker(void *arg) { ACE_OS::sleep(2); //保證eater線程的cond.wait()在worker線程的cond.signal()先執(zhí)行 mutex.acquire(); ACE_OS::sleep(1); cout<<endl<<"produce"<<endl; cond.signal(); mutex.release(); return NULL; } void* eater(void *arg) { mutex.acquire(); cond.wait(); cout<<endl<<"eat"<<endl; mutex.release(); return NULL; } int main(int argc, char *argv[]) { ACE_Thread::spawn((ACE_THR_FUNC)worker); ACE_OS::sleep(1); ACE_Thread::spawn((ACE_THR_FUNC)eater); while(true) ACE_OS::sleep(10); return 0; }這個例子中,首先創(chuàng)建了一個生產(chǎn)者線程worker和一個消費(fèi)者線程eater,消費(fèi)者線程執(zhí)行比生產(chǎn)者快,兩個線程不加限制并發(fā)執(zhí)行會導(dǎo)致先消費(fèi),后生產(chǎn)的情況(只是加互斥鎖也不能很好的解決,以為無法保證生產(chǎn)者一定先獲得互斥體)。所以這里通過條件變量的通知方式保證線程的順序執(zhí)行:
消費(fèi)者線程獲取互斥體,等待條件滿足(生產(chǎn)者生產(chǎn)了食品)。同時釋放互斥體,進(jìn)入休眠狀態(tài)。
生產(chǎn)者獲取互斥體(雖然是消費(fèi)者先獲取的互斥體,但消費(fèi)者調(diào)用的wait函數(shù)會釋放消費(fèi)者的互斥體),生產(chǎn)商品后,通過條件變量發(fā)送信號(調(diào)用signal函數(shù))通知消費(fèi)者生產(chǎn)完成,結(jié)束生產(chǎn)過程,釋放互斥體。
消費(fèi)者收到信號后,重新獲取互斥體,完成消費(fèi)過程。
使用條件變量的注意事項:
條件變量必須和互斥體一起使用,也就是說使用前必須加鎖(調(diào)用互斥體acquire函數(shù)),使用完后需釋放互斥體。
條件變量中的wait()和signal()成對使用的話,必須保證wait()函數(shù)在signal()之前執(zhí)行,這樣才能保證wait()能收到條件滿足通知,不至于一直等待下去,形成死鎖(worker線程中的第一句話就是起的這個作用)。
?
ACE Synchronization類
這一類并發(fā)控制對象一般也叫做雜項并發(fā)類,這類對象一般用得不多,這里我只是對其作一些簡單的介紹。
1.Atomic_Op類
ACE_Atomic_Op類用于將同步透明地參數(shù)化進(jìn)基本的算術(shù)運(yùn)算中。
ACE_Atomic_Op是一種模板類,鎖定機(jī)制和需要參數(shù)化的類型被作為參數(shù)傳入其中,重載所有算術(shù)操作符,并確保在操作前獲取鎖,在操作后釋放它。運(yùn)算本身被委托給通過模板傳入的的類。
使用ACE_Atomic_Op進(jìn)行變量封裝時,對于那些用ACE_Atomic_Op封裝了的變量操作都變成了線程安全的,而并看不到顯式的加解鎖代碼,代碼變得更簡潔,優(yōu)雅。
2.ACE中的柵欄(Barrier)
一組線程可以使用柵欄來進(jìn)行共同的相互同步。組中的每個線程各自執(zhí)行,直到到達(dá)柵欄,就阻塞在那里。在所有相關(guān)線程到達(dá)柵欄后,它們就全部繼續(xù)它們的執(zhí)行。就是說,它們一個接一個地阻塞,等待其他的線程到達(dá)柵欄;一旦所有線程都到達(dá)了它們的執(zhí)行路徑中的"柵欄點",它們就一起重新啟動。
在ACE中,柵欄在ACE_Barrier類中實現(xiàn)。在柵欄對象被實例化時,它將要等待的線程的數(shù)目會作為參數(shù)傳入。一旦到達(dá)執(zhí)行路徑中的"柵欄點",每個線程都在柵欄對象上發(fā)出wait()調(diào)用。它們在這里阻塞,直到其他線程到達(dá)它們各自的"柵欄點",然后再一起繼續(xù)執(zhí)行。當(dāng)柵欄從相關(guān)線程那里接收了適當(dāng)數(shù)目的wait()調(diào)用時,它就同時喚醒所有阻塞的線程。
舉個簡單的例子,運(yùn)動員進(jìn)行賽跑比賽時,雖然他們到達(dá)終點有先后順序,但會等到所有的運(yùn)動員跑完比賽后才一起領(lǐng)獎。
?
面向?qū)ο蟮木€程類ACE_Task
?
我們在前一章中使用ACE_Thread包裝時,你一定已經(jīng)注意到了一些不夠"優(yōu)雅"的地方。那一章中的大多數(shù)程序都被分解為函數(shù)、而不是對象。這是因為ACE_Thread包裝需要一個全局函數(shù)名、或是靜態(tài)方法作為參數(shù)。隨后該函數(shù)(靜態(tài)方法)就被用作所派生的線程的"啟動點"。這自然就使得程序員要為每個線程寫一個函數(shù)。如我們已經(jīng)看到的,這可能會導(dǎo)致非面向?qū)ο蟮某绦蚍纸狻?
ACE_Task對常用線程處理進(jìn)行了OO包裝,通過ACE_Task,能對線程進(jìn)行更好的操作。
ACE_Task是ACE中的任務(wù)或主動對象“處理結(jié)構(gòu)”的基類。ACE使用此類來實現(xiàn)主動對象模式。所有希望成為“主動對象”的對象都必須由此類派生。同時可將它看作是更高級的、更為面向?qū)ο蟮木€程。
每個任務(wù)都含有一或多個線程,以及一個底層消息隊列。各個任務(wù)通過消息隊列進(jìn)行通信。至于消息隊列實現(xiàn)的內(nèi)在細(xì)節(jié)程序員不必關(guān)注。發(fā)送任務(wù)用putq() 將消息插入到另一任務(wù)的消息隊列中,接收任務(wù)通過使用getq()將消息提取出來。這樣的體系結(jié)構(gòu)大大簡化了多線程程序的編程模型。
其主要成員如下:
open():初始化資源
close():釋放資源
activate():啟動線程,可指定線程的數(shù)目
svc():線程的啟動位置
????? putq():放置消息到任務(wù)的消息隊列中
????? getq():從任務(wù)的消息隊列中取出消息
????? thr_count():返回任務(wù)中線程的數(shù)目
????? last_thread():返回任務(wù)中將線程計數(shù)器從1降為0的線程的ID
?
要創(chuàng)建任務(wù),需要進(jìn)行以下步驟:
open()方法應(yīng)該包含所有專屬于任務(wù)的初始化代碼。其中可能包括諸如連接控制塊、鎖和內(nèi)存這樣的資源。close()方法是相應(yīng)的終止方法。
在主動對象實例化后,你必須通過調(diào)用activate()啟用它。要在主動對象中創(chuàng)建的線程的數(shù)目,以及其他一些參數(shù),被傳遞給activate()方法。activate()方法會使svc()方法成為所有它生成的線程的啟動點。
如上面所提到的,在主動對象被啟用后,各個新線程在svc()方法中啟動。應(yīng)用開發(fā)者必須在子類中定義此方法。
下面的例子演示怎樣去創(chuàng)建任務(wù):
#include "ace/Task.h" #include "ace/OS.h" #include <iostream> using namespace std; class TaskThread: public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc(void) { for(int i=0;i<10;i++) { ACE_OS::sleep(1); cout<<endl<<"hello thread1"<<endl; } return 0; } }; int main(int argc, char *argv[]) { TaskThread task; task.activate(); while(true) ACE_OS::sleep(10); return 0; }ACE_Task也封裝了常用線程操作,如暫停,恢復(fù)及停止等,是不是非常簡單和方便呢。
其實ACE_Task的使用還不僅僅是這些,通過它還可實現(xiàn)一種很常用的網(wǎng)絡(luò)編程模式--主動對象模式,其具體功能在后續(xù)的設(shè)計模式部分將作詳細(xì)的介紹。
ACE_Message_Queue
在Windows和Linux的config文件中都沒有定義"ACE_HAS_TIMED_MESSAGE_BLOCKS"這個宏,所以msg_deadline_time和msg_execution_time都不起任何作用.
ACE_Message_Queue_Factory這個工廠提供三個靜態(tài)函數(shù)分別用來創(chuàng)建靜態(tài)消息隊列和兩種類型的動態(tài)消息隊列。靜態(tài)消息隊列的消息也支持優(yōu)先級,但是消息的優(yōu)先級是靜態(tài)的,不需要通過動態(tài)計算而來。水位用來控制消息隊列中數(shù)據(jù)的大小,高水位(high_water_mark)用于控制消息隊列的上限,它用于控制生產(chǎn)者往里面放數(shù)據(jù)的量,如果消息隊列中數(shù)據(jù)量已經(jīng)達(dá)到高水位,而用使用了鎖,既使“ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();”創(chuàng)建消息隊列,那么生產(chǎn)者將被阻塞。高水位很容易理解,但是低水位是用來做什么的呢?
只要消息隊列中還有數(shù)據(jù)消費(fèi)者就不會被阻塞的,而當(dāng)數(shù)據(jù)量超過高水位時,生產(chǎn)者會被阻塞,既然會被阻塞,那么它肯定需要被喚醒,那么什么時候由誰來喚醒生產(chǎn)者呢?這就是低水位的作用,消費(fèi)者一直消費(fèi)數(shù)據(jù),當(dāng)數(shù)據(jù)低于低水位時它就喚醒生產(chǎn)者。
下面的代碼很好的展示了靜態(tài)消息隊列的使用。
#include "ace/Message_Queue.h"#include "ace/Get_Opt.h"
#include "ace/OS.h"
#include <ace/Thread_Manager.h>
#include <ace/Synch.h>
//消息隊列指針
ACE_Message_Queue<ACE_MT_SYNCH>* mq;
const char S1[] = "C++";
const char S2[] = "Java";
const char S3[] = "PHP";
const char S4[] = "C#";
//四個消息指針
ACE_Message_Block* mb1, * mb2, * mb3, * mb4;
//生產(chǎn)者
static void* produce(void *arg)
{
static int loop = 1;
while(true)
{
ACE_OS::sleep(2);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer...\n"));
while(true)
{
if(loop == 1)
{
//將高水位設(shè)置為10, S1+S2的長度為3+4+2=9<10,因此可以將S3放進(jìn)去
//但是再放入S4時生產(chǎn)者將會被阻塞
//需要注意的是水位的大小并不是消息的個數(shù),而是消息隊列中消息里面的數(shù)據(jù)量之和
//如果也能以消息的個數(shù)作為高低水位的值就好了
mq->high_water_mark(10);
mq->enqueue_prio (mb1);
mq->enqueue_prio (mb2);
mq->enqueue_prio (mb3);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending!!\n"));
//因為消費(fèi)者在睡眠6秒之后才會調(diào)用deactivate,因此生產(chǎn)者會在這兒阻塞幾秒鐘
//可以不斷地將msg_bytes打印出來觀察觀察
int ret = mq->enqueue_prio (mb4);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by deactivate, ret = %d!!\n", ret));
++loop;
}
if(loop == 2)
{
ACE_OS::sleep(6);
//將低水位設(shè)置為5,因為高水位仍然為10,當(dāng)前的數(shù)據(jù)量又超過了10,
//所以下面的入隊操作仍會將生產(chǎn)者阻塞
//這樣消費(fèi)者消費(fèi)消息,當(dāng)數(shù)據(jù)量小于5時,將喚醒生產(chǎn)者
//生產(chǎn)者在此處等待被消費(fèi)者喚醒
mq->low_water_mark(5);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending again!!\n"));
mq->enqueue_prio (mb4);
ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by consumer!!\n"));
++loop;
}
}
}
return NULL;
}
//消費(fèi)者
void* consume(void *arg)
{
static int loop = 1;
while(true)
{
ACE_OS::sleep(2);
ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer...\n"));
if(loop == 1)
{
//等待6秒,此時生產(chǎn)者和消費(fèi)者都將被阻塞
ACE_OS::sleep(6);
//deactivate會喚醒所有的線程,將消息隊列設(shè)置為不可用
//以后所存取操作都會返回-1
//這個操作會喚醒生產(chǎn)者
mq->deactivate();
++loop;
}
if(loop == 2)
{
ACE_OS::sleep(2);
//將消息隊列的狀態(tài)設(shè)置成ACTIVATED
//消息又可以使用了
mq->activate();
++loop;
}
if(loop == 3)
{
ACE_OS::sleep(10);
//消費(fèi)兩個消息之后,數(shù)據(jù)量就小于5了,低于低水位將喚醒生產(chǎn)者
ACE_Message_Block *mb;
mq->dequeue_head (mb);
mq->dequeue_head (mb);
ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer wake up producer!!\n"));
++loop;
}
}
return NULL;
}
int main(int argc, char* argv[])
{
mq = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();
int priority;
//使用隨機(jī)數(shù)作為消息的優(yōu)先級
//數(shù)字越高,優(yōu)先級越高
priority = ACE_OS::rand() % 100;
mb1 = new ACE_Message_Block(S1, sizeof S1, priority);
priority = ACE_OS::rand() % 100;
mb2 = new ACE_Message_Block(S2, sizeof S2, priority);
priority = ACE_OS::rand() % 100;
mb3 = new ACE_Message_Block(S3, sizeof S3, priority);
priority = ACE_OS::rand() % 100;
mb4 = new ACE_Message_Block(S4, sizeof S4, priority);
//將消息壓入隊列中,enqueue_prio根據(jù)消息的優(yōu)先級將消息放到適當(dāng)?shù)奈恢蒙?/span>
//enqueue_head只是簡單地將數(shù)據(jù)存入隊列中,而不考慮消息的優(yōu)先級
//使用enqueue_prio壓入消息后,可以簡單通過dequeue_head和dequeue_tail
//分別按優(yōu)先級從高到低和從低到高取消息
//如果使用enqueue_head和enqueue_tail壓入消息
//則需要通過dequeue_prio來按照消息的優(yōu)先級依次將消息出隊列
//沒有必要既使用enqueue_prio壓入消息,又實用dequeue_prio來取消息
mq->enqueue_prio (mb1);
mq->enqueue_prio (mb2);
mq->enqueue_prio (mb3);
mq->enqueue_prio (mb4);
//輸出靜態(tài)消息隊列的相關(guān)信息
//高低水位默認(rèn)值均為16384
ACE_DEBUG((LM_DEBUG, "count : %d, bytes : %d, length : %d, high_water_mark : %d, low_water_mark : %d, status : %d\n",
mq->message_count(), mq->message_bytes(), mq->message_length(),
mq->high_water_mark(), mq->low_water_mark(),
mq->state()));
ACE_Message_Block *mb;
//使用next遍歷消息,遍歷的順序為高優(yōu)先級到底優(yōu)先級
ACE_DEBUG((LM_DEBUG, "===========next=============\n"));
//peek一下,并不彈出消息,類似Windows的PeekMessage
mq->peek_dequeue_head(mb);
do
{
ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));
}while(mb = mb->next());
//使用迭代器遍歷消息隊列,遍歷的順序為高優(yōu)先級到底優(yōu)先級
ACE_DEBUG((LM_DEBUG, "=========iterator=============\n"));
ACE_Message_Queue<ACE_MT_SYNCH>::ITERATOR iterator (*mq);
for (ACE_Message_Block *entry = 0;
iterator.next (entry) != 0;
iterator.advance ())
{
ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", entry->rd_ptr(), entry->msg_priority()));
}
ACE_DEBUG((LM_DEBUG, "============dequeue_head==========\n"));
while(mq->dequeue_head (mb) != -1)
{
ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));
//這里如果不判斷的話,消息隊列空時會導(dǎo)致主線程被阻塞
if(mq->is_empty())
break;
}
ACE_DEBUG((LM_DEBUG, "\n\n"));
//測試高低水位和隊列的state使用,進(jìn)行測試之前mq隊列已空///
//產(chǎn)生一個生產(chǎn)者線程
ACE_Thread_Manager::instance()->spawn_n
(
1,
(ACE_THR_FUNC) produce
);
產(chǎn)生兩個消費(fèi)者線程
ACE_Thread_Manager::instance()->spawn_n
(
1,
(ACE_THR_FUNC) consume
);
//掛起主線程
ACE_Thread_Manager::instance()->wait();
return 0;
}
?
?
ACE中TCP通信
Tcp通信過程一般為如下步驟:
服務(wù)器綁定端口,等待客戶端連接。
客戶端通過服務(wù)器的ip和服務(wù)器綁定的端口連接服務(wù)器。
服務(wù)器和客戶端通過網(wǎng)絡(luò)建立一條數(shù)據(jù)通路,通過這條數(shù)據(jù)通路進(jìn)行數(shù)據(jù)交互。
常用API:
1. ACE_INET_Addr類。
ACE"地址"類ACE_Addr的子類,表示TCP/IP和UDP/IP的地址。它通常包含機(jī)器的ip和端口信息,通過它可以定位到所通信的進(jìn)程。
定義方式:
ACE_INET_Addr addInfo(3000,"192.168.1.100");
常用方法:
get_host_name??? 獲取主機(jī)名
get_ip_address??? 獲取ip地址
get_port_number??? 獲取端口號
2. ACE_SOCK_Acceptor類。
服務(wù)期端使用,用于綁定端口和被動地接受連接。
常用方法:
open 綁定端口
accept建立和客戶段的連接
3. ACE_SOCK_Connector類。
客戶端使用,用于主動的建立和服務(wù)器的連接。
常用方法:
connect()??? 建立和服務(wù)期的連接。
4. ACE_SOCK_Stream類。
客戶端和服務(wù)器都使用,表示客戶段和服務(wù)器之間的數(shù)據(jù)通路。
常用方法:
send ()??? 發(fā)送數(shù)據(jù)
recv ()??? 接收數(shù)據(jù)
close()??? 關(guān)閉連接(實際上就是斷開了socket連接)。
代碼示例:
下面例子演示了如何如何用ACE創(chuàng)建TCP通信的Server端。
#include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Stream.h" #include "ace/INET_Addr.h" #include "ace/OS.h"#include <string> #include <iostream> using namespace std;int main(int argc, char *argv[]) {ACE_INET_Addr port_to_listen(3000); //綁定的端口ACE_SOCK_Acceptor acceptor;if (acceptor.open (port_to_listen, 1) == -1) //綁定端口{cout<<endl<<"bind port fail"<<endl;return -1;}while(true){ACE_SOCK_Stream peer; //和客戶端的數(shù)據(jù)通路ACE_Time_Value timeout (10, 0);if (acceptor.accept (peer) != -1) //建立和客戶端的連接{cout<<endl<<endl<<"client connect. "<<endl;char buffer[1024];ssize_t bytes_received;ACE_INET_Addr raddr;peer.get_local_addr(raddr);cout<<endl<<"local port\t"<<raddr.get_host_name()<<"\t"<<raddr.get_port_number()<<endl;while ((bytes_received =peer.recv (buffer, sizeof(buffer))) != -1) //讀取客戶端發(fā)送的數(shù)據(jù){peer.send(buffer, bytes_received); //對客戶端發(fā)數(shù)據(jù)}peer.close ();}}return 0; }這個例子實現(xiàn)的功能很簡單,服務(wù)器端綁定3000號端口,等待一個客戶端的連接,然后將從客戶端讀取的數(shù)據(jù)再次轉(zhuǎn)發(fā)給客戶端,也就是實現(xiàn)了一個EchoServer的功能。
相應(yīng)的客戶端程序也比較簡單,代碼如下:
#include <ace/SOCK_Stream.h> #include <ace/SOCK_Connector.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> #include <string> #include <iostream> using namespace std;int main(int argc, char *argv[]) {ACE_INET_Addr addr(3000,"127.0.0.1");ACE_SOCK_Connector connector; ACE_Time_Value timeout(5,0);ACE_SOCK_Stream peer;if(connector.connect(peer,addr,&timeout) != 0){cout<<"connection failed !"<<endl;return 1;}cout<<"conneced !"<<endl;string s="hello world";peer.send(s.c_str(),s.length()); //發(fā)送數(shù)據(jù)cout<<endl<<"send:\t"<<s<<endl;ssize_t bc=0; //接收的字節(jié)數(shù)char buf[1024];bc=peer.recv(buf,1024,&timeout); //接收數(shù)據(jù)if(bc>=0){buf[bc]='\0';cout<<endl<<"rev:\t"<<buf<<endl;}peer.close();return 0; }?
ACE中UDP通信
udp是一種無連接的協(xié)議,提供無連接不可靠的服務(wù)。
在ace中,通過ACE_SOCK_Dgram類提供udp通信服務(wù),ACE_SOCK_Dgram和ACE_SOCK_Stream的API非常類似,一樣提供了send,recv及close等常用操作,這里就不再累述了。
udp通信時無需像tcp那樣建立連接和關(guān)閉連接,tcp編程時需要通過accept和connect來建立連接,而udp通信省略了這一步驟,相對來說編程更為簡單。
由于udp通信時無建立連接,服務(wù)器端不能像Tcp通信那樣在建立連接的時候就獲得客戶端的地址信息,故服務(wù)器端不能主動對客戶端發(fā)送信息(不知道客戶端的地址),只有等到收到客戶端發(fā)送的udp信息時才能確定客戶端的地址信息,從而進(jìn)行通信。
udp通信過程如下:
下面代碼為EchoServer的udp版:
//server.cpp #include <ace/SOCK_Dgram.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> #include <string> #include <iostream> using namespace std; int main(int argc, char *argv[]) { ACE_INET_Addr port_to_listen(3000); //綁定的端口 ACE_SOCK_Dgram peer(port_to_listen); //通信通道 char buf[100]; while(true) { ACE_INET_Addr remoteAddr; //所連接的遠(yuǎn)程地址 int bc = peer.recv(buf,100,remoteAddr); //接收消息,獲取遠(yuǎn)程地址信息 if( bc != -1) { string s(buf,bc); cout<<endl<<"rev:\t"<<s<<endl; } peer.send(buf,bc,remoteAddr); //和遠(yuǎn)程地址通信 } return 0; }相應(yīng)的客戶端程序如下:
//client.cpp #include <ace/SOCK_Dgram.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> #include <string> #include <iostream> using namespace std; int main(int argc, char *argv[]) { ACE_INET_Addr remoteAddr(3000,"127.0.0.1"); //所連接的遠(yuǎn)程地址 ACE_INET_Addr localAddr; //本地地址信息 ACE_SOCK_Dgram peer(localAddr); //通信通道 peer.send("hello",5,remoteAddr); //發(fā)送消息 char buf[100]; int bc = peer.recv(buf,100,remoteAddr); //接收消息 if( bc != -1) { string s(buf,bc); cout<<endl<<"rev:\t"<<s<<endl; } return 0; }和tcp編程相比,udp無需通過acceptor,connector來建立連接,故代碼相對tcp編程來說要簡單許多。另外,由于udp是一種無連接的通信方式,ACE_SOCK_Dgram的實例對象中無法保存遠(yuǎn)端地址信息(保存了本地地址信息),故通信的時候需要加上遠(yuǎn)端地址信息。
?
ACE主動對象模式
主動對象模式用于降低方法執(zhí)行和方法調(diào)用之間的耦合。該模式描述了另外一種更為透明的任務(wù)間通信方法。
傳統(tǒng)上,所有的對象都是被動的代碼段,對象中的代碼是在對它發(fā)出方法調(diào)用的線程中執(zhí)行的,當(dāng)方法被調(diào)用時,調(diào)用線程將阻塞,直至調(diào)用結(jié)束。而主動對象卻不一樣。這些對象具有自己的命令執(zhí)行線程,主動對象的方法將在自己的執(zhí)行線程中執(zhí)行,不會阻塞調(diào)用方法。
例如,設(shè)想對象"A"已在你的程序的main()函數(shù)中被實例化。當(dāng)你的程序啟動時,OS創(chuàng)建一個線程,以從main()函數(shù)開始執(zhí)行。如果你調(diào)用對象A的任何方法,該線程將"流過"那個方法,并執(zhí)行其中的代碼。一旦執(zhí)行完成,該線程返回調(diào)用該方法的點并繼續(xù)它的執(zhí)行。但是,如果"A"是主動對象,事情就不是這樣了。在這種情況下,主線程不會被主動對象借用。相反,當(dāng)"A"的方法被調(diào)用時,方法的執(zhí)行發(fā)生在主動對象持有的線程中。另一種思考方法:如果調(diào)用的是被動對象的方法(常規(guī)對象),調(diào)用會阻塞(同步的);而另一方面,如果調(diào)用的是主動對象的方法,調(diào)用不會阻塞(異步的)。
由于主動對象的方法調(diào)用不會阻塞,這樣就提高了系統(tǒng)響應(yīng)速度,在網(wǎng)絡(luò)編程中是大有用武之地的。
在這里我們將一個"Logger"(日志記錄器)對象為例來介紹如何將一個傳統(tǒng)對象改造為主動對象,從而提高系統(tǒng)響應(yīng)速度。
Logger的功能是將一些系統(tǒng)事件的記錄在存儲器上以備查詢,由于Logger使用慢速的I/O系統(tǒng)來記錄發(fā)送給它的消息,因此對Logger的操作將會導(dǎo)致系統(tǒng)長時間的等待。
其功能代碼簡化如下:
class Logger: public ACE_Task<ACE_MT_SYNCH>
{
public:
void LogMsg(const string& msg)
??? {
??????? cout<<endl<<msg<<endl;
??????? ACE_OS::sleep(2);
??? }
};
為了實現(xiàn)記錄日志操作的主動執(zhí)行,我們需要用命令模式將其封裝,從而使得記錄日志的方法能在合適的時間和地方主動執(zhí)行,封裝方式如下:
class LogMsgCmd: public ACE_Method_Object { public:LogMsgCmd(Logger *plog,const string& msg){this->log=plog;this->msg=msg;}int call(){this->log->LogMsg(msg);return 0;}private:Logger *log;string msg; };class Logger: public ACE_Task<ACE_MT_SYNCH> { public:void LogMsg(const string& msg){cout<<endl<<msg<<endl;ACE_OS::sleep(2);}LogMsgCmd *LogMsgActive(const string& msg){new LogMsgCmd(this,msg);} };?
?
這里對代碼功能做一下簡單的說明:
ACE_Method_Object是ACE提供的命令模式借口,命令接口調(diào)用函數(shù)為int call(),在這里通過它可以把每個操作日志的調(diào)用封裝為一個LogMsgCmd對象,這樣,當(dāng)原來需要調(diào)用LogMsg的方法的地方只要調(diào)用LogMsgActive即可生成一個LogMsgCmd對象,由于調(diào)用LogMsgActive方法,只是對命令進(jìn)行了封裝,并沒有進(jìn)行日志操作,所以該方法會立即返回。然后再新開一個線程,將LogMsgCmd對象作為參數(shù)傳入,在該線程中執(zhí)行LogMsgCmd對象的call方法,從而實現(xiàn)無阻塞調(diào)用。
然而,每次對一個LogMsg調(diào)用都開啟一個新線程,無疑是對資源的一種浪費(fèi),實際上我們往往將生成的LogMsgCmd對象插入一個命令隊列中,只新開一個命令執(zhí)行線程依次執(zhí)行命令隊列中的所有命令。并且,為了實現(xiàn)對象的封裝,命令隊列和命令執(zhí)行線程往往也封裝到Logger對象中,代碼如下所示:
#include "ace/OS.h" #include "ace/Task.h" #include "ace/Method_Object.h" #include "ace/Activation_Queue.h" #include "ace/Auto_Ptr.h"#include <string> #include <iostream> using namespace std;class Logger: public ACE_Task<ACE_MT_SYNCH> { public:Logger(){this->activate();}int svc();void LogMsg(const string& msg);void LogMsgActive (const string& msg);private:ACE_Activation_Queue cmdQueue; //命令隊列 };class LogMsgCmd: public ACE_Method_Object { public:LogMsgCmd(Logger *plog,const string& msg){this->log=plog;this->msg=msg;}int call(){this->log->LogMsg(msg);return 0;}private:Logger *log;string msg; };void Logger::LogMsg(const string& msg) {cout<<endl<<msg<<endl;ACE_OS::sleep(2); }//以主動的方式記錄日志 void Logger::LogMsgActive(const string& msg) {//生成命令對象,插入到命令隊列中cmdQueue.enqueue(new LogMsgCmd(this,msg)); }int Logger::svc() {while(true){//遍歷命令隊列,執(zhí)行命令auto_ptr<ACE_Method_Object> mo(this->cmdQueue.dequeue ());if (mo->call () == -1)break;}return 0; }int main (int argc, ACE_TCHAR *argv[]) {Logger log;log. LogMsgActive ("hello");ACE_OS::sleep(1);log.LogMsgActive("abcd");while(true)ACE_OS::sleep(1);return 0; }在這里需要注意一下命令隊列ACE_Activation_Queue對象,它是線程安全的,使用方法比較簡單,這里我也不多介紹了。
主動對象的基本結(jié)構(gòu)就是這樣,然而,由于主動對象是異步調(diào)用的,又引出了如下兩個新問題:
通過ACE_Future對象來解決上述兩個問題的方法如下:
- 首先創(chuàng)建ACE_Future對象用以保留返回值。
- 調(diào)用主動命令時將ACE_Future對象作為參數(shù)傳入,生成的命令對象中保存ACE_Future對象的指針。
- 命令執(zhí)行線程執(zhí)行完命令后,將返回值通過set()函數(shù)設(shè)置到ACE_Future對象中。
- 調(diào)用線程可以通過ACE_Future對象的ready()函數(shù)查詢該命令是否執(zhí)行完成,如果命令執(zhí)行完成,則可通過get()函數(shù)來獲取返回值。
使用的時候要注意一下ACE_Future對象的生命周期。
為了演示了如何獲取主動命令的執(zhí)行狀態(tài)和結(jié)果,我將上篇文章中的代碼改動了一下,日志類記錄日志后,會將記錄的內(nèi)容作為返回值返回,該返回值會通過ACE_Future對象返回,代碼如下
#include "ace/OS.h" #include "ace/Task.h" #include "ace/Method_Object.h" #include "ace/Activation_Queue.h" #include "ace/Auto_Ptr.h"#include "ace/Future.h"#include <string> #include <iostream> using namespace std;class Logger: public ACE_Task<ACE_MT_SYNCH> { public:Logger(){this->activate();}int svc();string LogMsg(const string& msg);void LogMsgActive (const string& msg,ACE_Future<string> *result);private:ACE_Activation_Queue cmdQueue; //命令隊列 };class LogMsgCmd: public ACE_Method_Object { public:LogMsgCmd(Logger *plog,const string& msg,ACE_Future<string> *result){this->log=plog;this->msg=msg;this->result=result;}int call(){string reply = this->log->LogMsg(msg);result->set(reply);return 0;}private:ACE_Future<string> *result;Logger *log;string msg; };string Logger::LogMsg(const string& msg) {ACE_OS::sleep(2);cout<<endl<<msg<<endl;return msg; }//以主動的方式記錄日志 void Logger::LogMsgActive(const string& msg,ACE_Future<string> *result) {//生成命令對象,插入到命令隊列中cmdQueue.enqueue(new LogMsgCmd(this,msg,result)); }int Logger::svc() {while(true){//遍歷命令隊列,執(zhí)行命令auto_ptr<ACE_Method_Object> mo(this->cmdQueue.dequeue ());if (mo->call () == -1)break;}return 0; }void get_info(ACE_Future<string> &fu) {string state = fu.ready()?"ready":"not ready";cout<<endl<<state<<endl;if(fu.ready()){string value;fu.get(value);cout<<"value:\t"<<value<<endl;} }int main (int argc, ACE_TCHAR *argv[]) {ACE_Future<string> result;Logger log;log.LogMsgActive ("hello",&result);while(true){get_info(result);if(result.ready())break;ACE_OS::sleep(1);}cout<<endl<<"cmd end"<<endl;while(true)ACE_OS::sleep(1);return 0; }這種查詢模式比較簡單有效,但存在一個問題:調(diào)用線程必須不斷輪詢ACE_Future對象以獲取返回值,這樣的效率比較低。可以通過觀察者模式解決這個問題:在ACE_Future對象上注冊一個觀察者,當(dāng)ACE_Future對象的值發(fā)生改變(異步命令執(zhí)行完成)時主動通知該觀察者,從而獲取返回值。
ACE中的觀察者模式可以通過ACE_Future_Observer來實現(xiàn),使用方法如下:
通過觀察者模式,可以更有效,及時的獲取異步命令的返回值,但同時也增加了程序結(jié)構(gòu)的復(fù)雜度并且難以調(diào)試,使用的時候應(yīng)該根據(jù)需要選取合適的方式。
#include "ace/Future.h"#include <string> #include <iostream> using namespace std;class MyObserver:public ACE_Future_Observer<string> {virtual void update (const ACE_Future<string> &future){string value;future.get(value);cout<<endl<<"change:\t"<<value<<endl;} };int main(int argc, char *argv[]) {MyObserver obv;ACE_Future<string> fu;fu.attach(&obv);ACE_OS::sleep(3);fu.set("12345");while(true)ACE_OS::sleep(3);return 0; }?
ACE反應(yīng)器(Reactor)模式
反應(yīng)器(Reactor):用于事件多路分離和分派的體系結(jié)構(gòu)模式
通常的,對一個文件描述符指定的文件或設(shè)備, 有兩種工作方式: 阻塞與非阻塞。所謂阻塞方式的意思是指, 當(dāng)試圖對該文件描述符進(jìn)行讀寫時, 如果當(dāng)時沒有東西可讀,或者暫時不可寫, 程序就進(jìn)入等待狀態(tài), 直到有東西可讀或者可寫為止。而對于非阻塞狀態(tài), 如果沒有東西可讀, 或者不可寫, 讀寫函數(shù)馬上返回, 而不會等待。
在前面的章節(jié)中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:當(dāng)接收tcp數(shù)據(jù)時,如果遠(yuǎn)端沒有數(shù)據(jù)可以讀,則會一直阻塞到讀到需要的數(shù)據(jù)為止。這種方式的傳輸和傳統(tǒng)的被動方法的調(diào)用類似,非常直觀,并且簡單有效,但是同樣也存在一個效率問題,如果你是開發(fā)一個面對著數(shù)千個連接的服務(wù)器程序,對每一個客戶端都采用阻塞的方式通信,如果存在某個非常耗時的讀寫操作時,其它的客戶端通信將無法響應(yīng),效率非常低下。
一種常用做法是:每建立一個Socket連接時,同時創(chuàng)建一個新線程對該Socket進(jìn)行單獨(dú)通信(采用阻塞的方式通信)。這種方式具有很高的響應(yīng)速度,并且控制起來也很簡單,在連接數(shù)較少的時候非常有效,但是如果對每一個連接都產(chǎn)生一個線程的無疑是對系統(tǒng)資源的一種浪費(fèi),如果連接數(shù)較多將會出現(xiàn)資源不足的情況。
另一種較高效的做法是:服務(wù)器端保存一個Socket連接列表,然后對這個列表進(jìn)行輪詢,如果發(fā)現(xiàn)某個Socket端口上有數(shù)據(jù)可讀時(讀就緒),則調(diào)用該socket連接的相應(yīng)讀操作;如果發(fā)現(xiàn)某個Socket端口上有數(shù)據(jù)可寫時(寫就緒),則調(diào)用該socket連接的相應(yīng)寫操作;如果某個端口的Socket連接已經(jīng)中斷,則調(diào)用相應(yīng)的析構(gòu)方法關(guān)閉該端口。這樣能充分利用服務(wù)器資源,效率得到了很大提高。
在Socket編程中就可以通過select等相關(guān)API實現(xiàn)這一方式。但直接用這些API控制起來比較麻煩,并且也難以控制和移植,在ACE中可以通過Reactor模式簡化這一開發(fā)過程。
反應(yīng)器本質(zhì)上提供一組更高級的編程抽象,簡化了事件驅(qū)動的分布式應(yīng)用的設(shè)計和實現(xiàn)。除此而外,反應(yīng)器還將若干不同種類的事件的多路分離集成到易于使用的API中。特別地,反應(yīng)器對基于定時器的事件、信號事件、基于I/O端口監(jiān)控的事件和用戶定義的通知進(jìn)行統(tǒng)一地處理。
ACE中的反應(yīng)器與若干內(nèi)部和外部組件協(xié)同工作。其基本概念是反應(yīng)器框架檢測事件的發(fā)生(通過在OS事件多路分離接口上進(jìn)行偵聽),并發(fā)出對預(yù)登記事件處理器(event handler)對象中的方法的"回調(diào)"(callback)。該方法由應(yīng)用開發(fā)者實現(xiàn),其中含有應(yīng)用處理此事件的特定代碼。
使用ACE的反應(yīng)器,只需如下幾步:
創(chuàng)建事件處理器,以處理他所感興趣的某事件。
在反應(yīng)器上登記,通知說他有興趣處理某事件,同時傳遞他想要用以處理此事件的事件處理器的指針給反應(yīng)器。
隨后反應(yīng)器框架將自動地:
在內(nèi)部維護(hù)一些表,將不同的事件類型與事件處理器對象關(guān)聯(lián)起來。
在用戶已登記的某個事件發(fā)生時,反應(yīng)器發(fā)出對處理器中相應(yīng)方法的回調(diào)。
反應(yīng)器模式在ACE中被實現(xiàn)為ACE_Reactor類,它提供反應(yīng)器框架的功能接口。
如上面所提到的,反應(yīng)器將事件處理器對象作為服務(wù)提供者使用。反應(yīng)器內(nèi)部記錄某個事件處理器的特定事件的相關(guān)回調(diào)方法。當(dāng)這些事件發(fā)生時,反應(yīng)器會創(chuàng)建這種事件和相應(yīng)的事件處理器的關(guān)聯(lián)。
事件處理器就是需要通過輪詢發(fā)生事件改變的對象列表中的對象,如在上面的例子中就是連接的客戶端,每個客戶端都可以看成一個事件處理器。
就是反應(yīng)器支持的事件,如Socket讀就緒,寫就緒。拿上面的例子來說,如果某個客戶端(事件處理器)在反應(yīng)器中注冊了讀就緒事件,當(dāng)客戶端給服務(wù)器發(fā)送一條消息的時候,就會觸發(fā)這個客戶端的數(shù)據(jù)可讀的回調(diào)函數(shù)。
在反應(yīng)器框架中,所有應(yīng)用特有的事件處理器都必須由ACE_Event_Handler的抽象接口類派生。可以通過重載相應(yīng)的"handle_"方法實現(xiàn)相關(guān)的回調(diào)方法。
使用ACE_Reactor基本上有三個步驟:
下面我就以一個Socket客戶端的例子為例簡單的說明反應(yīng)器的基本用法。
#include <ace/OS.h> #include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <string> #include <iostream> using namespace std;class MyClient:public ACE_Event_Handler { public:bool open(){ACE_SOCK_Connector connector;ACE_INET_Addr addr(3000,"127.0.0.1");ACE_Time_Value timeout(5,0);if(connector.connect(peer,addr,&timeout) != 0){cout<<endl<<"connecetd fail";return false;}ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);cout<<endl<<"connecetd ";return true;}ACE_HANDLE get_handle(void) const{return peer.get_handle();}int handle_input (ACE_HANDLE fd){int rev=0;ACE_Time_Value timeout(5,0);if((rev=peer.recv(buffer,1000,&timeout))>0){buffer[rev]='\0';cout<<endl<<"rev:\t"<<buffer<<endl;}return 3;}private:ACE_SOCK_Stream peer;char buffer[1024]; };int main(int argc, char *argv[]) {MyClient client;client.open();while(true){ACE_Reactor::instance()->handle_events(); }return 0; }在這個例子中,客戶端連接上服務(wù)器后,通過ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK)注冊了一個讀就緒的回調(diào)函數(shù),當(dāng)服務(wù)器端給客戶端發(fā)消息的時候,會自動觸發(fā)handle_input()函數(shù),將接收到的信息打印出來。
在Socket編程中,常見的事件就是"讀就緒","寫就緒",通過對這兩個事件的捕獲分發(fā),可以實現(xiàn)Socket中的異步操作。
Socket編程中的事件處理器
在前面我們已經(jīng)介紹過,在ACE反應(yīng)器框架中,任何都必須派生自ACE_Event_Handler類,并通過重載其相應(yīng)會調(diào)事件處理函數(shù)來實現(xiàn)相應(yīng)的回調(diào)處理的。在Socket編程中,我們通常需要重載的函數(shù)有
當(dāng)I/O句柄(比如UNIX中的文件描述符)上的輸入可用時,反應(yīng)器自動回調(diào)該方法。
當(dāng)I/O設(shè)備的輸出隊列有可用空間時,反應(yīng)器自動回調(diào)該方法。
當(dāng)事件處理器中的事件從Reactor中移除的時候調(diào)用。
此外,為了使Reactor能通過I/O句柄找到對應(yīng)的事件處理器,還必須重載其get_handle()方法以使得Reactor建立起I/O句柄和事件處理器的關(guān)聯(lián)。
使用Reactor框架。
下面我們將以一個客戶端的程序為例,介紹如何在Socket編程中使用Reactor框架。
一.建立一個客戶端對象(事件處理器)。
客戶端對象就是一個事件處理器,其聲明如下:
class Client:public ACE_Event_Handler { public: ACE_HANDLE get_handle(void) const; int handle_input (ACE_HANDLE fd); int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); ACE_SOCK_Stream& Peer(); private: ACE_SOCK_Stream peer; };在Client端中我只關(guān)心"讀就緒"事件,故只重載了handle_input函數(shù)(大多數(shù)應(yīng)用下只需要重載handle_input函數(shù))。另外,在客戶端還保存了一個ACE_SOCK_Stream的peer對象用來進(jìn)行Socket通信,同時封裝了一個Peer()函數(shù)返回它的引用。
二.重載相應(yīng)回調(diào)處理函數(shù)
ACE_SOCK_Stream& Client::Peer() { return peer; } ACE_HANDLE Client::get_handle(void) const { return peer.get_handle(); } int Client::handle_input (ACE_HANDLE fd) { int rev=0; if((rev = peer.recv(buffer,1000))>0) { buffer[rev]='\0'; cout<<endl<<"rev:\t"<<buffer<<endl; return 0; } else //Socket連接發(fā)生錯誤,返回-1,在Reactor中注銷事件,觸發(fā)handle_close函數(shù) { return -1; } } int Client::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { cout<<endl<<"connecetd closed"; return ACE_Event_Handler::handle_close(handle,close_mask); }?
三.在Reactor中注冊事件
首先讓我們來看看相應(yīng)的main函數(shù)的代碼:
int main(int argc, char *argv[]) {Client client;ACE_SOCK_Connector connector;ACE_INET_Addr addr(3000,"127.0.0.1");ACE_Time_Value timeout(5,0);if(connector.connect(client.Peer(),addr,&timeout) != 0){cout<<endl<<"connecetd fail";return 0;}ACE_Reactor::instance()->register_handler(&client,ACE_Event_Handler::READ_MASK);while(true) { ACE_Reactor::instance()->handle_events(); }return 0; }在這里可以看到,使用Reactor框架后,依然首先通過ACE_SOCK_Connector的connect函數(shù)來建立連接。建立連接后,可以通過ACE_Reactor::instance()->register_handler函數(shù)來實現(xiàn)Reactor的注冊,實現(xiàn)I/O事件和Client對象的handle_input方法相關(guān)聯(lián),它的第一個參數(shù)是事件處理器的地址,第二個參數(shù)是事件類型,由于這里只關(guān)心讀就緒事件,故注冊的事件類型是ACE_Event_Handler::READ_MASK。
四.啟動Reactor事件循環(huán)
通過如上設(shè)置后,我們就可以通過ACE_Reactor::instance()->handle_events()啟動Reactor循環(huán)了,這樣,每當(dāng)服務(wù)器端有數(shù)據(jù)發(fā)送給客戶端時,當(dāng)客戶端的數(shù)據(jù)就緒時,就回觸發(fā)Client對象的handle_input函數(shù),將接收的數(shù)據(jù)打印出來。
通常的做法是,將Reactor事件循環(huán)作為一個單獨(dú)的線程來處理,這樣就不會阻塞main函數(shù)。
五.注銷Reactor事件
Reactor事件的注銷一般有兩種方式,顯式和隱式,下面將分別給予介紹。
當(dāng)Reactor捕獲事件后,會觸發(fā)相應(yīng)的"handle_"處理函數(shù),當(dāng)"handle_"處理函數(shù)返回值大于或等于0時,表示處理事件成功,當(dāng)返回值小于0時,表示處理事件失敗,這時Reactor會自動注銷該句柄所注冊的所有事件,并觸發(fā)handle_close函數(shù),以執(zhí)行相應(yīng)的資源清理工作。
在本例中,當(dāng)handle_input函數(shù)里的recv函數(shù)接收到0個數(shù)時,說明socket發(fā)生錯誤(大多為Socket連接中斷),此時返回-1,則系統(tǒng)自動注銷相應(yīng)事件。
調(diào)用Reactor對象的remove_handler方法移除,它有兩個參數(shù),第一個是所注冊的事件反應(yīng)器對象,第二個是所要注銷的事件。
在這個示例程序里,連接方只有一個Socket連接,Reactor的優(yōu)勢并沒有體現(xiàn)出來,但在一些網(wǎng)絡(luò)管理系統(tǒng)里,連接方需要對多個需要管理的設(shè)備(服務(wù)器端)進(jìn)行連接,在這種情況下使用Reactor模式,只需要多開一個Reactor事件循環(huán)線程就能實現(xiàn)事件多路分發(fā)復(fù)用,并且不會阻塞,通過面向?qū)ο蟮幕卣{(diào)方式管理,使用起來非常方便。
Reactor框架的另外一個常用的地方就是服務(wù)器端,一般是一個服務(wù)器端對應(yīng)多個客戶端,這樣用Reactor模式能大幅提高并發(fā)能力,這方面的編程方法將在下一章給與介紹。
?
在服務(wù)器端使用Reactor框架
使用Reactor框架的服務(wù)器端結(jié)構(gòu)如下:
服務(wù)器端注冊兩種事件處理器,ClientAcceptor和ClientService ,ClientService類負(fù)責(zé)和客戶端的通信,每一個ClientService對象對應(yīng)一個客戶端的Socket連接。 ClientAcceptor專門負(fù)責(zé)被動接受客戶端的連接,并創(chuàng)建ClientService對象。這樣,在一個N個Socket連接的服務(wù)器程序中,將存在1個ClientAcceptor對象和N個ClientService對象。
整個服務(wù)器端流程如下:
首先創(chuàng)建一個ClientAcceptor對象,該對象在Reactor上注冊ACCEPT_MASK事件,Reactor將自動在監(jiān)聽端口建立Socket監(jiān)聽。
如果有對該端口的Socket連接時,Reactor將自動回調(diào)handle_input方法,ClientAcceptor重載此方法,并創(chuàng)建一個ClientService對象,用于處理和Client的通信。
ClientService對象根據(jù)服務(wù)器的具體功能實現(xiàn),其處理過程和客戶端程序類似,注冊相應(yīng)的回調(diào)事件并分發(fā)即可。
代碼如下:
#include <ace/OS.h> #include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <ace/SOCK_Acceptor.h> #include <ace/Auto_Ptr.h> class ClientService : public ACE_Event_Handler { public: ACE_SOCK_Stream &peer (void) { return this->sock_; } int open (void) { //注冊讀就緒回調(diào)函數(shù) return this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { //一個簡單的EchoServer,將客戶端的信息返回 int rev = peer().recv(buf,100); if(rev<=0) return -1; peer().send(buf,rev); return 0; } // 釋放相應(yīng)資源 virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask) { if (mask == ACE_Event_Handler::WRITE_MASK) return 0; mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, mask); this->sock_.close (); delete this; //socket出錯時,將自動刪除該客戶端,釋放相應(yīng)資源 return 0; } protected: char buf[100]; ACE_SOCK_Stream sock_; }; class ClientAcceptor : public ACE_Event_Handler { public: virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE, 0);} int open (const ACE_INET_Addr &listen_addr) { if (this->acceptor_.open (listen_addr, 1) == -1) { ACE_OS::printf("open port fail"); return -1; } //注冊接受連接回調(diào)事件 return this->reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->acceptor_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { ClientService *client = new ClientService(); auto_ptr<ClientService> p (client); if (this->acceptor_.accept (client->peer ()) == -1) { ACE_OS::printf("accept client fail"); return -1; } p.release (); client->reactor (this->reactor ()); if (client->open () == -1) client->handle_close (ACE_INVALID_HANDLE, 0); return 0; } virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE) { ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, m); this->acceptor_.close (); } return 0; } protected: ACE_SOCK_Acceptor acceptor_; }; int main(int argc, char *argv[]) { ACE_INET_Addr addr(3000,"192.168.1.142"); ClientAcceptor server; server.reactor(ACE_Reactor::instance()); server.open(addr); while(true) { ACE_Reactor::instance()->handle_events(); } return 0; }代碼功能比較簡單,需要注意以下幾點:
這里注冊事件的方式和前面的文章中方式不一樣,是通過ACE_Event_Handler類的reactor()方法設(shè)置和獲取reactor的指針,比較直觀和方便。前面的文章是通過ACE_Reactor::instance()來獲取的一個單體reactor的指針。
定時器的實現(xiàn)
通過Reactor機(jī)制,還可以很容易的實現(xiàn)定時器的功能,使用方式如下。
編寫一個事件反應(yīng)器,重載handle_timeout()方法,該方法是定時器的觸發(fā)時間到時,會自動觸發(fā)該方法。
通過Reactor的schedule_timer()方法注冊定時器。
啟動reacotr的handle_events()事件分發(fā)循環(huán)。
當(dāng)不想使用定時器時,可以通過Reactor的cancel_timer()方法注銷定時器。
下面的代碼簡單的實現(xiàn)了一個定時器,并具有基本的開啟,關(guān)閉功能。
#include <ace/OS.h> #include <ace/Reactor.h> class MyTimerHandler : public ACE_Event_Handler { private: int inteval; //執(zhí)行時間間隔 int delay; //延遲執(zhí)行時間 int timerid; public: MyTimerHandler(int delay,int inteval) { this->delay=delay; this->inteval=inteval; } int open() //注冊定時器 { ACE_Time_Value delaytime(inteval); ACE_Time_Value intevaltime(inteval); timerid = reactor()->schedule_timer(this, 0, //傳遞handle_timeout給的參數(shù) delaytime, intevaltime); return timerid; } int close() //取消定時器 { return reactor()->cancel_timer(timerid); } //定時器回調(diào)函數(shù) int handle_timeout (const ACE_Time_Value ¤t_time, const void * = 0) { time_t epoch = ((timespec_t)current_time).tv_sec; ACE_DEBUG ((LM_INFO, ACE_TEXT ("handle_timeout: %s\n"), ACE_OS::ctime (&epoch))); return 0; } }; int main(int argc, char *argv[]) { MyTimerHandler * timer = new MyTimerHandler (3,5); timer->reactor(ACE_Reactor::instance()); timer->open(); for(int i=0;i<2;i++) //觸發(fā)次handle_timeout事件 { ACE_OS::printf("\n%d\n",i); ACE_Reactor::instance()->handle_events(); } timer->close(); ACE_OS::printf("cancel timer"); while(true) ACE_Reactor::instance()->handle_events(); return 0; }代碼功能比較簡單,這里就不多做介紹了。
轉(zhuǎn)載于:https://www.cnblogs.com/dubingsky/archive/2009/07/22/1528292.html
與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖總結(jié)
- 上一篇: Web站点下的Web Service读取
- 下一篇: 得到计算机名