日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ACE入门

發(fā)布時間:2024/9/5 编程问答 61 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ACE入门 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

ACE編譯

1. 設(shè)置環(huán)境變量

  • 在操作系統(tǒng)添加一個名為ACE_ROOT的用戶環(huán)境變量,值為剛才ace的解壓路徑D:\Develop\ACE_wrappers。
  • 添加用戶的Path環(huán)境變量,值為%ACE_ROOT%\lib,這樣才能保證系統(tǒng)能找到ace生成的動態(tài)連接庫。
  • 設(shè)置VS2005的C++開發(fā)項目信息,依次打開菜單 工具-選項-項目和解決方案-VC++目錄 ,在右側(cè)目錄列表中選擇"包含目錄",添加$(ACE_ROOT),在右側(cè)目錄列表中選擇"庫文件",添加$(ACE_ROOT)\lib。
  • 2. 在 ACE_wrappers\ace 目錄下創(chuàng)建 config.h 文件,寫入:
    #include "ace/config-win32.h"?

    Code
    3.?如果你希望使用標(biāo)準(zhǔn)的?C++?頭文件(例如?iostream、cstdio?等)在?#include?"ace/config-win32.h"?前加入:?
    #define?ACE_HAS_STANDARD_CPP_LIBRARY?1?

    4.?如果你希望使用?MFC?庫,那么?config.h?中加入:?

    #define?ACE_HAS_MFC?1?

    如果你希望使用?MFC?靜態(tài)庫,那么加入:?

    #define?ACE_USES_STATIC_MFC?

    5.?如果你希望編譯靜態(tài)版本的?ACE?庫,那么在?config.h?中加入:?

    #define?ACE_AS_STATIC_LIBS?

    6.?如果你希望減少靜態(tài)庫的大小,可以禁止使用?inline,在?config.h?的?#include?"ace/config-win32.h"?前加入:?

    #define?ACE_NO_INLINE?

    ?

    ACE結(jié)構(gòu)簡介

    1)ACE OS adaptation 層:封裝了 OS API,對上層提供 OS 平臺無關(guān)的接口。

    2)C++ wrapper facades 層:位于 OS adaptation 之上,提供了與之相似的功能,這些功能使用 C++ 的類封裝起來,而不是 C 語言 API。每個 wrapper facade 都包含一個或者一個以上的類。我們可以有選擇的繼承、聚合這些 wrapper facade。

    3)框架層(Framework layer)

    框架層在 C++ wrapper facades 層之上,它集成和擴(kuò)充了 wrapper facade 類。

    <1> 事件多路分離和分發(fā)框架

    ACE Reactor 和 ACE Proactor 實現(xiàn)了 Reactor 模式和 Proactor 模式。

    <2> 連接建立和服務(wù)初始化框架

    ACE Acceptor-Connector 框架實現(xiàn)了 Acceptor-Connector 模式。

    <3> 并發(fā)框架

    ACE 提供了 Task 框架實現(xiàn)了并發(fā)模式。

    <4> 服務(wù)配置框架

    ACE 的服務(wù)配置框架實現(xiàn)了 Component Configurator 模式。

    <5> 流框架

    ACE 的流框架實現(xiàn)了 Pipes and Fiters 模式。

    4)ACE 網(wǎng)絡(luò)組件層

    組件(component)就是軟件系統(tǒng)中被封裝的一個部分,ACE 發(fā)行包中的組件用于提供以下功能:

    <1> 演示 ACE

    <2> 提供常見網(wǎng)絡(luò)服務(wù)的可復(fù)用實現(xiàn)。如提供日志記錄、時間同步等服務(wù)的可復(fù)用實現(xiàn)。

    ?

    線程的創(chuàng)建與管理

    一. 線程入口函數(shù)

    所有線程必須從一個指定的函數(shù)開始執(zhí)行,該函數(shù)稱為線程函數(shù),它必須具有下列原型:
    void* worker(void *arg) {}
    該函數(shù)輸入一個void *型的參數(shù),可以在創(chuàng)建線程時傳入。
    注意:所有的線程啟動函數(shù)(方法)必須是靜態(tài)的或全局的(就如同直接使用OS線程API時所要求的一樣)。

    二.線程基本操作

    1.創(chuàng)建一個線程

    一個進(jìn)程的主線程是由操作系統(tǒng)自動生成,如果你要讓一個主線程創(chuàng)建額外的線程,可以通過ACE_Thread::spawn()實現(xiàn),該函數(shù)一般的使用方式如下:

    ACE_thread_t threadId; ACE_hthread_t threadHandle; ACE_Thread::spawn( (ACE_THR_FUNC)worker, //線程執(zhí)行函數(shù) NULL, //執(zhí)行函數(shù)參數(shù) THR_JOINABLE | THR_NEW_LWP, &threadId, &threadHandle );

    ACE_Thread::spawn((ACE_THR_FUNC)worker) 使用其默認(rèn)參數(shù),來創(chuàng)建一個worker的線程。

    ACE_Thread::spawn_n函數(shù)來創(chuàng)建多個線程。

    2.終止線程

    在線程函數(shù)體中ACE_Thread::exit()調(diào)用即可終止線程執(zhí)行。

    3.設(shè)定線程的相對優(yōu)先級

    當(dāng)一個線程被首次創(chuàng)建時,它的優(yōu)先級等同于它所屬進(jìn)程的優(yōu)先級。一個線程的優(yōu)先級是相對于其所屬的進(jìn)程的優(yōu)先級而言的。可以通過調(diào)用ACE_Thread::setprio函數(shù)改變線程的相對優(yōu)先級,該函數(shù)的調(diào)用方式如下:
    ACE_Thread::setprio(threadHandle,ACE_DEFAULT_THREAD_PRIORITY)

    4.掛起及恢復(fù)線程

    掛起線程可以通過來實現(xiàn),它能暫停一個線程的執(zhí)行,其調(diào)用方式如下ACE_Thread::suspend(threadHandle) 。
    相應(yīng)的,可以通過ACE_Thread::resume(threadHandle) 恢復(fù)被掛起的線程的執(zhí)行。

    5.等待線程結(jié)束

    在主函數(shù)中調(diào)用ACE_Thread::join(threadHandle)可阻塞主函數(shù),直道線程結(jié)束才能繼續(xù)執(zhí)行。

    6.停止線程

    在主函數(shù)中調(diào)用ACE_Thread::cancel (threadHandle)可停止線程的執(zhí)行(在Unix底下可以,而在windows下好像不起作用,有待檢驗)。

    三.程序示例

    下面例子演示了如何用ace創(chuàng)建一個線程。

    #include "ace/Thread.h" #include "ace/Synch.h"#pragma comment(lib, "ACEd.lib")#include <iostream> using namespace std; void* worker(void *arg) { for(int i=0;i<10;i++){ACE_OS::sleep(1);cout<<endl<<"hello world"<<endl;} return NULL; } int main(int argc, char *argv[]) { ACE_thread_t threadId;ACE_hthread_t threadHandle;ACE_Thread::spawn((ACE_THR_FUNC)worker, //線程執(zhí)行函數(shù)NULL, //執(zhí)行函數(shù)參數(shù)THR_JOINABLE | THR_NEW_LWP,&threadId,&threadHandle);ACE_Thread::join(threadHandle); return 0; }

    在這個簡單的例子中,創(chuàng)建了1個工作者線程,執(zhí)行程序中定義的worker()函數(shù)。然后阻塞主函數(shù),待線程結(jié)束后退出程序。

    ACE Lock類屬

    鎖類屬包含的類包裝簡單的鎖定機(jī)制,比如互斥體、信號量、讀/寫互斥體和令牌等。這里我就以互斥體為例簡單的介紹一下其使用方法,對其它的鎖類進(jìn)行一些簡單的說明。

    1.互斥體 ACE_Thread_Mutex

    互斥體用于保護(hù)共享的易變代碼,也就是全局或靜態(tài)數(shù)據(jù)。這樣的數(shù)據(jù)必須通過互斥體進(jìn)行保護(hù),以防止它們在多個線程同時訪問時損壞。??

    #include "ace/Thread.h" #include "ace/Synch.h" #include <iostream> using namespace std; ACE_Thread_Mutex mutex; void* Thread1(void *arg) {mutex.acquire();ACE_OS::sleep(3);cout<<endl<<"hello thread1"<<endl;mutex.release(); return NULL; } void* Thread2(void *arg) {mutex.acquire();cout<<endl<<"hello thread2"<<endl;mutex.release(); return NULL; } int main(int argc, char *argv[]) { ACE_Thread::spawn((ACE_THR_FUNC)Thread1); //Thread2 比Thread1晚創(chuàng)建1秒鐘,故后嘗試獲取互斥體ACE_OS::sleep(1);ACE_Thread::spawn((ACE_THR_FUNC)Thread2); while(true)ACE_OS::sleep(10); return 0; }

    ACE_Thread_Mutex主要有兩個方法:

  • acquire():用來獲取互斥體,如果無法獲取,將阻塞至獲取到為止。
  • release():用來釋放互斥體,從而使自己或者其它線程能夠獲取互斥體。
  • 當(dāng)線程要訪問共享資源時,首先調(diào)用acquire()方法獲取互斥體,從而獲取對改互斥體所保護(hù)的共享資源的唯一訪問權(quán)限,訪問結(jié)束時調(diào)用釋放互斥體,使得其它線程能獲取共享資源的訪問權(quán)限。

    2.ACE Lock類屬簡介。

    ACE Lock類屬列表如下:

    ACE_Mutex

    封裝互斥機(jī)制(根據(jù)平臺,可以是mutex_t、pthread_mutex_t等等)的包裝類,用于提供簡單而有效的機(jī)制來使對共享資源的訪問序列化。它與二元信號量(binary semaphore)的功能相類似。可被用于線程和進(jìn)程間的互斥。

    ACE_Thread_Mutex

    可用于替換ACE_Mutex,專用于線程同步。

    ACE_Process_Mutex

    可用于替換ACE_Mutex,專用于進(jìn)程同步。

    ACE_NULL_Mutex

    提供了ACE_Mutex接口的"無為"(do-nothing)實現(xiàn),可在不需要同步時用作替換。

    ACE_RW_Mutex

    封裝讀者/作者鎖的包裝類。它們是分別為讀和寫進(jìn)行獲取的鎖,在沒有作者在寫的時候,多個讀者可以同時進(jìn)行讀取。

    ACE_RW_Thread_Mutex

    可用于替換ACE_RW_Mutex,專用于線程同步。

    ACE_RW_Process_Mutex

    可用于替換ACE_RW_Mutex,專用于進(jìn)程同步。

    ACE_Semaphore

    這些類實現(xiàn)計數(shù)信號量,在有固定數(shù)量的線程可以同時訪問一個資源時很有用。在OS不提供這種同步機(jī)制的情況下,可通過互斥體來進(jìn)行模擬。

    ACE_Thread_Semaphore

    應(yīng)被用于替換ACE_Semaphore,專用于線程同步。

    ACE_Process_Semaphore

    應(yīng)被用于替換ACE_Semaphore,專用于進(jìn)程同步。

    ACE_Token

    提供"遞歸互斥體"(recursive mutex),也就是,當(dāng)前持有某令牌的線程可以多次重新獲取它,而不會阻塞。而且,當(dāng)令牌被釋放時,它確保下一個正阻塞并等待此令牌的線程就是下一個被放行的線程。

    ACE_Null_Token

    令牌接口的"無為"(do-nothing)實現(xiàn),在你知道不會出現(xiàn)多個線程時使用。

    ACE_Lock

    定義鎖定接口的接口類。一個純虛類,如果使用的話,必須承受虛函數(shù)調(diào)用開銷。

    ACE_Lock_Adapter

    基于模板的適配器,允許將前面提到的任意一種鎖定機(jī)制適配到ACE_Lock接口。

    可以簡單的分為以下幾類:

  • 互斥鎖
    互斥鎖(通常稱為"互斥體"或"二元信號量")用于保護(hù)多線程控制并發(fā)訪問的共享資源的完整性。互斥體通過定義臨界區(qū)來序列化多線程控制的執(zhí)行,在臨界區(qū)中每一時刻只有一個線程在執(zhí)行它的代碼。互斥體簡單而高效(時間和空間)。
    ACE線程庫提供了Mutex式的類(是一組互斥體對象,擁有類似的接口),他是一種簡單而高效的類型是"非遞歸"互斥體。非遞歸互斥體不允許當(dāng)前擁有互斥體的線程在釋放它之前重新獲取它。否則,將會立即發(fā)生死鎖。遞歸互斥體在ACE Recursive_Thread_Mutex類中可移植地實現(xiàn)。
  • 讀者/作者鎖
    讀者/作者鎖與互斥體相類似。例如,獲取讀者/作者鎖的線程也必須釋放它。多個線程可同時獲取一個讀者/作者鎖用于讀,但只有一個線程可以獲取該鎖用于寫。當(dāng)互斥體保護(hù)的資源用于讀遠(yuǎn)比用于寫要頻繁時,讀者/作者互斥體有助于改善并發(fā)的執(zhí)行。
    ACE線程庫提供了一個叫作RW_Mutex的類,在C++封裝類中可移植地實現(xiàn)了讀者/作者鎖的語義。讀者/作者鎖將優(yōu)先選擇權(quán)給作者。因而,如果有多個讀者和一個作者在鎖上等待,作者將會首先獲取它。
  • 計數(shù)信號量
    在概念上,計數(shù)信號量是可以原子地增減的整數(shù)。如果線程試圖減少一個值為零的信號量的值,它就會阻塞,直到另一個線程增加該信號量的值。
    計數(shù)信號量用于追蹤共享程序狀態(tài)的變化。它們記錄某種特定事件的發(fā)生。因為信號量維護(hù)狀態(tài),它們允許線程根據(jù)該狀態(tài)來作決定,即使事件是發(fā)生在過去。
    信號量比互斥體效率要低,但是,它們要更為通用,因為它們無需被最初獲取它們的同一線程獲取和釋放。這使得它們能夠用于異步的執(zhí)行上下文中(比如信號處理器)。ACE線程庫提供一個叫作Semaphore的類來可移植地在C++包裝類中實現(xiàn)信號量語義。

    ACE Guard類屬

    與C一級的互斥體API相比較,Mutex包裝為同步多線程控制提供了一種優(yōu)雅的接口。但是,Mutex潛在地容易出錯,因為程序員有可能忘記調(diào)用release方法(當(dāng)然,C級的互斥體API更容易出錯)。這可能由于程序員的疏忽或是C++異常的發(fā)生而發(fā)生,然而,其導(dǎo)致及其嚴(yán)重的后果--死鎖。

    因此,為改善應(yīng)用的健壯性,ACE同步機(jī)制有效地利用C++類構(gòu)造器和析構(gòu)器的語義來確保Mutex鎖被自動獲取和釋放。

    ACE提供了一個稱為Guard、Write_Guard和Read_Guard的類族,確保在進(jìn)入和退出C++代碼塊時分別自動獲取和釋放鎖

    Guard類是最基本的守衛(wèi)機(jī)制,定義可以簡化如下(實際定義比這相對要復(fù)雜而完善一點):

    template <class LOCK>
    class Guard
    {
    public:
    ??? Guard (LOCK &l): lock_ (&l){ lock_.acquire (); }
    ??? ?Guard (void) {??? lock_.release (); }
    private:
    ??? LOCK lock_;
    }

    Guard類的對象定義一"塊"代碼,在其上鎖被自動獲取,并在退出塊時自動釋放,即使是程序拋異常也能保證自動解鎖。這種機(jī)制也能為Mutex、RW_Mutex和Semaphore同步封裝工作。

    對于讀寫鎖,由于加鎖接口不一樣,ace也提供了相應(yīng)的Read_Guard和Write_Guard類,Read_Guard和Write_Guard類有著與Guard類相同的接口。但是,它們的acquire方法分別對鎖進(jìn)行讀和寫。

    缺省地, Guard類構(gòu)造器將會阻塞程序,直到鎖被獲取。會有這樣的情況,程序必須使用非阻塞的acquire調(diào)用(例如,防止死鎖)。因此,可以傳給ACE Guard的構(gòu)造器第二個參數(shù)(請參看原始代碼,而不是我這里的簡化代碼),指示它使用鎖的try_acquire方法,而不是acquire。隨后調(diào)用者可以使用Guard的locked方法來原子地測試實際上鎖是否已被獲取。

    用Guard重寫上一節(jié)的Thread1方法如下(注釋了的部分是原有代碼):

    void* Thread1(void *arg)
    {
    ??? ACE_Guard<ACE_Thread_Mutex> guard(mutex);
    //mutex.acquire();
    ??? ACE_OS::sleep(3);
    ??? cout<<endl<<"hello thread1"<<endl;
    //mutex.release();
    return NULL;
    }

    相比較而言,使用Guard更加簡潔,并且會自動解鎖,免除了一部分后顧之憂。

    注意:

  • Guard只能幫你自動加解鎖,并不能解決死鎖問題,特別是對于那些非遞歸的互斥體來說使用Guard尤其要注意防止死鎖。
  • Guard是在Guard變量析構(gòu)時解鎖,如果在同一函數(shù)中兩次對同一互斥體變量使用Guard要注意其對象生命周期,否則容易造成死鎖。

    ?

    ACE Condition類屬

    ACE Condition類屬(條件變量)提供風(fēng)格與互斥體、讀者/作者鎖和計數(shù)信號量不同的鎖定機(jī)制。當(dāng)持有鎖的線程在臨界區(qū)執(zhí)行代碼時,這三種機(jī)制讓協(xié)作線程進(jìn)行等待。相反,條件變量通常被一個線程用于使自己等待,直到一個涉及共享數(shù)據(jù)的條件表達(dá)式到達(dá)特定的狀態(tài)。當(dāng)另外的協(xié)作線程指示共享數(shù)據(jù)的狀態(tài)已發(fā)生變化,調(diào)度器就喚醒一個在該條件變量上掛起的線程。于是新喚醒的線程重新對它的條件表達(dá)式進(jìn)行求值,如果共享數(shù)據(jù)已到達(dá)合適狀態(tài),就恢復(fù)處理。

    ACE線程庫提供一個叫作Condition的類來可移植地在C++包裝類中實現(xiàn)條件變量語義。定義方式如下:
    ACE_Thread_Mutex mutex;
    ACE_Condition<ACE_Thread_Mutex> cond(mutex);

    該對象有兩個常用方法。

  • signal()
    向使用該條件變量的其它線程發(fā)送滿足條件信號。
  • wait()
    查詢是否滿足條件,如果滿足,則繼續(xù)往下執(zhí)行;如果不滿足條件,主線程就等待在此條件變量上。條件變量隨即自動釋放互斥體,并使主線程進(jìn)入睡眠。
  • 條件變量總是與互斥體一起使用。這是一種可如下描述的一般模式:

    while( expression NOT TRUE ) wait on condition variable;

    條件變量不是用于互斥,往往用于線程間的協(xié)作,下面例子演示了通過條件變量實現(xiàn)線程協(xié)作。

    #include "ace/Thread.h" #include "ace/Synch.h" #include <iostream> using namespace std; ACE_Thread_Mutex mutex; ACE_Condition<ACE_Thread_Mutex> cond(mutex); void* worker(void *arg) { ACE_OS::sleep(2); //保證eater線程的cond.wait()在worker線程的cond.signal()先執(zhí)行 mutex.acquire(); ACE_OS::sleep(1); cout<<endl<<"produce"<<endl; cond.signal(); mutex.release(); return NULL; } void* eater(void *arg) { mutex.acquire(); cond.wait(); cout<<endl<<"eat"<<endl; mutex.release(); return NULL; } int main(int argc, char *argv[]) { ACE_Thread::spawn((ACE_THR_FUNC)worker); ACE_OS::sleep(1); ACE_Thread::spawn((ACE_THR_FUNC)eater); while(true) ACE_OS::sleep(10); return 0; }

    這個例子中,首先創(chuàng)建了一個生產(chǎn)者線程worker和一個消費(fèi)者線程eater,消費(fèi)者線程執(zhí)行比生產(chǎn)者快,兩個線程不加限制并發(fā)執(zhí)行會導(dǎo)致先消費(fèi),后生產(chǎn)的情況(只是加互斥鎖也不能很好的解決,以為無法保證生產(chǎn)者一定先獲得互斥體)。所以這里通過條件變量的通知方式保證線程的順序執(zhí)行:

  • 消費(fèi)者線程獲取互斥體,等待條件滿足(生產(chǎn)者生產(chǎn)了食品)。同時釋放互斥體,進(jìn)入休眠狀態(tài)。

  • 生產(chǎn)者獲取互斥體(雖然是消費(fèi)者先獲取的互斥體,但消費(fèi)者調(diào)用的wait函數(shù)會釋放消費(fèi)者的互斥體),生產(chǎn)商品后,通過條件變量發(fā)送信號(調(diào)用signal函數(shù))通知消費(fèi)者生產(chǎn)完成,結(jié)束生產(chǎn)過程,釋放互斥體。

  • 消費(fèi)者收到信號后,重新獲取互斥體,完成消費(fèi)過程。

  • 使用條件變量的注意事項:

  • 條件變量必須和互斥體一起使用,也就是說使用前必須加鎖(調(diào)用互斥體acquire函數(shù)),使用完后需釋放互斥體。

  • 條件變量中的wait()和signal()成對使用的話,必須保證wait()函數(shù)在signal()之前執(zhí)行,這樣才能保證wait()能收到條件滿足通知,不至于一直等待下去,形成死鎖(worker線程中的第一句話就是起的這個作用)。

    ?

    ACE Synchronization類

    這一類并發(fā)控制對象一般也叫做雜項并發(fā)類,這類對象一般用得不多,這里我只是對其作一些簡單的介紹。

    1.Atomic_Op類

    ACE_Atomic_Op類用于將同步透明地參數(shù)化進(jìn)基本的算術(shù)運(yùn)算中。

    ACE_Atomic_Op是一種模板類,鎖定機(jī)制和需要參數(shù)化的類型被作為參數(shù)傳入其中,重載所有算術(shù)操作符,并確保在操作前獲取鎖,在操作后釋放它。運(yùn)算本身被委托給通過模板傳入的的類。

    使用ACE_Atomic_Op進(jìn)行變量封裝時,對于那些用ACE_Atomic_Op封裝了的變量操作都變成了線程安全的,而并看不到顯式的加解鎖代碼,代碼變得更簡潔,優(yōu)雅。

    2.ACE中的柵欄(Barrier)

    一組線程可以使用柵欄來進(jìn)行共同的相互同步。組中的每個線程各自執(zhí)行,直到到達(dá)柵欄,就阻塞在那里。在所有相關(guān)線程到達(dá)柵欄后,它們就全部繼續(xù)它們的執(zhí)行。就是說,它們一個接一個地阻塞,等待其他的線程到達(dá)柵欄;一旦所有線程都到達(dá)了它們的執(zhí)行路徑中的"柵欄點",它們就一起重新啟動。

    在ACE中,柵欄在ACE_Barrier類中實現(xiàn)。在柵欄對象被實例化時,它將要等待的線程的數(shù)目會作為參數(shù)傳入。一旦到達(dá)執(zhí)行路徑中的"柵欄點",每個線程都在柵欄對象上發(fā)出wait()調(diào)用。它們在這里阻塞,直到其他線程到達(dá)它們各自的"柵欄點",然后再一起繼續(xù)執(zhí)行。當(dāng)柵欄從相關(guān)線程那里接收了適當(dāng)數(shù)目的wait()調(diào)用時,它就同時喚醒所有阻塞的線程。

    舉個簡單的例子,運(yùn)動員進(jìn)行賽跑比賽時,雖然他們到達(dá)終點有先后順序,但會等到所有的運(yùn)動員跑完比賽后才一起領(lǐng)獎。

    ?

    面向?qū)ο蟮木€程類ACE_Task

    ?

    我們在前一章中使用ACE_Thread包裝時,你一定已經(jīng)注意到了一些不夠"優(yōu)雅"的地方。那一章中的大多數(shù)程序都被分解為函數(shù)、而不是對象。這是因為ACE_Thread包裝需要一個全局函數(shù)名、或是靜態(tài)方法作為參數(shù)。隨后該函數(shù)(靜態(tài)方法)就被用作所派生的線程的"啟動點"。這自然就使得程序員要為每個線程寫一個函數(shù)。如我們已經(jīng)看到的,這可能會導(dǎo)致非面向?qū)ο蟮某绦蚍纸狻?

    ACE_Task對常用線程處理進(jìn)行了OO包裝,通過ACE_Task,能對線程進(jìn)行更好的操作。

    ACE_Task是ACE中的任務(wù)或主動對象“處理結(jié)構(gòu)”的基類。ACE使用此類來實現(xiàn)主動對象模式。所有希望成為“主動對象”的對象都必須由此類派生。同時可將它看作是更高級的、更為面向?qū)ο蟮木€程。

    每個任務(wù)都含有一或多個線程,以及一個底層消息隊列。各個任務(wù)通過消息隊列進(jìn)行通信。至于消息隊列實現(xiàn)的內(nèi)在細(xì)節(jié)程序員不必關(guān)注。發(fā)送任務(wù)用putq() 將消息插入到另一任務(wù)的消息隊列中,接收任務(wù)通過使用getq()將消息提取出來。這樣的體系結(jié)構(gòu)大大簡化了多線程程序的編程模型。

    其主要成員如下:

    open():初始化資源

    close():釋放資源

    activate():啟動線程,可指定線程的數(shù)目

    svc():線程的啟動位置

    ????? putq():放置消息到任務(wù)的消息隊列中

    ????? getq():從任務(wù)的消息隊列中取出消息

    ????? thr_count():返回任務(wù)中線程的數(shù)目

    ????? last_thread():返回任務(wù)中將線程計數(shù)器從1降為0的線程的ID

    ?

    要創(chuàng)建任務(wù),需要進(jìn)行以下步驟:

  • 實現(xiàn)服務(wù)初始化和終止方法:
    open()方法應(yīng)該包含所有專屬于任務(wù)的初始化代碼。其中可能包括諸如連接控制塊、鎖和內(nèi)存這樣的資源。close()方法是相應(yīng)的終止方法。
  • 調(diào)用啟用(Activation)方法:
    在主動對象實例化后,你必須通過調(diào)用activate()啟用它。要在主動對象中創(chuàng)建的線程的數(shù)目,以及其他一些參數(shù),被傳遞給activate()方法。activate()方法會使svc()方法成為所有它生成的線程的啟動點。
  • 實現(xiàn)服務(wù)專有的處理方法:
    如上面所提到的,在主動對象被啟用后,各個新線程在svc()方法中啟動。應(yīng)用開發(fā)者必須在子類中定義此方法。
  • 下面的例子演示怎樣去創(chuàng)建任務(wù):

    #include "ace/Task.h" #include "ace/OS.h" #include <iostream> using namespace std; class TaskThread: public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc(void) { for(int i=0;i<10;i++) { ACE_OS::sleep(1); cout<<endl<<"hello thread1"<<endl; } return 0; } }; int main(int argc, char *argv[]) { TaskThread task; task.activate(); while(true) ACE_OS::sleep(10); return 0; }

    ACE_Task也封裝了常用線程操作,如暫停,恢復(fù)及停止等,是不是非常簡單和方便呢。

    其實ACE_Task的使用還不僅僅是這些,通過它還可實現(xiàn)一種很常用的網(wǎng)絡(luò)編程模式--主動對象模式,其具體功能在后續(xù)的設(shè)計模式部分將作詳細(xì)的介紹。

    ACE_Message_Queue

    在Windows和Linux的config文件中都沒有定義"ACE_HAS_TIMED_MESSAGE_BLOCKS"這個宏,所以msg_deadline_time和msg_execution_time都不起任何作用.

    ACE_Message_Queue_Factory這個工廠提供三個靜態(tài)函數(shù)分別用來創(chuàng)建靜態(tài)消息隊列和兩種類型的動態(tài)消息隊列。靜態(tài)消息隊列的消息也支持優(yōu)先級,但是消息的優(yōu)先級是靜態(tài)的,不需要通過動態(tài)計算而來。水位用來控制消息隊列中數(shù)據(jù)的大小,高水位(high_water_mark)用于控制消息隊列的上限,它用于控制生產(chǎn)者往里面放數(shù)據(jù)的量,如果消息隊列中數(shù)據(jù)量已經(jīng)達(dá)到高水位,而用使用了鎖,既使“ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();”創(chuàng)建消息隊列,那么生產(chǎn)者將被阻塞。高水位很容易理解,但是低水位是用來做什么的呢?

    只要消息隊列中還有數(shù)據(jù)消費(fèi)者就不會被阻塞的,而當(dāng)數(shù)據(jù)量超過高水位時,生產(chǎn)者會被阻塞,既然會被阻塞,那么它肯定需要被喚醒,那么什么時候由誰來喚醒生產(chǎn)者呢?這就是低水位的作用,消費(fèi)者一直消費(fèi)數(shù)據(jù),當(dāng)數(shù)據(jù)低于低水位時它就喚醒生產(chǎn)者。

    下面的代碼很好的展示了靜態(tài)消息隊列的使用。

    #include "ace/Message_Queue.h"
    #include "ace/Get_Opt.h"
    #include "ace/OS.h"
    #include <ace/Thread_Manager.h>
    #include <ace/Synch.h>

    //消息隊列指針
    ACE_Message_Queue<ACE_MT_SYNCH>* mq;

    const char S1[] = "C++";
    const char S2[] = "Java";
    const char S3[] = "PHP";
    const char S4[] = "C#";

    //四個消息指針
    ACE_Message_Block* mb1, * mb2, * mb3, * mb4;

    //生產(chǎn)者
    static void* produce(void *arg)
    {
    static int loop = 1;

    while(true)
    {
    ACE_OS::sleep(2);

    ACE_DEBUG((LM_DEBUG, "(%P : %t) producer...\n"));

    while(true)
    {
    if(loop == 1)
    {
    //將高水位設(shè)置為10, S1+S2的長度為3+4+2=9<10,因此可以將S3放進(jìn)去
    //但是再放入S4時生產(chǎn)者將會被阻塞
    //需要注意的是水位的大小并不是消息的個數(shù),而是消息隊列中消息里面的數(shù)據(jù)量之和
    //如果也能以消息的個數(shù)作為高低水位的值就好了
    mq->high_water_mark(10);

    mq->enqueue_prio (mb1);
    mq->enqueue_prio (mb2);
    mq->enqueue_prio (mb3);

    ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending!!\n"));

    //因為消費(fèi)者在睡眠6秒之后才會調(diào)用deactivate,因此生產(chǎn)者會在這兒阻塞幾秒鐘
    //可以不斷地將msg_bytes打印出來觀察觀察
    int ret = mq->enqueue_prio (mb4);

    ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by deactivate, ret = %d!!\n", ret));

    ++loop;
    }

    if(loop == 2)
    {
    ACE_OS::sleep(6);

    //將低水位設(shè)置為5,因為高水位仍然為10,當(dāng)前的數(shù)據(jù)量又超過了10,
    //所以下面的入隊操作仍會將生產(chǎn)者阻塞
    //這樣消費(fèi)者消費(fèi)消息,當(dāng)數(shù)據(jù)量小于5時,將喚醒生產(chǎn)者
    //生產(chǎn)者在此處等待被消費(fèi)者喚醒
    mq->low_water_mark(5);

    ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending again!!\n"));

    mq->enqueue_prio (mb4);

    ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by consumer!!\n"));

    ++loop;
    }
    }
    }

    return NULL;
    }

    //消費(fèi)者
    void* consume(void *arg)
    {
    static int loop = 1;

    while(true)
    {
    ACE_OS::sleep(2);

    ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer...\n"));

    if(loop == 1)
    {
    //等待6秒,此時生產(chǎn)者和消費(fèi)者都將被阻塞
    ACE_OS::sleep(6);

    //deactivate會喚醒所有的線程,將消息隊列設(shè)置為不可用
    //以后所存取操作都會返回-1
    //這個操作會喚醒生產(chǎn)者
    mq->deactivate();

    ++loop;
    }

    if(loop == 2)
    {
    ACE_OS::sleep(2);

    //將消息隊列的狀態(tài)設(shè)置成ACTIVATED
    //消息又可以使用了
    mq->activate();

    ++loop;
    }

    if(loop == 3)
    {
    ACE_OS::sleep(10);

    //消費(fèi)兩個消息之后,數(shù)據(jù)量就小于5了,低于低水位將喚醒生產(chǎn)者
    ACE_Message_Block *mb;
    mq->dequeue_head (mb);
    mq->dequeue_head (mb);

    ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer wake up producer!!\n"));

    ++loop;
    }
    }

    return NULL;
    }

    int main(int argc, char* argv[])
    {
    mq = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();

    int priority;

    //使用隨機(jī)數(shù)作為消息的優(yōu)先級
    //數(shù)字越高,優(yōu)先級越高
    priority = ACE_OS::rand() % 100;
    mb1 = new ACE_Message_Block(S1, sizeof S1, priority);

    priority = ACE_OS::rand() % 100;
    mb2 = new ACE_Message_Block(S2, sizeof S2, priority);

    priority = ACE_OS::rand() % 100;
    mb3 = new ACE_Message_Block(S3, sizeof S3, priority);

    priority = ACE_OS::rand() % 100;
    mb4 = new ACE_Message_Block(S4, sizeof S4, priority);

    //將消息壓入隊列中,enqueue_prio根據(jù)消息的優(yōu)先級將消息放到適當(dāng)?shù)奈恢蒙?/span>
    //enqueue_head只是簡單地將數(shù)據(jù)存入隊列中,而不考慮消息的優(yōu)先級
    //使用enqueue_prio壓入消息后,可以簡單通過dequeue_head和dequeue_tail
    //分別按優(yōu)先級從高到低和從低到高取消息
    //如果使用enqueue_head和enqueue_tail壓入消息
    //則需要通過dequeue_prio來按照消息的優(yōu)先級依次將消息出隊列
    //沒有必要既使用enqueue_prio壓入消息,又實用dequeue_prio來取消息
    mq->enqueue_prio (mb1);
    mq->enqueue_prio (mb2);
    mq->enqueue_prio (mb3);
    mq->enqueue_prio (mb4);

    //輸出靜態(tài)消息隊列的相關(guān)信息
    //高低水位默認(rèn)值均為16384
    ACE_DEBUG((LM_DEBUG, "count : %d, bytes : %d, length : %d, high_water_mark : %d, low_water_mark : %d, status : %d\n",
    mq->message_count(), mq->message_bytes(), mq->message_length(),
    mq->high_water_mark(), mq->low_water_mark(),
    mq->state()));

    ACE_Message_Block *mb;

    //使用next遍歷消息,遍歷的順序為高優(yōu)先級到底優(yōu)先級
    ACE_DEBUG((LM_DEBUG, "===========next=============\n"));
    //peek一下,并不彈出消息,類似Windows的PeekMessage
    mq->peek_dequeue_head(mb);
    do
    {
    ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));
    }while(mb = mb->next());

    //使用迭代器遍歷消息隊列,遍歷的順序為高優(yōu)先級到底優(yōu)先級
    ACE_DEBUG((LM_DEBUG, "=========iterator=============\n"));
    ACE_Message_Queue<ACE_MT_SYNCH>::ITERATOR iterator (*mq);

    for (ACE_Message_Block *entry = 0;
    iterator.next (entry) != 0;
    iterator.advance ())
    {
    ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", entry->rd_ptr(), entry->msg_priority()));
    }


    ACE_DEBUG((LM_DEBUG, "============dequeue_head==========\n"));
    while(mq->dequeue_head (mb) != -1)
    {
    ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));

    //這里如果不判斷的話,消息隊列空時會導(dǎo)致主線程被阻塞
    if(mq->is_empty())
    break;
    }

    ACE_DEBUG((LM_DEBUG, "\n\n"));

    //測試高低水位和隊列的state使用,進(jìn)行測試之前mq隊列已空///
    //產(chǎn)生一個生產(chǎn)者線程
    ACE_Thread_Manager::instance()->spawn_n
    (
    1,
    (ACE_THR_FUNC) produce
    );

    產(chǎn)生兩個消費(fèi)者線程
    ACE_Thread_Manager::instance()->spawn_n
    (
    1,
    (ACE_THR_FUNC) consume
    );

    //掛起主線程
    ACE_Thread_Manager::instance()->wait();

    return 0;
    }


    ?

    ?

    ACE中TCP通信

    Tcp通信過程一般為如下步驟:

  • 服務(wù)器綁定端口,等待客戶端連接。

  • 客戶端通過服務(wù)器的ip和服務(wù)器綁定的端口連接服務(wù)器。

  • 服務(wù)器和客戶端通過網(wǎng)絡(luò)建立一條數(shù)據(jù)通路,通過這條數(shù)據(jù)通路進(jìn)行數(shù)據(jù)交互。

  • 常用API:

    1. ACE_INET_Addr類。

    ACE"地址"類ACE_Addr的子類,表示TCP/IP和UDP/IP的地址。它通常包含機(jī)器的ip和端口信息,通過它可以定位到所通信的進(jìn)程。

    定義方式:
    ACE_INET_Addr addInfo(3000,"192.168.1.100");
    常用方法:

  • get_host_name??? 獲取主機(jī)名

  • get_ip_address??? 獲取ip地址

  • get_port_number??? 獲取端口號

  • 2. ACE_SOCK_Acceptor類。

    服務(wù)期端使用,用于綁定端口和被動地接受連接。
    常用方法:

  • open 綁定端口

  • accept建立和客戶段的連接

  • 3. ACE_SOCK_Connector類。

    客戶端使用,用于主動的建立和服務(wù)器的連接。
    常用方法:

  • connect()??? 建立和服務(wù)期的連接。

  • 4. ACE_SOCK_Stream類。

    客戶端和服務(wù)器都使用,表示客戶段和服務(wù)器之間的數(shù)據(jù)通路。
    常用方法:

  • send ()??? 發(fā)送數(shù)據(jù)

  • recv ()??? 接收數(shù)據(jù)

  • close()??? 關(guān)閉連接(實際上就是斷開了socket連接)。

  • 代碼示例:

    下面例子演示了如何如何用ACE創(chuàng)建TCP通信的Server端。

    #include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Stream.h" #include "ace/INET_Addr.h" #include "ace/OS.h"#include <string> #include <iostream> using namespace std;int main(int argc, char *argv[]) {ACE_INET_Addr port_to_listen(3000); //綁定的端口ACE_SOCK_Acceptor acceptor;if (acceptor.open (port_to_listen, 1) == -1) //綁定端口{cout<<endl<<"bind port fail"<<endl;return -1;}while(true){ACE_SOCK_Stream peer; //和客戶端的數(shù)據(jù)通路ACE_Time_Value timeout (10, 0);if (acceptor.accept (peer) != -1) //建立和客戶端的連接{cout<<endl<<endl<<"client connect. "<<endl;char buffer[1024];ssize_t bytes_received;ACE_INET_Addr raddr;peer.get_local_addr(raddr);cout<<endl<<"local port\t"<<raddr.get_host_name()<<"\t"<<raddr.get_port_number()<<endl;while ((bytes_received =peer.recv (buffer, sizeof(buffer))) != -1) //讀取客戶端發(fā)送的數(shù)據(jù){peer.send(buffer, bytes_received); //對客戶端發(fā)數(shù)據(jù)}peer.close ();}}return 0; }

    這個例子實現(xiàn)的功能很簡單,服務(wù)器端綁定3000號端口,等待一個客戶端的連接,然后將從客戶端讀取的數(shù)據(jù)再次轉(zhuǎn)發(fā)給客戶端,也就是實現(xiàn)了一個EchoServer的功能。

    相應(yīng)的客戶端程序也比較簡單,代碼如下:

    #include <ace/SOCK_Stream.h> #include <ace/SOCK_Connector.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> #include <string> #include <iostream> using namespace std;int main(int argc, char *argv[]) {ACE_INET_Addr addr(3000,"127.0.0.1");ACE_SOCK_Connector connector; ACE_Time_Value timeout(5,0);ACE_SOCK_Stream peer;if(connector.connect(peer,addr,&timeout) != 0){cout<<"connection failed !"<<endl;return 1;}cout<<"conneced !"<<endl;string s="hello world";peer.send(s.c_str(),s.length()); //發(fā)送數(shù)據(jù)cout<<endl<<"send:\t"<<s<<endl;ssize_t bc=0; //接收的字節(jié)數(shù)char buf[1024];bc=peer.recv(buf,1024,&timeout); //接收數(shù)據(jù)if(bc>=0){buf[bc]='\0';cout<<endl<<"rev:\t"<<buf<<endl;}peer.close();return 0; }

    ?

    ACE中UDP通信

    udp是一種無連接的協(xié)議,提供無連接不可靠的服務(wù)。

    在ace中,通過ACE_SOCK_Dgram類提供udp通信服務(wù),ACE_SOCK_Dgram和ACE_SOCK_Stream的API非常類似,一樣提供了send,recv及close等常用操作,這里就不再累述了。

    udp通信時無需像tcp那樣建立連接和關(guān)閉連接,tcp編程時需要通過accept和connect來建立連接,而udp通信省略了這一步驟,相對來說編程更為簡單。

    由于udp通信時無建立連接,服務(wù)器端不能像Tcp通信那樣在建立連接的時候就獲得客戶端的地址信息,故服務(wù)器端不能主動對客戶端發(fā)送信息(不知道客戶端的地址),只有等到收到客戶端發(fā)送的udp信息時才能確定客戶端的地址信息,從而進(jìn)行通信。

    udp通信過程如下:

  • 服務(wù)器端綁定一固定udp端口,等待接收客戶端的通信。
  • 客戶端通過服務(wù)器的ip和地址信息直接對服務(wù)器端發(fā)送消息。
  • 服務(wù)器端收到客戶端發(fā)送的消息后獲取客戶端的ip和端口信息,通過該地址信息和客戶端通信。
  • 下面代碼為EchoServer的udp版:

    //server.cpp #include <ace/SOCK_Dgram.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> #include <string> #include <iostream> using namespace std; int main(int argc, char *argv[]) { ACE_INET_Addr port_to_listen(3000); //綁定的端口 ACE_SOCK_Dgram peer(port_to_listen); //通信通道 char buf[100]; while(true) { ACE_INET_Addr remoteAddr; //所連接的遠(yuǎn)程地址 int bc = peer.recv(buf,100,remoteAddr); //接收消息,獲取遠(yuǎn)程地址信息 if( bc != -1) { string s(buf,bc); cout<<endl<<"rev:\t"<<s<<endl; } peer.send(buf,bc,remoteAddr); //和遠(yuǎn)程地址通信 } return 0; }

    相應(yīng)的客戶端程序如下:

    //client.cpp #include <ace/SOCK_Dgram.h> #include <ace/INET_Addr.h> #include <ace/Time_Value.h> #include <string> #include <iostream> using namespace std; int main(int argc, char *argv[]) { ACE_INET_Addr remoteAddr(3000,"127.0.0.1"); //所連接的遠(yuǎn)程地址 ACE_INET_Addr localAddr; //本地地址信息 ACE_SOCK_Dgram peer(localAddr); //通信通道 peer.send("hello",5,remoteAddr); //發(fā)送消息 char buf[100]; int bc = peer.recv(buf,100,remoteAddr); //接收消息 if( bc != -1) { string s(buf,bc); cout<<endl<<"rev:\t"<<s<<endl; } return 0; }

    和tcp編程相比,udp無需通過acceptor,connector來建立連接,故代碼相對tcp編程來說要簡單許多。另外,由于udp是一種無連接的通信方式,ACE_SOCK_Dgram的實例對象中無法保存遠(yuǎn)端地址信息(保存了本地地址信息),故通信的時候需要加上遠(yuǎn)端地址信息。

    ?

    ACE主動對象模式

    主動對象模式用于降低方法執(zhí)行和方法調(diào)用之間的耦合。該模式描述了另外一種更為透明的任務(wù)間通信方法。

    傳統(tǒng)上,所有的對象都是被動的代碼段,對象中的代碼是在對它發(fā)出方法調(diào)用的線程中執(zhí)行的,當(dāng)方法被調(diào)用時,調(diào)用線程將阻塞,直至調(diào)用結(jié)束。而主動對象卻不一樣。這些對象具有自己的命令執(zhí)行線程,主動對象的方法將在自己的執(zhí)行線程中執(zhí)行,不會阻塞調(diào)用方法。

    例如,設(shè)想對象"A"已在你的程序的main()函數(shù)中被實例化。當(dāng)你的程序啟動時,OS創(chuàng)建一個線程,以從main()函數(shù)開始執(zhí)行。如果你調(diào)用對象A的任何方法,該線程將"流過"那個方法,并執(zhí)行其中的代碼。一旦執(zhí)行完成,該線程返回調(diào)用該方法的點并繼續(xù)它的執(zhí)行。但是,如果"A"是主動對象,事情就不是這樣了。在這種情況下,主線程不會被主動對象借用。相反,當(dāng)"A"的方法被調(diào)用時,方法的執(zhí)行發(fā)生在主動對象持有的線程中。另一種思考方法:如果調(diào)用的是被動對象的方法(常規(guī)對象),調(diào)用會阻塞(同步的);而另一方面,如果調(diào)用的是主動對象的方法,調(diào)用不會阻塞(異步的)。

    由于主動對象的方法調(diào)用不會阻塞,這樣就提高了系統(tǒng)響應(yīng)速度,在網(wǎng)絡(luò)編程中是大有用武之地的。

    在這里我們將一個"Logger"(日志記錄器)對象為例來介紹如何將一個傳統(tǒng)對象改造為主動對象,從而提高系統(tǒng)響應(yīng)速度。

    Logger的功能是將一些系統(tǒng)事件的記錄在存儲器上以備查詢,由于Logger使用慢速的I/O系統(tǒng)來記錄發(fā)送給它的消息,因此對Logger的操作將會導(dǎo)致系統(tǒng)長時間的等待。

    其功能代碼簡化如下:

    class Logger: public ACE_Task<ACE_MT_SYNCH>
    {
    public:
    void LogMsg(const string& msg)
    ??? {
    ??????? cout<<endl<<msg<<endl;
    ??????? ACE_OS::sleep(2);
    ??? }
    };

    為了實現(xiàn)記錄日志操作的主動執(zhí)行,我們需要用命令模式將其封裝,從而使得記錄日志的方法能在合適的時間和地方主動執(zhí)行,封裝方式如下:

    class LogMsgCmd: public ACE_Method_Object { public:LogMsgCmd(Logger *plog,const string& msg){this->log=plog;this->msg=msg;}int call(){this->log->LogMsg(msg);return 0;}private:Logger *log;string msg; };class Logger: public ACE_Task<ACE_MT_SYNCH> { public:void LogMsg(const string& msg){cout<<endl<<msg<<endl;ACE_OS::sleep(2);}LogMsgCmd *LogMsgActive(const string& msg){new LogMsgCmd(this,msg);} };

    ?

    ?

    這里對代碼功能做一下簡單的說明:

    ACE_Method_Object是ACE提供的命令模式借口,命令接口調(diào)用函數(shù)為int call(),在這里通過它可以把每個操作日志的調(diào)用封裝為一個LogMsgCmd對象,這樣,當(dāng)原來需要調(diào)用LogMsg的方法的地方只要調(diào)用LogMsgActive即可生成一個LogMsgCmd對象,由于調(diào)用LogMsgActive方法,只是對命令進(jìn)行了封裝,并沒有進(jìn)行日志操作,所以該方法會立即返回。然后再新開一個線程,將LogMsgCmd對象作為參數(shù)傳入,在該線程中執(zhí)行LogMsgCmd對象的call方法,從而實現(xiàn)無阻塞調(diào)用。

    然而,每次對一個LogMsg調(diào)用都開啟一個新線程,無疑是對資源的一種浪費(fèi),實際上我們往往將生成的LogMsgCmd對象插入一個命令隊列中,只新開一個命令執(zhí)行線程依次執(zhí)行命令隊列中的所有命令。并且,為了實現(xiàn)對象的封裝,命令隊列和命令執(zhí)行線程往往也封裝到Logger對象中,代碼如下所示:

    #include "ace/OS.h" #include "ace/Task.h" #include "ace/Method_Object.h" #include "ace/Activation_Queue.h" #include "ace/Auto_Ptr.h"#include <string> #include <iostream> using namespace std;class Logger: public ACE_Task<ACE_MT_SYNCH> { public:Logger(){this->activate();}int svc();void LogMsg(const string& msg);void LogMsgActive (const string& msg);private:ACE_Activation_Queue cmdQueue; //命令隊列 };class LogMsgCmd: public ACE_Method_Object { public:LogMsgCmd(Logger *plog,const string& msg){this->log=plog;this->msg=msg;}int call(){this->log->LogMsg(msg);return 0;}private:Logger *log;string msg; };void Logger::LogMsg(const string& msg) {cout<<endl<<msg<<endl;ACE_OS::sleep(2); }//以主動的方式記錄日志 void Logger::LogMsgActive(const string& msg) {//生成命令對象,插入到命令隊列中cmdQueue.enqueue(new LogMsgCmd(this,msg)); }int Logger::svc() {while(true){//遍歷命令隊列,執(zhí)行命令auto_ptr<ACE_Method_Object> mo(this->cmdQueue.dequeue ());if (mo->call () == -1)break;}return 0; }int main (int argc, ACE_TCHAR *argv[]) {Logger log;log. LogMsgActive ("hello");ACE_OS::sleep(1);log.LogMsgActive("abcd");while(true)ACE_OS::sleep(1);return 0; }

    在這里需要注意一下命令隊列ACE_Activation_Queue對象,它是線程安全的,使用方法比較簡單,這里我也不多介紹了。

    主動對象的基本結(jié)構(gòu)就是這樣,然而,由于主動對象是異步調(diào)用的,又引出了如下兩個新問題:

  • 方法調(diào)用線程如何知道該方法已經(jīng)執(zhí)行完成?
  • 如何或得方法的返回值?
  • 要解決這兩個問題,首先得介紹一下ACE_Future對象,ACE_Future是表示一個會在將來被賦值的"期貨"對象,可以通過ready()函數(shù)查詢它是否已經(jīng)被賦值。該對象創(chuàng)建的時候是未賦值的,后期可以通過set()函數(shù)來進(jìn)行賦值,所賦的值可以通過get()函數(shù)來獲取。
  • 下面代碼演示了它的基本用法: #include "ace/Future.h"#include <string> #include <iostream> using namespace std;void get_info(ACE_Future<string> &fu) {string state = fu.ready()?"ready":"not ready";cout<<endl<<state<<endl;if(fu.ready()){string value;fu.get(value);cout<<"value:\t"<<value<<endl;} }int main(int argc, char *argv[]) {ACE_Future<string> fu;get_info(fu);fu.set("12345");get_info(fu);return 0; }
  • 通過ACE_Future對象來解決上述兩個問題的方法如下:

    • 首先創(chuàng)建ACE_Future對象用以保留返回值。
    • 調(diào)用主動命令時將ACE_Future對象作為參數(shù)傳入,生成的命令對象中保存ACE_Future對象的指針。
    • 命令執(zhí)行線程執(zhí)行完命令后,將返回值通過set()函數(shù)設(shè)置到ACE_Future對象中。
    • 調(diào)用線程可以通過ACE_Future對象的ready()函數(shù)查詢該命令是否執(zhí)行完成,如果命令執(zhí)行完成,則可通過get()函數(shù)來獲取返回值。

    使用的時候要注意一下ACE_Future對象的生命周期。

    為了演示了如何獲取主動命令的執(zhí)行狀態(tài)和結(jié)果,我將上篇文章中的代碼改動了一下,日志類記錄日志后,會將記錄的內(nèi)容作為返回值返回,該返回值會通過ACE_Future對象返回,代碼如下

    #include "ace/OS.h" #include "ace/Task.h" #include "ace/Method_Object.h" #include "ace/Activation_Queue.h" #include "ace/Auto_Ptr.h"#include "ace/Future.h"#include <string> #include <iostream> using namespace std;class Logger: public ACE_Task<ACE_MT_SYNCH> { public:Logger(){this->activate();}int svc();string LogMsg(const string& msg);void LogMsgActive (const string& msg,ACE_Future<string> *result);private:ACE_Activation_Queue cmdQueue; //命令隊列 };class LogMsgCmd: public ACE_Method_Object { public:LogMsgCmd(Logger *plog,const string& msg,ACE_Future<string> *result){this->log=plog;this->msg=msg;this->result=result;}int call(){string reply = this->log->LogMsg(msg);result->set(reply);return 0;}private:ACE_Future<string> *result;Logger *log;string msg; };string Logger::LogMsg(const string& msg) {ACE_OS::sleep(2);cout<<endl<<msg<<endl;return msg; }//以主動的方式記錄日志 void Logger::LogMsgActive(const string& msg,ACE_Future<string> *result) {//生成命令對象,插入到命令隊列中cmdQueue.enqueue(new LogMsgCmd(this,msg,result)); }int Logger::svc() {while(true){//遍歷命令隊列,執(zhí)行命令auto_ptr<ACE_Method_Object> mo(this->cmdQueue.dequeue ());if (mo->call () == -1)break;}return 0; }void get_info(ACE_Future<string> &fu) {string state = fu.ready()?"ready":"not ready";cout<<endl<<state<<endl;if(fu.ready()){string value;fu.get(value);cout<<"value:\t"<<value<<endl;} }int main (int argc, ACE_TCHAR *argv[]) {ACE_Future<string> result;Logger log;log.LogMsgActive ("hello",&result);while(true){get_info(result);if(result.ready())break;ACE_OS::sleep(1);}cout<<endl<<"cmd end"<<endl;while(true)ACE_OS::sleep(1);return 0; }

    這種查詢模式比較簡單有效,但存在一個問題:調(diào)用線程必須不斷輪詢ACE_Future對象以獲取返回值,這樣的效率比較低。可以通過觀察者模式解決這個問題:在ACE_Future對象上注冊一個觀察者,當(dāng)ACE_Future對象的值發(fā)生改變(異步命令執(zhí)行完成)時主動通知該觀察者,從而獲取返回值。

    ACE中的觀察者模式可以通過ACE_Future_Observer來實現(xiàn),使用方法如下:

    通過觀察者模式,可以更有效,及時的獲取異步命令的返回值,但同時也增加了程序結(jié)構(gòu)的復(fù)雜度并且難以調(diào)試,使用的時候應(yīng)該根據(jù)需要選取合適的方式。

    #include "ace/Future.h"#include <string> #include <iostream> using namespace std;class MyObserver:public ACE_Future_Observer<string> {virtual void update (const ACE_Future<string> &future){string value;future.get(value);cout<<endl<<"change:\t"<<value<<endl;} };int main(int argc, char *argv[]) {MyObserver obv;ACE_Future<string> fu;fu.attach(&obv);ACE_OS::sleep(3);fu.set("12345");while(true)ACE_OS::sleep(3);return 0; }

    ?

    ACE反應(yīng)器(Reactor)模式

    反應(yīng)器(Reactor):用于事件多路分離和分派的體系結(jié)構(gòu)模式

    通常的,對一個文件描述符指定的文件或設(shè)備, 有兩種工作方式: 阻塞與非阻塞。所謂阻塞方式的意思是指, 當(dāng)試圖對該文件描述符進(jìn)行讀寫時, 如果當(dāng)時沒有東西可讀,或者暫時不可寫, 程序就進(jìn)入等待狀態(tài), 直到有東西可讀或者可寫為止。而對于非阻塞狀態(tài), 如果沒有東西可讀, 或者不可寫, 讀寫函數(shù)馬上返回, 而不會等待。

    在前面的章節(jié)中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:當(dāng)接收tcp數(shù)據(jù)時,如果遠(yuǎn)端沒有數(shù)據(jù)可以讀,則會一直阻塞到讀到需要的數(shù)據(jù)為止。這種方式的傳輸和傳統(tǒng)的被動方法的調(diào)用類似,非常直觀,并且簡單有效,但是同樣也存在一個效率問題,如果你是開發(fā)一個面對著數(shù)千個連接的服務(wù)器程序,對每一個客戶端都采用阻塞的方式通信,如果存在某個非常耗時的讀寫操作時,其它的客戶端通信將無法響應(yīng),效率非常低下。

    一種常用做法是:每建立一個Socket連接時,同時創(chuàng)建一個新線程對該Socket進(jìn)行單獨(dú)通信(采用阻塞的方式通信)。這種方式具有很高的響應(yīng)速度,并且控制起來也很簡單,在連接數(shù)較少的時候非常有效,但是如果對每一個連接都產(chǎn)生一個線程的無疑是對系統(tǒng)資源的一種浪費(fèi),如果連接數(shù)較多將會出現(xiàn)資源不足的情況。

    另一種較高效的做法是:服務(wù)器端保存一個Socket連接列表,然后對這個列表進(jìn)行輪詢,如果發(fā)現(xiàn)某個Socket端口上有數(shù)據(jù)可讀時(讀就緒),則調(diào)用該socket連接的相應(yīng)讀操作;如果發(fā)現(xiàn)某個Socket端口上有數(shù)據(jù)可寫時(寫就緒),則調(diào)用該socket連接的相應(yīng)寫操作;如果某個端口的Socket連接已經(jīng)中斷,則調(diào)用相應(yīng)的析構(gòu)方法關(guān)閉該端口。這樣能充分利用服務(wù)器資源,效率得到了很大提高。

    在Socket編程中就可以通過select等相關(guān)API實現(xiàn)這一方式。但直接用這些API控制起來比較麻煩,并且也難以控制和移植,在ACE中可以通過Reactor模式簡化這一開發(fā)過程。

    反應(yīng)器本質(zhì)上提供一組更高級的編程抽象,簡化了事件驅(qū)動的分布式應(yīng)用的設(shè)計和實現(xiàn)。除此而外,反應(yīng)器還將若干不同種類的事件的多路分離集成到易于使用的API中。特別地,反應(yīng)器對基于定時器的事件、信號事件、基于I/O端口監(jiān)控的事件和用戶定義的通知進(jìn)行統(tǒng)一地處理。

    ACE中的反應(yīng)器與若干內(nèi)部和外部組件協(xié)同工作。其基本概念是反應(yīng)器框架檢測事件的發(fā)生(通過在OS事件多路分離接口上進(jìn)行偵聽),并發(fā)出對預(yù)登記事件處理器(event handler)對象中的方法的"回調(diào)"(callback)。該方法由應(yīng)用開發(fā)者實現(xiàn),其中含有應(yīng)用處理此事件的特定代碼。

    使用ACE的反應(yīng)器,只需如下幾步:

  • ?
  • 創(chuàng)建事件處理器,以處理他所感興趣的某事件。

  • 在反應(yīng)器上登記,通知說他有興趣處理某事件,同時傳遞他想要用以處理此事件的事件處理器的指針給反應(yīng)器。

  • 隨后反應(yīng)器框架將自動地:

  • 在內(nèi)部維護(hù)一些表,將不同的事件類型與事件處理器對象關(guān)聯(lián)起來。

  • 在用戶已登記的某個事件發(fā)生時,反應(yīng)器發(fā)出對處理器中相應(yīng)方法的回調(diào)。

  • 反應(yīng)器模式在ACE中被實現(xiàn)為ACE_Reactor類,它提供反應(yīng)器框架的功能接口。

    如上面所提到的,反應(yīng)器將事件處理器對象作為服務(wù)提供者使用。反應(yīng)器內(nèi)部記錄某個事件處理器的特定事件的相關(guān)回調(diào)方法。當(dāng)這些事件發(fā)生時,反應(yīng)器會創(chuàng)建這種事件和相應(yīng)的事件處理器的關(guān)聯(lián)。

  • 事件處理器
    事件處理器就是需要通過輪詢發(fā)生事件改變的對象列表中的對象,如在上面的例子中就是連接的客戶端,每個客戶端都可以看成一個事件處理器。
  • 回調(diào)事件
    就是反應(yīng)器支持的事件,如Socket讀就緒,寫就緒。拿上面的例子來說,如果某個客戶端(事件處理器)在反應(yīng)器中注冊了讀就緒事件,當(dāng)客戶端給服務(wù)器發(fā)送一條消息的時候,就會觸發(fā)這個客戶端的數(shù)據(jù)可讀的回調(diào)函數(shù)。
  • 在反應(yīng)器框架中,所有應(yīng)用特有的事件處理器都必須由ACE_Event_Handler的抽象接口類派生。可以通過重載相應(yīng)的"handle_"方法實現(xiàn)相關(guān)的回調(diào)方法。

    使用ACE_Reactor基本上有三個步驟:

  • 創(chuàng)建ACE_Event_Handler的子類,并在其中實現(xiàn)適當(dāng)?shù)?#34;handle_"方法,以處理你想要此事件處理器為之服務(wù)的事件類型。
  • 通過調(diào)用反應(yīng)器對象的register_handler(),將你的事件處理器登記到反應(yīng)器。
  • 在事件發(fā)生時,反應(yīng)器將自動回調(diào)相應(yīng)的事件處理器對象的適當(dāng)?shù)膆andle_"方法。
  • 下面我就以一個Socket客戶端的例子為例簡單的說明反應(yīng)器的基本用法。

    #include <ace/OS.h> #include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <string> #include <iostream> using namespace std;class MyClient:public ACE_Event_Handler { public:bool open(){ACE_SOCK_Connector connector;ACE_INET_Addr addr(3000,"127.0.0.1");ACE_Time_Value timeout(5,0);if(connector.connect(peer,addr,&timeout) != 0){cout<<endl<<"connecetd fail";return false;}ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);cout<<endl<<"connecetd ";return true;}ACE_HANDLE get_handle(void) const{return peer.get_handle();}int handle_input (ACE_HANDLE fd){int rev=0;ACE_Time_Value timeout(5,0);if((rev=peer.recv(buffer,1000,&timeout))>0){buffer[rev]='\0';cout<<endl<<"rev:\t"<<buffer<<endl;}return 3;}private:ACE_SOCK_Stream peer;char buffer[1024]; };int main(int argc, char *argv[]) {MyClient client;client.open();while(true){ACE_Reactor::instance()->handle_events(); }return 0; }

    在這個例子中,客戶端連接上服務(wù)器后,通過ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK)注冊了一個讀就緒的回調(diào)函數(shù),當(dāng)服務(wù)器端給客戶端發(fā)消息的時候,會自動觸發(fā)handle_input()函數(shù),將接收到的信息打印出來。

    在Socket編程中,常見的事件就是"讀就緒","寫就緒",通過對這兩個事件的捕獲分發(fā),可以實現(xiàn)Socket中的異步操作。

    Socket編程中的事件處理器

    在前面我們已經(jīng)介紹過,在ACE反應(yīng)器框架中,任何都必須派生自ACE_Event_Handler類,并通過重載其相應(yīng)會調(diào)事件處理函數(shù)來實現(xiàn)相應(yīng)的回調(diào)處理的。在Socket編程中,我們通常需要重載的函數(shù)有

  • handle_input()
    當(dāng)I/O句柄(比如UNIX中的文件描述符)上的輸入可用時,反應(yīng)器自動回調(diào)該方法。
  • handle_output()
    當(dāng)I/O設(shè)備的輸出隊列有可用空間時,反應(yīng)器自動回調(diào)該方法。
  • handle_close()
    當(dāng)事件處理器中的事件從Reactor中移除的時候調(diào)用。
  • 此外,為了使Reactor能通過I/O句柄找到對應(yīng)的事件處理器,還必須重載其get_handle()方法以使得Reactor建立起I/O句柄和事件處理器的關(guān)聯(lián)。

    使用Reactor框架。

    下面我們將以一個客戶端的程序為例,介紹如何在Socket編程中使用Reactor框架。

    一.建立一個客戶端對象(事件處理器)。

    客戶端對象就是一個事件處理器,其聲明如下:

    class Client:public ACE_Event_Handler { public: ACE_HANDLE get_handle(void) const; int handle_input (ACE_HANDLE fd); int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); ACE_SOCK_Stream& Peer(); private: ACE_SOCK_Stream peer; };

    在Client端中我只關(guān)心"讀就緒"事件,故只重載了handle_input函數(shù)(大多數(shù)應(yīng)用下只需要重載handle_input函數(shù))。另外,在客戶端還保存了一個ACE_SOCK_Stream的peer對象用來進(jìn)行Socket通信,同時封裝了一個Peer()函數(shù)返回它的引用。

    二.重載相應(yīng)回調(diào)處理函數(shù)

    ACE_SOCK_Stream& Client::Peer() { return peer; } ACE_HANDLE Client::get_handle(void) const { return peer.get_handle(); } int Client::handle_input (ACE_HANDLE fd) { int rev=0; if((rev = peer.recv(buffer,1000))>0) { buffer[rev]='\0'; cout<<endl<<"rev:\t"<<buffer<<endl; return 0; } else //Socket連接發(fā)生錯誤,返回-1,在Reactor中注銷事件,觸發(fā)handle_close函數(shù) { return -1; } } int Client::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { cout<<endl<<"connecetd closed"; return ACE_Event_Handler::handle_close(handle,close_mask); }

    ?

    三.在Reactor中注冊事件

    首先讓我們來看看相應(yīng)的main函數(shù)的代碼:

    int main(int argc, char *argv[]) {Client client;ACE_SOCK_Connector connector;ACE_INET_Addr addr(3000,"127.0.0.1");ACE_Time_Value timeout(5,0);if(connector.connect(client.Peer(),addr,&timeout) != 0){cout<<endl<<"connecetd fail";return 0;}ACE_Reactor::instance()->register_handler(&client,ACE_Event_Handler::READ_MASK);while(true) { ACE_Reactor::instance()->handle_events(); }return 0; }

    在這里可以看到,使用Reactor框架后,依然首先通過ACE_SOCK_Connector的connect函數(shù)來建立連接。建立連接后,可以通過ACE_Reactor::instance()->register_handler函數(shù)來實現(xiàn)Reactor的注冊,實現(xiàn)I/O事件和Client對象的handle_input方法相關(guān)聯(lián),它的第一個參數(shù)是事件處理器的地址,第二個參數(shù)是事件類型,由于這里只關(guān)心讀就緒事件,故注冊的事件類型是ACE_Event_Handler::READ_MASK。

    四.啟動Reactor事件循環(huán)

    通過如上設(shè)置后,我們就可以通過ACE_Reactor::instance()->handle_events()啟動Reactor循環(huán)了,這樣,每當(dāng)服務(wù)器端有數(shù)據(jù)發(fā)送給客戶端時,當(dāng)客戶端的數(shù)據(jù)就緒時,就回觸發(fā)Client對象的handle_input函數(shù),將接收的數(shù)據(jù)打印出來。

    通常的做法是,將Reactor事件循環(huán)作為一個單獨(dú)的線程來處理,這樣就不會阻塞main函數(shù)。

    五.注銷Reactor事件

    Reactor事件的注銷一般有兩種方式,顯式和隱式,下面將分別給予介紹。

  • 隱式注銷。
    當(dāng)Reactor捕獲事件后,會觸發(fā)相應(yīng)的"handle_"處理函數(shù),當(dāng)"handle_"處理函數(shù)返回值大于或等于0時,表示處理事件成功,當(dāng)返回值小于0時,表示處理事件失敗,這時Reactor會自動注銷該句柄所注冊的所有事件,并觸發(fā)handle_close函數(shù),以執(zhí)行相應(yīng)的資源清理工作。
    在本例中,當(dāng)handle_input函數(shù)里的recv函數(shù)接收到0個數(shù)時,說明socket發(fā)生錯誤(大多為Socket連接中斷),此時返回-1,則系統(tǒng)自動注銷相應(yīng)事件。
  • 顯示注銷。
    調(diào)用Reactor對象的remove_handler方法移除,它有兩個參數(shù),第一個是所注冊的事件反應(yīng)器對象,第二個是所要注銷的事件。
  • 在這個示例程序里,連接方只有一個Socket連接,Reactor的優(yōu)勢并沒有體現(xiàn)出來,但在一些網(wǎng)絡(luò)管理系統(tǒng)里,連接方需要對多個需要管理的設(shè)備(服務(wù)器端)進(jìn)行連接,在這種情況下使用Reactor模式,只需要多開一個Reactor事件循環(huán)線程就能實現(xiàn)事件多路分發(fā)復(fù)用,并且不會阻塞,通過面向?qū)ο蟮幕卣{(diào)方式管理,使用起來非常方便。

    Reactor框架的另外一個常用的地方就是服務(wù)器端,一般是一個服務(wù)器端對應(yīng)多個客戶端,這樣用Reactor模式能大幅提高并發(fā)能力,這方面的編程方法將在下一章給與介紹。

    ?

    在服務(wù)器端使用Reactor框架

    使用Reactor框架的服務(wù)器端結(jié)構(gòu)如下:

    服務(wù)器端注冊兩種事件處理器,ClientAcceptor和ClientService ,ClientService類負(fù)責(zé)和客戶端的通信,每一個ClientService對象對應(yīng)一個客戶端的Socket連接。 ClientAcceptor專門負(fù)責(zé)被動接受客戶端的連接,并創(chuàng)建ClientService對象。這樣,在一個N個Socket連接的服務(wù)器程序中,將存在1個ClientAcceptor對象和N個ClientService對象。

    整個服務(wù)器端流程如下:

  • 首先創(chuàng)建一個ClientAcceptor對象,該對象在Reactor上注冊ACCEPT_MASK事件,Reactor將自動在監(jiān)聽端口建立Socket監(jiān)聽。

  • 如果有對該端口的Socket連接時,Reactor將自動回調(diào)handle_input方法,ClientAcceptor重載此方法,并創(chuàng)建一個ClientService對象,用于處理和Client的通信。

  • ClientService對象根據(jù)服務(wù)器的具體功能實現(xiàn),其處理過程和客戶端程序類似,注冊相應(yīng)的回調(diào)事件并分發(fā)即可。

  • 代碼如下:

    #include <ace/OS.h> #include <ace/Reactor.h> #include <ace/SOCK_Connector.h> #include <ace/SOCK_Acceptor.h> #include <ace/Auto_Ptr.h> class ClientService : public ACE_Event_Handler { public: ACE_SOCK_Stream &peer (void) { return this->sock_; } int open (void) { //注冊讀就緒回調(diào)函數(shù) return this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { //一個簡單的EchoServer,將客戶端的信息返回 int rev = peer().recv(buf,100); if(rev<=0) return -1; peer().send(buf,rev); return 0; } // 釋放相應(yīng)資源 virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask) { if (mask == ACE_Event_Handler::WRITE_MASK) return 0; mask = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, mask); this->sock_.close (); delete this; //socket出錯時,將自動刪除該客戶端,釋放相應(yīng)資源 return 0; } protected: char buf[100]; ACE_SOCK_Stream sock_; }; class ClientAcceptor : public ACE_Event_Handler { public: virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE, 0);} int open (const ACE_INET_Addr &listen_addr) { if (this->acceptor_.open (listen_addr, 1) == -1) { ACE_OS::printf("open port fail"); return -1; } //注冊接受連接回調(diào)事件 return this->reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK); } virtual ACE_HANDLE get_handle (void) const { return this->acceptor_.get_handle (); } virtual int handle_input (ACE_HANDLE fd ) { ClientService *client = new ClientService(); auto_ptr<ClientService> p (client); if (this->acceptor_.accept (client->peer ()) == -1) { ACE_OS::printf("accept client fail"); return -1; } p.release (); client->reactor (this->reactor ()); if (client->open () == -1) client->handle_close (ACE_INVALID_HANDLE, 0); return 0; } virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE) { ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL; this->reactor ()->remove_handler (this, m); this->acceptor_.close (); } return 0; } protected: ACE_SOCK_Acceptor acceptor_; }; int main(int argc, char *argv[]) { ACE_INET_Addr addr(3000,"192.168.1.142"); ClientAcceptor server; server.reactor(ACE_Reactor::instance()); server.open(addr); while(true) { ACE_Reactor::instance()->handle_events(); } return 0; }

    代碼功能比較簡單,需要注意以下幾點:

  • 這里注冊事件的方式和前面的文章中方式不一樣,是通過ACE_Event_Handler類的reactor()方法設(shè)置和獲取reactor的指針,比較直觀和方便。前面的文章是通過ACE_Reactor::instance()來獲取的一個單體reactor的指針。

  • 當(dāng)客戶端Socket連接關(guān)閉時,需要釋放相應(yīng)資源,需要注意一下ClientService對象的handle_close方法中釋放資源的相應(yīng)代碼。
  • 定時器的實現(xiàn)

    通過Reactor機(jī)制,還可以很容易的實現(xiàn)定時器的功能,使用方式如下。

  • 編寫一個事件反應(yīng)器,重載handle_timeout()方法,該方法是定時器的觸發(fā)時間到時,會自動觸發(fā)該方法。

  • 通過Reactor的schedule_timer()方法注冊定時器。

  • 啟動reacotr的handle_events()事件分發(fā)循環(huán)。

  • 當(dāng)不想使用定時器時,可以通過Reactor的cancel_timer()方法注銷定時器。

  • 下面的代碼簡單的實現(xiàn)了一個定時器,并具有基本的開啟,關(guān)閉功能。

    #include <ace/OS.h> #include <ace/Reactor.h> class MyTimerHandler : public ACE_Event_Handler { private: int inteval; //執(zhí)行時間間隔 int delay; //延遲執(zhí)行時間 int timerid; public: MyTimerHandler(int delay,int inteval) { this->delay=delay; this->inteval=inteval; } int open() //注冊定時器 { ACE_Time_Value delaytime(inteval); ACE_Time_Value intevaltime(inteval); timerid = reactor()->schedule_timer(this, 0, //傳遞handle_timeout給的參數(shù) delaytime, intevaltime); return timerid; } int close() //取消定時器 { return reactor()->cancel_timer(timerid); } //定時器回調(diào)函數(shù) int handle_timeout (const ACE_Time_Value &current_time, const void * = 0) { time_t epoch = ((timespec_t)current_time).tv_sec; ACE_DEBUG ((LM_INFO, ACE_TEXT ("handle_timeout: %s\n"), ACE_OS::ctime (&epoch))); return 0; } }; int main(int argc, char *argv[]) { MyTimerHandler * timer = new MyTimerHandler (3,5); timer->reactor(ACE_Reactor::instance()); timer->open(); for(int i=0;i<2;i++) //觸發(fā)次handle_timeout事件 { ACE_OS::printf("\n%d\n",i); ACE_Reactor::instance()->handle_events(); } timer->close(); ACE_OS::printf("cancel timer"); while(true) ACE_Reactor::instance()->handle_events(); return 0; }

    代碼功能比較簡單,這里就不多做介紹了。

    轉(zhuǎn)載于:https://www.cnblogs.com/dubingsky/archive/2009/07/22/1528292.html

    與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖

    總結(jié)

    以上是生活随笔為你收集整理的ACE入门的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。