| 當(dāng) OS 平臺(tái)支持異步操作時(shí),一種高效而方便的實(shí)現(xiàn)高性能 Web 服務(wù)器的方法是使用前攝式事件分派。使用前攝式事件分派模型設(shè)計(jì)的 Web 服務(wù)器通過一或多個(gè)線程控制來處理異步操作的完成。這樣,通過集成完成事件多路分離(completion event demultiplexing)和事件處理器分派,前攝器模式簡(jiǎn)化了異步的 Web 服務(wù)器。 異步的 Web 服務(wù)器將這樣來利用前攝器模式:首先讓 Web 服務(wù)器向 OS 發(fā)出異步操作,并將回調(diào)方法登記到 Completion Dispatcher(完成分派器),后者將在操作完成時(shí)通知 Web 服務(wù)器。于是 OS 代表 Web 服務(wù)器執(zhí)行操作,并隨即在一個(gè)周知的地方將結(jié)果排隊(duì)。Completion Dispatcher 負(fù)責(zé)使完成通知出隊(duì),并執(zhí)行適當(dāng)?shù)摹⒑袘?yīng)用特有的 Web 服務(wù)器代碼的回調(diào)。 使用前攝器模式的主要優(yōu)點(diǎn)是可以啟動(dòng)多個(gè)并發(fā)操作,并可并行運(yùn)行,而不要求應(yīng)用必須擁有多個(gè)線程。操作被應(yīng)用異步地啟動(dòng),它們?cè)?OS 的 I/O 子系統(tǒng)中運(yùn)行直到完成。發(fā)起操作的線程現(xiàn)在可以服務(wù) 另外的請(qǐng)求了。 在ACE中,可以通過ACE_Proactor實(shí)現(xiàn)前攝器模式。實(shí)現(xiàn)方式如下。 1。創(chuàng)建服務(wù)處理器: Proactor框架中服務(wù)處理器均派生自ACE_Service_Handler,它和Reactor框架的事件處理器非常類似。當(dāng)發(fā)生IO操作完成事件時(shí),會(huì)觸發(fā)相應(yīng)的事件完成會(huì)調(diào)函數(shù)。 2。實(shí)現(xiàn)服務(wù)處理器IO操作 Proactor框架中所有的IO操作都由相應(yīng)的異步操作類來完成,這些異步操作類都繼承自ACE_Asynch_Operation。常用的有以下幾種。 ACE_Asynch_Read_Stream, 提供從TCP/IP socket連接中進(jìn)行異步讀操作. ACE_Asynch_Write_Stream, 提供從TCP/IP socket連接中進(jìn)行異步寫操作. 使用這些操作類的一般方式如下: 初始化 將相關(guān)的操作注冊(cè)到服務(wù)處理器中,一般可通過調(diào)用其open方法實(shí)現(xiàn)。 發(fā)出IO操作 發(fā)出異步IO操作請(qǐng)求,該操作不會(huì)阻塞,具體的IO操作過程由操作系統(tǒng)異步完成。 IO操作完成回調(diào)處理 異步IO操作完成后,OS會(huì)觸發(fā)服務(wù)處理器中的相應(yīng)回調(diào)函數(shù),可通過該函數(shù)的ACE_Asynch_Result參數(shù)獲取相應(yīng)的返回值。 3。使用連接器或接受器和遠(yuǎn)端進(jìn)行連接 ACE為Proactor框架提供了兩個(gè)工廠類來建立TCP/IP連接。 ACE_Asynch_Acceptor, 用于被動(dòng)地建立連接 ACE_Asynch_Connector 用于主動(dòng)地建立連接 當(dāng)遠(yuǎn)端連接建立時(shí),連接器或接受器便會(huì)創(chuàng)建相應(yīng)的服務(wù)處理器,從而可以實(shí)現(xiàn)服務(wù)處理。 4。啟動(dòng)Proactor事件分發(fā)處理 啟動(dòng)事件分發(fā)處理只需如下調(diào)用: ????while(true) ????????ACE_Proactor::instance ()->handle_events (); 5。程序示例 服務(wù)器端: 服務(wù)器端簡(jiǎn)單的實(shí)現(xiàn)了一個(gè)EchoServer,流程如下: 當(dāng)客戶端建立連接時(shí),首先發(fā)出一個(gè)異步讀的異步請(qǐng)求,當(dāng)讀完成時(shí),將所讀的數(shù)據(jù)打印出來,并發(fā)出一個(gè)新的異步請(qǐng)求。 #include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Acceptor.h"
class HA_Proactive_Service : public ACE_Service_Handler { public: ~HA_Proactive_Service () { if (this->handle () != ACE_INVALID_HANDLE) ACE_OS::closesocket (this->handle ()); }
virtual void open (ACE_HANDLE h, ACE_Message_Block&) { ???? this->handle (h); ???? if (this->reader_.open (*this) != 0 ) ???? { ???????? ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p/n"), ???????????? ACE_TEXT ("HA_Proactive_Service open"))); ???????? delete this; ???????? return; ???? }
???? ACE_Message_Block *mb = new ACE_Message_Block(buffer,1024); ???? if (this->reader_.read (*mb, mb->space ()) != 0) ???? { ???????? ACE_OS::printf("Begin read fail/n"); ???????? delete this; ???????? return; ???? }
???? return; }
//異步讀完成后會(huì)調(diào)用此函數(shù) virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { ???? ACE_Message_Block &mb = result.message_block (); ???? if (!result.success () || result.bytes_transferred () == 0) ???? { ???????? mb.release (); ???????? delete this; ???????? return; ???? }
???? mb.copy("");????//為字符串添加結(jié)束標(biāo)記'/0' ???? ACE_OS::printf("rev:/t%s/n",mb.rd_ptr()); ???? mb.release();
???? ACE_Message_Block *nmb = new ACE_Message_Block(buffer,1024); ???? if (this->reader_.read (*nmb, nmb->space ()) != 0)
???? return; }
private: ACE_Asynch_Read_Stream reader_; char buffer[1024]; };
int main(int argc, char *argv[]) { ????int port=3000; ????ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor; ???? ????if (acceptor.open (ACE_INET_Addr (port)) == -1) ????????return -1;
????while(true) ????????ACE_Proactor::instance ()->handle_events (); ???? ????return 0; } 客戶端: 客戶端代碼比較簡(jiǎn)單,就是每隔1秒鐘將當(dāng)前的系統(tǒng)時(shí)間轉(zhuǎn)換為字符串形式通過異步形式發(fā)送給服務(wù)器,發(fā)送完成后,釋放時(shí)間字符的內(nèi)存空間。 #include "ace/Message_Queue.h" #include "ace/Asynch_IO.h" #include "ace/OS.h" #include "ace/Proactor.h" #include "ace/Asynch_Connector.h"
class HA_Proactive_Service : public ACE_Service_Handler { public: ~HA_Proactive_Service () { if (this->handle () != ACE_INVALID_HANDLE) ACE_OS::closesocket (this->handle ()); }
virtual void open (ACE_HANDLE h, ACE_Message_Block&) { ???? this->handle (h); ???? if (this->writer_.open (*this) != 0 ) ???? { ???????? ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p/n"), ???????????? ACE_TEXT ("HA_Proactive_Service open"))); ???????? delete this; ???????? return; ???? }
???? ACE_OS::printf("connceted");
???? for(int i=0;i<10;i++)????//每隔秒中發(fā)送時(shí)間至服務(wù)器 ???? { ???????? ACE_OS::sleep(1); ???????? time_t now = ACE_OS::gettimeofday().sec(); ???????? char *time = ctime(&now);????????//獲取當(dāng)前時(shí)間的字符串格式 ???????? ACE_Message_Block *mb = new ACE_Message_Block(100); ???????? mb->copy(time);
???????? if (this->writer_.write(*mb,mb->length()) !=0) ???????? { ???????????? ACE_OS::printf("Begin read fail/n"); ???????????? delete this; ???????????? return; ???????? } ???? }
???? return; }
//異步寫完成后會(huì)調(diào)用此函數(shù) virtual void handle_write_dgram (const ACE_Asynch_Write_Stream::Result &result) { ???? ACE_Message_Block &mb = result.message_block (); ???? mb.release(); ???? return; }
private: ACE_Asynch_Write_Stream writer_; };
int main(int argc, char *argv[]) { ???? ????ACE_INET_Addr addr(3000,"192.168.1.142");
????HA_Proactive_Service *client = new HA_Proactive_Service(); ????ACE_Asynch_Connector<HA_Proactive_Service> connector; ???? ????connector.open(); ????if (connector.connect(addr) == -1) ????????return -1;
????while(true) ????????ACE_Proactor::instance ()->handle_events (); ???? ????return 0; } |