日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ACE中的Proactor介绍和应用实例

發(fā)布時間:2025/3/21 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ACE中的Proactor介绍和应用实例 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
把這兩天做Proactor的一些經(jīng)驗和心得寫一下,可能會給一些人幫助。
??? Proactor是異步模式的網(wǎng)絡(luò)處理器,ACE中叫做“前攝器”。
??? 先講幾個概念:
????前攝器(Proactor)-異步的事件多路分離器、處理器,是核心處理類。啟動后由3個線程組成(你不需要關(guān)心這三個線程,我只是讓你知道一下有這回事存在)。
??? 接受器(Acceptor)-用于服務(wù)端,監(jiān)聽在一個端口上,接受用戶的請求。
??? 連接器(Connector)-用于客戶端,去連接遠程的監(jiān)聽。當(dāng)然,如果遠程是ACE寫的,就是Acceptor。
??? 異步模式-即非阻塞模式。網(wǎng)絡(luò)的傳輸速度一般來講為10Mbps、100Mbps、1000Mbps。拿千兆網(wǎng)來說,實際的傳輸速度為1000Mbps/8大概為128KB左右。我們的CPU一般為P4 3.0GHZ,如果是32位的處理器,一秒鐘大概可以處理6G的字節(jié),那么,128KB的網(wǎng)絡(luò)速度是遠遠及不上處理器的速度的。網(wǎng)絡(luò)發(fā)送數(shù)據(jù)是一位一位發(fā)送出去的,如果CPU等在這里,發(fā)送完成函數(shù)才結(jié)束,那么,處理器浪費了大量時間在網(wǎng)絡(luò)傳輸上。
??? 操作系統(tǒng)提供了異步的模式來傳輸網(wǎng)絡(luò)數(shù)據(jù),工作模式即:應(yīng)用程序把要發(fā)送的數(shù)據(jù)交給操作系統(tǒng),操作系統(tǒng)把數(shù)據(jù)放在系統(tǒng)緩沖區(qū)后就告訴應(yīng)用程序OK了,我?guī)湍惆l(fā),應(yīng)用程序該干嘛干嘛去。操作系統(tǒng)發(fā)送完成后,會給應(yīng)用系統(tǒng)一個回執(zhí),告訴應(yīng)用程序:剛才那個包發(fā)送完成了!
?? 舉個例子:你有幾封郵件和包裹要發(fā),最有效率的辦法是什么?你把郵件和包裹及交給總臺,總臺MM說,好了,你幫你發(fā),你忙去吧!然后你去工作了。過了一會,總臺MM打電話告訴你:“剛才我叫快遞公司的人來了,把你的包裹發(fā)出去了。郵局的人也來了,取走了郵件,放心好了”。同樣,如果你知道今天會有包裹來,比如你在淘寶上購物了,你能成天等在總臺?你應(yīng)該告訴總臺MM:“今天可能有我的一個快遞,你幫我收一下,晚上請你肯德基!”。MM:“看在肯得基的面子上,幫你收了”。某個時間,MM打電話來了:“帥哥,你的包裹到了,我?guī)湍愫炇樟?#xff0c;快來拿吧。”
?? 因為操作系統(tǒng)是很有效率的,所有,他在后臺收發(fā)是很快的。應(yīng)用程序也很簡單。Proactor就是這種異步模式的。Proactor就是總臺MM;ACE_Service_Handle就是總臺代為收發(fā)郵件的公司流程。

我們看一個實例:

//***********************************************************
class?TPTCPAsynchServerImpl?:?public?ACE_Service_Handler
{
public:
?TPTCPAsynchServerImpl(
void);
?
~TPTCPAsynchServerImpl(void);

?
virtual?void?open?(ACE_HANDLE?handle,?ACE_Message_Block?&message_block);?
?
virtual?void?handle_read_stream?(const?ACE_Asynch_Read_Stream::Result?&result);

?
virtual?void?handle_write_stream?(const?ACE_Asynch_Write_Stream::Result?&result);

?
virtual?void??handle_time_out?(const?ACE_Time_Value?&tv,?const?void?*act=0);

private:
?
int?initiate_read_stream?(const?ACE_Asynch_Read_Stream::Result?&result);

?ACE_Asynch_Read_Stream?rs_;
?ACE_Asynch_Write_Stream?ws_;

};

這個例子從ACE_Service_Handler繼承過來,ACE_Service_Handle主要就是定義了一些回調(diào)函數(shù)。
1、?virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block);
? 當(dāng)有客戶端連接上來,連接建立成功后Proactor會調(diào)用這個方法。

2、?virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
當(dāng)用戶要讀的數(shù)據(jù)讀好了后,調(diào)用這個方法

3、virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
當(dāng)用戶要寫的數(shù)據(jù)在網(wǎng)卡上發(fā)送成功后,Proactor會回調(diào)這個方法

4、?virtual void? handle_time_out (const ACE_Time_Value &tv, const void *act=0);
當(dāng)用戶設(shè)定的時鐘到期了,這個方法會被調(diào)用。

這跟和總臺MM的聯(lián)絡(luò)方法是不是一樣的?

對還缺點東西,缺少怎么向總臺MM交待任務(wù)的方法。下面看看:

首先,創(chuàng)建一個監(jiān)聽器。

?ACE_Asynch_Acceptor<TPTCPAsynchServerImpl> acceptor_;
看到?jīng)],就是我們剛才寫的類,因為他繼承了回調(diào)接口,并實現(xiàn)了自已的代碼,模板中ACE_Asynch_Acceptor會在合適的時候回調(diào)這些方法。

//創(chuàng)建一個地址對象
?ACE_INET_Addr?addr(port,?ip);
acceptor_.open?(addr,?
8?*?1024,?1);

??? Open后,就開始監(jiān)聽了。其它的,向Proactor注冊一些事件的事模板類中都替你做了,你不需要做很多事。
??? 那么,已經(jīng)開始監(jiān)聽了,我的程序從哪里開始呢?對于一個服務(wù)程序來講,程序是被用戶的連接驅(qū)動的,一個用戶程序想和通訊,必須先創(chuàng)建連接,就是Socket中的connect操作。這個操作Proactor會替我們做一些工作,當(dāng)連接創(chuàng)建完成后,上面講的Open方法會被調(diào)用,我們看看Open方法中都有些什么代碼:

void?TPTCPAsynchServerImpl::open?(ACE_HANDLE?handle,?ACE_Message_Block?&message_block)
{
?ACE_DEBUG?((LM_DEBUG,?
"%N:%l:TPTCPAsynchServerImpl::open()..."));

?
//構(gòu)造讀流
?if?(rs_.open?(*this,?handle)?==?-1)
?{
??ACE_ERROR?((LM_ERROR,?
"%N:%l:",?"TPTCPAsynchServerImpl::open()?Error"));
??
return;
????}

?
//構(gòu)造寫流
?if?(ws_.open(*this,?handle)?==?-1)
?{
??ACE_ERROR?((LM_ERROR,?
"%N:%l:",?"TPTCPAsynchServerImpl::open()?Error"));
??
return;
????}

?
//獲取客戶端連接地址和端口
?ACE_INET_Addr?addr;?
?ACE_SOCK_SEQPACK_Association?ass
=ACE_SOCK_SEQPACK_Association(handle);?
?size_t?addr_size
=1;?
?ass.get_local_addrs(
&addr,addr_size);?

?
this->server_->onClientConnect((int)handle,?addr.get_ip_address(),?addr.get_port_number());


?


?
//如果客戶連接時同時提交了數(shù)據(jù),需要偽造一個結(jié)果,然后呼叫讀事件
?if?(message_block.length?()?!=?0)
?{
?
//?ACE_DEBUG((LM_DEBUG,?"message_block.length()?!=?0 "));
??
//?復(fù)制消息塊
??ACE_Message_Block?&duplicate?=??*message_block.duplicate?();

??
//?偽造讀結(jié)果,以便進行讀完成回調(diào)
??ACE_Asynch_Read_Stream_Result_Impl?*fake_result?=
????????ACE_Proactor::instance?()
->create_asynch_read_stream_result?(this->proxy?(),
?????????????????????????????????????????????????????????????????????
this->handle_,
?????????????????????????????????????????????????????????????????????duplicate,
?????????????????????????????????????????????????????????????????????
1024,
?????????????????????????????????????????????????????????????????????
0,
?????????????????????????????????????????????????????????????????????ACE_INVALID_HANDLE,
?????????????????????????????????????????????????????????????????????
0,
?????????????????????????????????????????????????????????????????????
0);

??size_t?bytes_transferred?
=?message_block.length?();

??
//?Accept事件處理完成,wr_ptr指針會被向前移動,將其移動到開始位置
??duplicate.wr_ptr?(duplicate.wr_ptr?()?-?bytes_transferred);

??
//?這個方法將調(diào)用回調(diào)函數(shù)
??fake_result->complete?(message_block.length?(),?1,?0);

??
//?銷毀偽造的讀結(jié)果
??delete?fake_result;
?}


?
//?否則,通知底層,準(zhǔn)備讀取用戶數(shù)據(jù)
?
//創(chuàng)建一個消息塊。這個消息塊將用于從套接字中異步讀?
?ACE_Message_Block?*mb?=?0;
??ACE_NEW?(mb,?ACE_Message_Block?(_bufSize));

?
if?(rs_.read?(*mb,?mb->size?()?-?1)?==?-1)
?{
??delete?mb;
??ACE_ERROR?((LM_ERROR,?
"%N:%l:open?init?read?failed!"));
??
return;
?}
}


我們看到,首先創(chuàng)建了兩個流,就是前面類定義中定義的一個異步寫流,一個異步讀流。以后對網(wǎng)絡(luò)的讀和寫就通過這兩個流進行。我還給出了一段讀客戶端地址和端口的代碼。然后是讀取客戶Connect可能附帶的數(shù)據(jù),那段代碼不用看懂,以后使用照抄就行。然后就是

?if?(rs_.read?(*mb,?mb->size?()?-?1)?==?-1)
?{
??delete?mb;
??ACE_ERROR?((LM_ERROR,?
"%N:%l:open?init?read?failed!"));
??
return;
?}

這段代碼使用讀流讀一段數(shù)據(jù)。這段代碼就是向總臺MM交待:我要收包裹,收好了叫我!
也就是說,這段代碼99%的可能是讀不出數(shù)據(jù)的,只是向Proactor注冊讀的事件,具體的等待、讀取操作由Proactor讀,讀到了,就回調(diào)Handle_Read_Stream方法。ACE_Message_Block是消息塊,數(shù)據(jù)就是存放在消息塊中的。
下面看看Handle_Read_Stream方法的代碼:

void TPTCPAsynchServerImpl::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
?result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '/0';

? ACE_DEBUG ((LM_DEBUG, "********************/n"));
? ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "bytes_to_read", result.bytes_to_read ()));
? ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "handle", result.handle ()));
? ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "bytes_transfered", result.bytes_transferred ()));
? ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "act", (u_long) result.act ()));
? ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "success", result.success ()));
? ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "completion_key", (u_long) result.completion_key ()));
? ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "error", result.error ()));
? ACE_DEBUG ((LM_DEBUG, "********************/n"));

?result.message_block().release();

?if (this->initiate_read_stream (result) == -1)
?{
??ACE_ERROR((LM_ERROR, "%N:%l:read stream failed!connection closed, remove it:%d/n", result.handle()));
??closeConnection(result.handle());
?}?
}

這個函數(shù)被調(diào)用,就表明有數(shù)據(jù)已經(jīng)讀好了,包裹已經(jīng)在總臺了。Proactor比總臺MM還好,給你送上門了,數(shù)據(jù)就在Result里,上面演示了Result中的數(shù)據(jù)。然后把消息塊釋放了,然后調(diào)用initiate_read_stream繼續(xù)監(jiān)聽網(wǎng)絡(luò)上可能到來的數(shù)據(jù)。看看initiate_read_stream好了:

int?TPTCPAsynchServerImpl::initiate_read_stream?(const?ACE_Asynch_Read_Stream::Result?&result)
{
?ACE_DEBUG((LM_TRACE,?
"%N:%l:TPTCPAsynchServerImpl::initiate_read_stream()"));
?
//創(chuàng)建一個消息塊。這個消息塊將用于從套接字中異步讀?
?ACE_Message_Block?*mb?=?new?ACE_Message_Block(_bufSize);
?
if?(mb?==?NULL)
?{
??ACE_DEBUG((LM_ERROR,?
"%N:%l:can't?allock?ACE_Message_Block.?"));?
??
return?-1;
?}
?

?
if?(rs_.read?(*mb,?mb->size?()?-?1)?==?-1)
?{
??delete?mb;
??ACE_ERROR_RETURN?((LM_ERROR,?
"%N:%l:rs->read()?failed,?clientID=%d",?result.handle()),??-1);
?}

?
return?0;
}


代碼很簡單,就是創(chuàng)建一個新的消息塊,然后使用讀流注冊一個讀消息就可以了。

到此為止,Proactor的讀流程很清楚了吧?

下面再說一個寫流程。

寫流程其實更簡單,在任意想向客戶端寫數(shù)據(jù)的地方,調(diào)用相應(yīng)代碼就行了,比如,我們提供了SendData方法來發(fā)送數(shù)據(jù),在任意想發(fā)送數(shù)據(jù)的地方調(diào)用SendData就行了,SendData的代碼如下:

int?TPTCPAsynchServerImpl::sendData(int?clientID,?const?char?*data,?int?dataLen,?unsigned?int?&id)
{
?ACE_DEBUG((LM_DEBUG,?
"TPTCPAsynchServerImpl::sendData(void)"));

?ACE_Message_Block?
*mb;?
?ACE_NEW_RETURN(mb,?ACE_Message_Block(dataLen?
+?1),?-1);

?mb
->wr_ptr((char*)data);??????????????????
?ACE_OS::memcpy(mb
->base(),(char*)data,?dataLen);

?id?
=?GlobleSingleton::instance()->getIndex();
?mb
->msg_type((int)id);

?
//向操作系統(tǒng)發(fā)送數(shù)據(jù)
?if?(connection->ws->write?(*mb?,?dataLen?)?==?-1)
?{
??ACE_ERROR_RETURN((LM_ERROR,?
"%N:%l:sendData?failed!?clientID=%d",?clientID),-1);
?}

?
return?0;
}


簡單說,就是創(chuàng)建了一個消息塊,把用戶數(shù)據(jù)拷貝進來,然后調(diào)用寫流WS向Proactor發(fā)送一個Write事件就可以了,發(fā)送成功后,Handle_write_handle會被調(diào)用,看一下:

void
TPTCPAsynchServerImpl::handle_write_stream?(
const?ACE_Asynch_Write_Stream::Result?&result)
{
??ACE_DEBUG?((LM_DEBUG,
??????????????
"handle_write_stream?called"));

??
//?Reset?pointers.
??result.message_block?().rd_ptr?(result.message_block?().rd_ptr?()?-?result.bytes_transferred?());

??ACE_DEBUG?((LM_DEBUG,?
"********************"));
??ACE_DEBUG?((LM_DEBUG,?
"%s?=?%d",?"bytes_to_write",?result.bytes_to_write?()));
??ACE_DEBUG?((LM_DEBUG,?
"%s?=?%d",?"handle",?result.handle?()));
??ACE_DEBUG?((LM_DEBUG,?
"%s?=?%d",?"bytes_transfered",?result.bytes_transferred?()));
??ACE_DEBUG?((LM_DEBUG,?
"%s?=?%d",?"act",?(u_long)?result.act?()));
??ACE_DEBUG?((LM_DEBUG,?
"%s?=?%d",?"success",?result.success?()));
??ACE_DEBUG?((LM_DEBUG,?
"%s?=?%d",?"completion_key",?(u_long)?result.completion_key?()));
??ACE_DEBUG?((LM_DEBUG,?
"%s?=?%d",?"error",?result.error?()));
??ACE_DEBUG?((LM_DEBUG,?
"********************"));
#if?0
??ACE_DEBUG?((LM_DEBUG,?
"%s?=?%s",?"message_block",?result.message_block?().rd_ptr?()));
#endif

??
//?Release?message?block.
??result.message_block?().release?();
}


代碼中使用了result中發(fā)數(shù)據(jù),然后把消息塊釋放了,就這么簡單。




這是簡單的proactor用法,當(dāng)然,復(fù)雜也基本就這樣用。所謂不基本的不是Proactor的內(nèi)容,而是服務(wù)器編程本身的麻煩。比如說,多個連接的管理、重發(fā)機制、發(fā)送隊列等等,這都不是ACE的內(nèi)容。這些要大家自己思考了,并添加。

在這里,我要說幾個重要的問題:連接的管理。Acceptor是一個類,但是在每一個連接,Proactor都用了某種辦法創(chuàng)建了一個實例,所以,連接管理的群集類一定不能在Acceptor類中,不然得到的結(jié)果就是始終只有一條記錄。因為每個Acceptor都有一個實例,實例對應(yīng)一個連接,群集類也就每個實例一個了。要采取的方法是一個全局的容器對象就可以了。比如我這個類:

typedef?ACE_Map_Manager?<ACE_HANDLE,?ConnectionBean?*,?ACE_Null_Mutex>?ConnectionMap;
typedef?ACE_Map_Iterator
<ACE_HANDLE,?ConnectionBean?*,?ACE_Null_Mutex>?ConnectionIterator;
typedef?ACE_Map_Entry???
<ACE_HANDLE,?ConnectionBean?*>?ConnectionEntry;

class?Globle
{
public:
?Globle(
void);
?
~Globle(void);

?ITPServer
*?server_;
?ConnectionMap?_connections;

?unsigned?getIndex(
void);?
?
long?getTimerId(void);

private:
?unsigned?
int?index_;

?
long?timerId_;
};

typedef?ACE_Singleton
<Globle,?ACE_Null_Mutex>?GlobleSingleton;


我使用ACE的Singleton模板創(chuàng)建這個類,每一個Acceptor要使用ConnectionMap,都使用這里的_connections,方法如下 :
? GlobleSingleton::instance()->connection.bind()......

這個問題可是我花費了2天時間找出來的,諸位同仁不可不戒啊,給點掌聲:)

?轉(zhuǎn)自: http://blog.newsfan.net/leelin/archive/2006/11/01/4159.aspx

總結(jié)

以上是生活随笔為你收集整理的ACE中的Proactor介绍和应用实例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。