日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ACE proactor example

發布時間:2025/3/21 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ACE proactor example 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

網上看的兩個好的例子-學習ACE時經??吹摹?/p>

這個文章應該是介紹ACE編程的一個很好的原創文章,個人非常推薦了!

1、WIN32下面用proactor可以達到幾乎RAW IOCP的效率,由于封裝關系,應該是差那么一點。

?

客戶端處理類的常規寫法:

[cpp] view plain copy
  • //處理客戶端連接消息??
  • class?ClientHandler?:?public?ACE_Service_Handler??
  • {??
  • public:??
  • /**構造函數?
  • ?*?
  • ?*?
  • ?*/??
  • ClientHandler(unsigned?int?client_recv_buf_size=SERVER_CLIENT_RECEIVE_BUF_SIZE)??
  • ?:_read_msg_block(client_recv_buf_size),_io_count(0)??
  • {??
  • }??
  • ??
  • ??
  • ~ClientHandler(){}??
  • ??
  • /**?
  • ?*初始化,因為可能要用到ClientHandler內存池,而這個池又不一定要用NEW?
  • ?*/??
  • void?init();??
  • ??
  • /**清理函數,因為可能要用到內存池?
  • ?*?
  • ?*/??
  • void?fini();??
  • ??
  • ??
  • //檢查是否超時的函數??
  • ??
  • void?check_time_out(time_t?cur_time);??
  • ??
  • public:??
  • ??
  • /**客戶端連接服務器成功后調用?
  • ?*?
  • ?*?/param?handle?套接字句柄?
  • ?*?/param?&message_block?第一次讀到的數據(未用)?
  • ?*/??
  • ??
  • ??
  • //由Acceptor來調用!!!??
  • virtual?void?open?(ACE_HANDLE?handle,ACE_Message_Block?&message_block);??
  • ??
  • /**處理網絡讀操作結束消息?
  • ?*?
  • ?*?/param?&result?讀操作結果?
  • ?*/??
  • virtual?void?handle_read_stream?(const?ACE_Asynch_Read_Stream::Result?&result);??
  • ??
  • ??
  • /**處理網絡寫操作結束消息?
  • ?*?
  • ?*?/param?&result?寫操作結果?
  • ?*/??
  • virtual?void?handle_write_stream?(const?ACE_Asynch_Write_Stream::Result?&result);??
  • ??
  • private:??
  • ??
  • //**生成一個網絡讀請求??
  • ?*??
  • ?*?/param?void???
  • ?*?/return?0-成功,-1失敗??
  • ?*/??
  • int??initiate_read_stream??(void);??
  • ??
  • /**生成一個寫請求?
  • ?*?
  • ?*?/param?mb?待發送的數據?
  • ?*?/param?nBytes?待發送數據大小?
  • ?*?/return?0-成功,-1失敗?
  • ?*/??
  • int??initiate_write_stream?(ACE_Message_Block?&?mb,?size_t?nBytes?);??
  • ??
  • /**?
  • ?*?
  • ?*?/return?檢查是否可以刪除,用的是一個引用計數。每一個外出IO的時候+1,每一個IO成功后-1?
  • ?*/??
  • int?check_destroy();??
  • ??
  • //異步讀??
  • ACE_Asynch_Read_Stream?_rs;??
  • ??
  • //異步寫??
  • ACE_Asynch_Write_Stream?_ws;??
  • ??
  • //接收緩沖區只要一個就夠了,因為壓根就沒想過要多讀,我直到現在也不是很清楚為什么要多讀,多讀的話要考慮很多問題??
  • ACE_Message_Block?_read_msg_block;??
  • ??
  • //套接字句柄,這個可以不要了,因為基類就有個HANDLER在里面的。??
  • //ACE_HANDLE?_handle;??
  • ??
  • //一個鎖,客戶端反正有東東要鎖的,注意,要用ACE_Recursive_Thread_Mutex而不是用ACE_Thread_Mutex,這里面是可以重入的,而且在WIN32下是直接的EnterCriticalSection,可以達到很高的效率??
  • ACE_Recursive_Thread_Mutex?_lock;??
  • ??
  • //在外IO數量,其實就是引用計數啦,沒啥的。為0的時候就把這個東東關掉啦。??
  • long?_io_count;??
  • ??
  • ??
  • //檢查超時用的,過段時間沒東東就CLOSE他了。??
  • ??
  • time_t?_last_net_io;??
  • ??
  • private:??
  • ??
  • ??
  • //本來想用另外一種模型的,只用1個或者2個外出讀,后來想想,反正一般內存都是足夠的,就不管了。??
  • ??
  • //ACE_Message_Block?_send_msg_blocks[2];??
  • ??
  • //ACE_Message_Block?&_sending_msg_block;??
  • ??
  • //ACE_Message_Block?&_idle_msg_block;??
  • ??
  • private:??
  • ??
  • public:??
  • //TODO:move?to?prriva?and?use?friend?class!!!??
  • ??
  • ??
  • //只是為了效率更高,不用STL的LIST是因為到現在我沒有可用的Node_Allocator,所以效率上會有問題。??
  • ClientHandler?*_next;??
  • ??
  • ClientHandler?*next(){return?_next;}??
  • ??
  • void?next(ClientHandler?*obj){_next=obj;}??
  • ??
  • };??
  • ??
  • ???
  • ??
  • //這是具體實現,有些地方比較亂,懶得管了,鎖的有些地方不對。懶得改了,反正在出錯或者有瓶頸的時候再做也不遲。??
  • ??
  • void?ClientHandler::handle_read_stream?(const?ACE_Asynch_Read_Stream::Result?&result)??
  • {??
  • _last_net_io=ACE_OS::time(NULL);??
  • int?byterecved=result.bytes_transferred?();??
  • if?(?(result.success?())?&&?(byterecved?!=?0))??
  • {??
  • ?//ACE_DEBUG?((LM_DEBUG,??"Receiver?completed:%d/n",byterecved));??
  • ??
  • ??
  • //處理完數據??
  • ?if(handle_received_data()==true)??
  • ?{??
  • ??//ACE_DEBUG?((LM_DEBUG,??"go?on?reading.../n"));??
  • ??
  • ??
  • //把東東推到頭部,處理粘包??
  • ??_read_msg_block.crunch();??
  • ??initiate_read_stream();??
  • ?}??
  • }??
  • ??
  • ??
  • //這個地方不想用ACE_Atom_op,因為反正要有一個鎖,而且一般都會用鎖,不管了。假如不在意的話,應該直接用ACE_Atom_Op以達到最好的效率??
  • ??
  • {??
  • ?ACE_Guard<ACE_Recursive_Thread_Mutex>?locker?(_lock);??
  • ?_io_count--;??
  • }??
  • check_destroy?();??
  • }??
  • ??
  • void?ClientHandler::init()??
  • {??
  • ??
  • ??
  • //初始化數據,并不在構造函數里做。??
  • _last_net_io=ACE_OS::time(NULL);??
  • _read_msg_block.rd_ptr(_read_msg_block.base());??
  • _read_msg_block.wr_ptr(_read_msg_block.base());??
  • this->handle(ACE_INVALID_HANDLE);??
  • }??
  • ??
  • bool?ClientHandler::handle_received_data()??
  • {??
  • ??
  • ??
  • ...........自己處理??
  • return?true;??
  • }??
  • ??
  • ??
  • //==================================================================??
  • void?ClientHandler::handle_write_stream?(const?ACE_Asynch_Write_Stream::Result?&result)??
  • {??
  • //發送成功,RELEASE掉??
  • //這個不可能有多個RELEASE,直接XX掉??
  • //result.message_block?().release?();??
  • MsgBlockManager::get_instance().release_msg_block(&result.message_block());??
  • ??
  • {??
  • ?ACE_Guard<ACE_Recursive_Thread_Mutex>?locker?(_lock);??
  • ?_io_count--;??
  • }??
  • check_destroy?();??
  • }??
  • ??
  • //bool?ClientHandler::destroy?()???
  • //{??
  • //?FUNC_ENTER;??
  • //?ClientManager::get_instance().release_client_handle(this);??
  • //?FUNC_LEAVE;??
  • //?return?false?;??
  • //}??
  • ??
  • ??
  • int??ClientHandler::initiate_read_stream??(void)??
  • {??
  • ACE_Guard<ACE_Recursive_Thread_Mutex>?locker?(_lock);??
  • ??
  • ??
  • //考慮到粘包的呀??
  • if?(_rs.read?(_read_msg_block,?_read_msg_block.space())?==?-1)??
  • {??
  • ?ACE_ERROR_RETURN?((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::read"),-1);??
  • }??
  • _io_count++;??
  • return?0;??
  • }??
  • ??
  • /**生成一個寫請求?
  • *?
  • *?/param?mb?待發送的數據?
  • *?/param?nBytes?待發送數據大小?
  • *?/return?0-成功,-1失敗?
  • */??
  • int??ClientHandler::initiate_write_stream?(ACE_Message_Block?&?mb,?size_t?nBytes?)??
  • {??
  • ACE_Guard<ACE_Recursive_Thread_Mutex>?locker?(_lock);??
  • if?(_ws.write?(mb?,?nBytes?)?==?-1)??
  • {??
  • ?mb.release?();??
  • ?ACE_ERROR_RETURN((LM_ERROR,"%p/n","ACE_Asynch_Write_File::write"),-1);??
  • }??
  • _io_count++;??
  • return?0;??
  • }??
  • ??
  • void?ClientHandler::open?(ACE_HANDLE?handle,ACE_Message_Block?&message_block)??
  • {??
  • //FUNC_ENTER;??
  • _last_net_io=ACE_OS::time(NULL);??
  • _io_count=0;??
  • if(_ws.open(*this,this->handle())==-1)??
  • {??
  • ?ACE_ERROR?((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::open"));??
  • }??
  • else?if?(_rs.open?(*this,?this->handle())?==?-1)??
  • {??
  • ?ACE_ERROR?((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::open"));??
  • }??
  • else??
  • {??
  • ?initiate_read_stream?();??
  • }??
  • ??
  • check_destroy();??
  • //FUNC_LEAVE;??
  • }??
  • ??
  • void?ClientHandler::fini()??
  • {??
  • }??
  • ??
  • void?ClientHandler::check_time_out(time_t?cur_time)??
  • {??
  • //ACE_Guard<ACE_Recursive_Thread_Mutex>?locker?(_lock);??
  • //ACE_DEBUG((LM_DEBUG,"cur_time?is?%u,last?io?is?%u/n",cur_time,_last_net_io));??
  • ??
  • //檢測是否已經為0了??
  • if(this->handle()==ACE_INVALID_HANDLE)??
  • ?return;??
  • if(cur_time-_last_net_io>CLIENT_TIME_OUT_SECONDS)??
  • {??
  • ?ACE_OS::shutdown(this->handle(),SD_BOTH);??
  • ?ACE_OS::closesocket(this->handle());??
  • ?this->handle(ACE_INVALID_HANDLE);??
  • }??
  • }??
  • ??
  • int?ClientHandler::check_destroy()??
  • {??
  • {??
  • ?ACE_Guard<ACE_Recursive_Thread_Mutex>?locker?(_lock);??
  • ?if?(_io_count>?0)??
  • ??return?1;??
  • }??
  • ACE_OS::shutdown(this->handle(),SD_BOTH);??
  • ACE_OS::closesocket(this->handle());??
  • this->handle(ACE_INVALID_HANDLE);??
  • ??
  • ??
  • //這個地方給內存池吧。??
  • ClientManager::get_instance().release_client_handle(this);??
  • //delete?this;??
  • return?0;??
  • }??
  • ??
  • ???
  • ??
  • ??
  • 這個也很好!ACE的好文!真是不轉我覺得后悔啊!??
  • 沒啥好說的,管理所有的客戶端和內存池的功能。??
  • ??
  • class?ClientManager?:?public?SingleTon<ClientManager>??
  • {??
  • public:??
  • ??
  • ClientManager():_header(NULL){}??
  • ??
  • ~ClientManager(){}??
  • ??
  • public:??
  • ??
  • void?init(unsigned?int?default_pool_size,unsigned?int?default_read_buf_size);??
  • ??
  • void?fini();??
  • ??
  • public:??
  • ??
  • ClientHandler?*get_clienthandler();??
  • ??
  • void?release_client_handle(ClientHandler?*client);??
  • ??
  • void?check_time_out();??
  • ??
  • size_t?get_client_count();??
  • ??
  • private:??
  • ??
  • ClientHandler?*_header;??
  • ??
  • std::set<ClientHandler?*>?_active_clients;??
  • ??
  • ACE_Recursive_Thread_Mutex?_lock;??
  • };??
  • ??
  • ???
  • ??
  • ???
  • ??
  • #include?"clientmanager.h"??
  • #include?<ace/Guard_T.h>??
  • ??
  • ClientHandler?*ClientManager::get_clienthandler()??
  • {??
  • FUNC_ENTER;??
  • ACE_Guard<ACE_Recursive_Thread_Mutex>?locker(_lock);??
  • ClientHandler?*ret=NULL;??
  • if(_header==NULL)??
  • {??
  • ?ACE_DEBUG((LM_DEBUG,"client?>?max?clients!!!/n"));??
  • }??
  • else??
  • {??
  • ?ret=_header;??
  • ?_header=_header->next();??
  • ?ret->init();??
  • ?_active_clients.insert(ret);??
  • }??
  • FUNC_LEAVE;??
  • return?ret;??
  • }??
  • ??
  • void?ClientManager::release_client_handle(ClientHandler?*client)??
  • {??
  • //FUNC_ENTER;??
  • ACE_Guard<ACE_Recursive_Thread_Mutex>?locker(_lock);??
  • client->fini();??
  • client->next(_header);??
  • _header=client;??
  • _active_clients.erase(client);??
  • //FUNC_LEAVE;??
  • }??
  • ??
  • void?ClientManager::init(unsigned?int?default_pool_size,unsigned?int?default_read_buf_size)??
  • {??
  • //FUNC_ENTER;??
  • for(unsigned?int?i=0;i<default_pool_size;i++)??
  • {??
  • ?ClientHandler?*client=new?ClientHandler(default_read_buf_size);??
  • ?client->next(_header);??
  • ?_header=client;??
  • }??
  • //FUNC_LEAVE;??
  • }??
  • ??
  • void?ClientManager::fini()??
  • {??
  • //FUNC_ENTER;??
  • while(_header)??
  • {??
  • ?ClientHandler?*temp=_header->next();??
  • ?delete?_header;??
  • ?_header=temp;??
  • }??
  • //FUNC_LEAVE;??
  • }??
  • ??
  • void?ClientManager::check_time_out()??
  • {??
  • time_t?cur_time=ACE_OS::time(NULL);??
  • ACE_Guard<ACE_Recursive_Thread_Mutex>?locker(_lock);??
  • for(std::set<ClientHandler?*>::iterator?it=_active_clients.begin();it!=_active_clients.end();it++)??
  • {??
  • ?(*it)->check_time_out(cur_time);??
  • }??
  • }??
  • ??
  • size_t?ClientManager::get_client_count()??
  • {??
  • ACE_Guard<ACE_Recursive_Thread_Mutex>?locker(_lock);??
  • return?_active_clients.size();??
  • }??
  • ??
  • ??
  • //服務器的設計??
  • 按照七貓的說話,這個框架可以達到IOCP的效率,真的利害!但是我也不知道真偽!所以大家不要認為是我說的阿!我沒有測試過,所以也不太清楚是否真的有那么高的效率!??
  • 沒有什么可說的!?好文章!??
  • ??
  • class?MyServer?:?public?SingleTon<MyServer?>??
  • {??
  • public:??
  • /**主服務器初始化工作?
  • *?
  • *?/param?*listenaddr?監聽地址:"192.168.0.188:80"-在192.168.0.188的80端口進行監聽?
  • "80"-在所有IP地址的80端口進行監聽?
  • *?/param?numOfThreads?服務器網絡消息處理的線程個數?
  • *?/return?1:成功,0或者其他值:失敗?
  • */??
  • int?init(const?char?*listenaddr,unsigned?int?numOfThreads);??
  • ??
  • /**最后清理工作,資源釋放工作?
  • ?*?
  • ?*/??
  • void?fini();??
  • ??
  • /**主服務器開始運行?
  • *?
  • *?/return?1-成功,-1失敗?
  • */??
  • int?start();??
  • ??
  • /**主服務器停止運行?
  • *?
  • */??
  • void?stop();??
  • private:??
  • //任務管理器(線程池)??
  • ServerTask?_task;??
  • //監聽地址??
  • ACE_INET_Addr?_listen_addr;??
  • //網絡接收器??
  • ClientAcceptor?_acceptor;??
  • //網絡消息處理線程數量??
  • unsigned?int?_num_of_threads;??
  • private:??
  • ??
  • ???
  • ??
  • Observer?_observer;??
  • ??
  • //檢查是否有客戶端超時??
  • CheckClientTimeoutHandler?_check_time_out_handler;??
  • };??
  • ??
  • ??
  • int?MyServer::init(const?char?*listenaddr,unsigned?int?numOfThreads)??
  • {??
  • //ACE_WIN32_Proactor?*pImpl?=?new?ACE_WIN32_Proactor;??
  • //static?ACE_Proactor?proactor(pImpl,1);??
  • //ACE_Proactor::instance(?&?proactor);??
  • _listen_addr=ACE_INET_Addr(listenaddr);??
  • //_num_of_threads=numOfThreads;??
  • _num_of_threads=1;??
  • _observer.init();??
  • _syn_cms_handler.init();??
  • _check_time_out_handler.init();??
  • return?1;??
  • }??
  • ??
  • void?MyServer::fini()??
  • {??
  • ItemManager::get_instance().purge_all_items();??
  • _observer.fini();??
  • _syn_cms_handler.fini();??
  • _check_time_out_handler.fini();??
  • }??
  • ??
  • /**主服務器開始運行?
  • *?
  • *?/return?1-成功,-1失敗?
  • */??
  • int?MyServer::start()??
  • {??
  • int?Rc?=?_acceptor.open?(_listen_addr,0,1);??
  • if(Rc==-1)??
  • {??
  • ?ACE_ERROR_RETURN?((LM_ERROR,?"acceptor?error./n"),?-1);??
  • }??
  • ??
  • ???
  • ??
  • ??
  • //每20秒檢查一次,檢查是否有客戶端超時??
  • ACE_Time_Value?check_client_timeout_interval(120);??
  • Rc=ACE_Proactor::instance?()->schedule_timer?(??
  • ?_check_time_out_handler,??
  • ?(void?*)?"timeout",??
  • ?ACE_Time_Value::zero,??
  • ?check_client_timeout_interval);??
  • if(Rc==-1)??
  • {??
  • ?ACE_ERROR_RETURN?((LM_ERROR,?"%p/n",?"check_client_timeout?schedule_timer"),?-1);??
  • }??
  • ??
  • ACE_Time_Value?observertime(20);??
  • Rc=ACE_Proactor::instance?()->schedule_timer?(??
  • ?_observer,??
  • ?(void?*)?"observer",??
  • ?ACE_Time_Value::zero,??
  • ?observertime);??
  • if(Rc==-1)??
  • {??
  • ?ACE_ERROR_RETURN?((LM_ERROR,?"%p/n",?"observer?schedule_timer"),?-1);??
  • }??
  • ??
  • if?(_task.activate?(THR_NEW_LWP,?_num_of_threads?)?==?-1)??
  • {??
  • ?ACE_ERROR_RETURN?((LM_ERROR,?"task?start?error./n",?"main"),?-1);??
  • }??
  • return?1;??
  • }??
  • /**主服務器停止運行?
  • *?
  • */??
  • void?MyServer::stop()??
  • {??
  • ACE_Proactor::end_event_loop?()?;??
  • ACE_Thread_Manager?*?pTM?=?ACE_Thread_Manager::instance();??
  • ??
  • pTM->wait_task?(?&?_task)?;??
  • //ACE_Proactor::instance(?(?ACE_Proactor*?)NULL?);??
  • }??
  • ??
  • ??
  • int?ACE_TMAIN(int?argc,char?*argv[])??
  • {??
  • FUNC_ENTER;???
  • std::cout<<"size?of?item?is?"<<sizeof(Item)<<std::endl;??
  • ??
  • //設置日志??
  • ACE_LOG_MSG->open?(argv[0],ACE_Log_Msg::OSTREAM);??
  • //std::ofstream?myostream?(SERVER_LOG_FILE_NAME,?ios::out?|?ios::trunc);??
  • //ACE_LOG_MSG->msg_ostream?(&myostream);??
  • ??
  • //讀入配置文件??
  • ACE_DEBUG((LM_DEBUG,"read?config?file?from?%s/n",SERVER_CONFIG_FILE));??
  • XMLConfig::get_instance().read_config_from_xml_file(SERVER_CONFIG_FILE);??
  • ??
  • ??
  • //初始化MSGBLOCK池??
  • ACE_DEBUG((LM_DEBUG,"starting?init?MsgBlockManager.../n"));??
  • MsgBlockManager::get_instance().init(XMLConfig::get_instance().get_num_of_msg_blocks(),XMLConfig::get_instance().get_size_of_write_msg_blocks());??
  • ??
  • //初始化連接池??
  • ACE_DEBUG((LM_DEBUG,"starting?init?ClientManager.../n"));??
  • ClientManager::get_instance().init(XMLConfig::get_instance().get_num_of_clients(),XMLConfig::get_instance().get_size_of_client_read_buf());??
  • ??
  • ???
  • ??
  • ??
  • //開始服務器??
  • ??
  • ACE_DEBUG((LM_DEBUG,"starting?init?MyServer.../n"));??
  • MyServer::get_instance().init(XMLConfig::get_instance().get_listen_addr(),XMLConfig::get_instance().get_num_of_proactor_threads());??
  • ??
  • ACE_DEBUG((LM_DEBUG,"starting?MyServer.../n"));??
  • MyServer::get_instance().start();??
  • ACE_DEBUG((LM_DEBUG,"Enter?looping.../n"));??
  • ??
  • while(true)??
  • {??
  • ?std::cout?<<?"Input?Command=>/n"?<<?std::flush?;??
  • ?std::string?inputcmd;??
  • ?std::getline(std::cin,inputcmd);??
  • ?if(!handle_console_cmd(inputcmd))??
  • ??break;??
  • }??
  • ??
  • //char?c?;??
  • //std::cout?<<?"Press?any?key?to?stop?and?exit=>/n"?<<?std::flush?;??
  • //std::cin.clear?();??
  • //std::cin?>>?c?;??
  • MyServer::get_instance().stop();??
  • MsgBlockManager::get_instance().fini();??
  • ClientManager::get_instance().fini();??
  • return?1;??
  • }??

  • 《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

    總結

    以上是生活随笔為你收集整理的ACE proactor example的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。