日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

ACE_Reactor(二)ACE_Dev_Poll_Reactor

發布時間:2025/3/21 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ACE_Reactor(二)ACE_Dev_Poll_Reactor 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

ACE_Reactor一些重要的細節
看下具體ACE_Dev_Poll_Reactor的實現,如何將一個處理集和handle關聯起來,代碼如下:

int ACE_Dev_Poll_Reactor ::register_handler_i(handle, event_handler,mask) //step 1 if(this->handler_rep_.find(handle)==0) {//Handler not present in the repository.Bind it.if(this->handler_rep_.bind(handle,event_handler, mask)!=0)return -1; }//step2 //All but the notify handler get registered with oneshot to facilitate auto suspend before the upcall.See dispatch_io_event for more information if(event_handler!= this->notify_handler_)epev.events |= EPOLLONESHOT; if(::epoll_ctl(this->poll_fd_, op, handle, &epev) == -1)......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

step1
首先就是要將handle和處理這個handle的event_handler綁定,若已有處理的event_handler,則只單純將可能新增的mask添加進監測的epoll函數參數中。
這里的handler_rep_是一個Handler_Repository類的指針,其內部是一個map表可以對handle和event_handler進行有效關聯對應和存儲。
step2
從上面可以看出,如果是epoll的話,且注冊了notify handler, 則epoll_wait會自動將EPOLLONESHOT標識添加上,這樣在事件被監測到得分發處理時就不用再手動的去suspend。

Notify機制的好處是:
1. 讓反應器擁有了處理無限處理器的能力
2. 其次是提供了必要時解除反應器事件檢查阻塞的能力。

Reactor的notify()讓用戶直接提供給Reactor待通知反應器的指針,而這些處理器無需注冊到反應器上,從而提供了無限的擴展能力。
不過在ACE中顯然Notify機制有不同的實現方式。一種是采用ACE_Pipe的實現,這個屬于默認的方式。另一種則是內存隊列保存Notify消息。你可以通過定義ACE_HAS_REACTOR_NOTIFICATION_QUEUE的宏編譯ACE,這樣ACE將不使用ACE_Pipe作為Notify消息的管道,而使用一個自己的內存隊列保存Notify消息。
但是需要注意的是,在使用ACE_Pipe的實現中,如果使用不當,可能會造成嚴重的后果。
如果你用不到Notify機制,最好在ACE_Reactor初始化的時候徹底關閉Notify機制。很多Reactor的初始化函數都提供了關閉notify pipe的方式。比如ACE_Select_Reactor_T的open函數的disable_notify_pipe參數。當其為1的時候表示關閉notify 管道。

潛在的風險:
1. 處理器被銷毀后,排隊等候的對應通知才被分派
2. ACE_Pipe的數據緩沖是有限的,大量通知到來可能會造成阻塞甚至死鎖

因為通知的信息(包括處理器指針和掩碼)以流數據的方式被寫入ACE_Pipe,不能進行遍歷查找,所以當對應處理器被銷毀后,如果其在ACE_Pipe的數據區中還有存儲有通知,則ACE_Reactor將會在分派中使用懸空指針,造成未知的后果。另外,在ACE_Pipe的數據緩沖已滿情況下,在處理器的回調中依然發送通知,就會因阻塞發生死鎖。

通過

int ACE_Dev_Poll_Reactor ::notify(eh, mask,timeout) {//pass over both the event_handler* and * the mask o allow the caller to dictate which Event_Handler method the receiver invokes.Note that this call can timeout.n = this->notify_handle_->notify(eh, mask, timeout);return n=-1?-1:0; }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在源碼的owner接口中,看到了這樣一句注釋
//There is no need to set the owner of the event loop.Multiple threads may invoke the event loop simulataneously.
說明有可能多個線程同時調用ACE_Dev_Poll_Reactor的event_loop函數,網上很多示例代碼寫的都比較簡單,例如:

65 int main(void)66 {67 ACE::init();68 69 ACE_Dev_Poll_Reactor dev_reactor(1024);70 ACE_Reactor reactor(&dev_reactor);71 ACE_Reactor::instance(&reactor);72 73 TestAccptor accptor;74 ACE_INET_Addr addr(10000);75 if( accptor.open(addr) == -1 ) {76 cout << "Open port failed .. " << endl;77 return -1;78 } 79 80 cout << "Open port ok .. " << endl;81 82 83 ACE_Reactor::instance()->run_reactor_event_loop();84 ACE::fini();85 return 0;86 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

這里有一個問題,監測事件觸發和同時執行的是不是就只有一個main函數的這一個主線程就完全搞定了。現在看是這樣的,但是實際中應該是不同線程都可以去調用上述的run_reactor_event_loop()來完成事件的監測和觸發才對。

上述的Notify的實現機制,在其open函數中可能可以看出點東西來:

int ACE_Dev_Poll_Reactor_Notify ::open(ACE_Reactor_Impl*r , ACE_Timer_Queue*in disable_notify_pipe) {if(0==disable_notify_pipe){this->dp_reactor=(ACE_Dev_Poll_Reactor*)r;if(this->notification_pipe_.read_handle()==-1)return -1;#if defined (F_SETFD)ACE_OS::fcntl(this->notification_pipe_.read_handle(),F_SETFD,1);ACE_OS::fcntl(this->notification_pipe_.write_handle(),F_SETFD,1); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)if(this->notification_queue_.open()==-1)return -1;if(ACE_OS::set_flags(this->notification_pipe_.write_handle(),ACE_NONBLOCK) == -1)return -1; #endifif(ACE_OS::set_flags(this->notification_pipe_.read_handle(),ACE_NONBLOCK) == -1)return -1; }return 0; }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

若是不用pipe機制作為Notify通知機制實現,則這個open直接就return掉了。

再來看:

int ACE_Dev_Poll_Reactor ::dispatch_io_event(Token_Guard &guard) {{CGuard(this->repo_loc_)//對于handler的respository訪問,因此加鎖保護//step1 獲取event_handlerEvent_Tuple *info=this->handle_rep_.find(handle);if(info==0) return 0;//說明已經沒有對應的event_handler了if(info->suspended) return 0;//說明其他線程可能已經更改了這個handle的mask。short revents=(pollfd*)pfds->revents;//即已觸發的事件//step2 根據revents來指定disp_mask=WIRTE/READ/EXCPET_MASK;callback=handle_output/input/exception;//step3 將當前的event_handler標識置為suspended為true,以防止被重復觸發if(eh!=this->notify_handler_)info->suspended=true;}//step4 將notify的event_handler直接處理,分發,且注意不要suspend和resume notify的handler,因為如果要這樣做可能引起無休止的暫停和恢復,又要去經常等待獲取token。if(eh==this->notify_handler_)//說明是notify handler,直接處理{ACE_Notification_Buffer b;notify_handler_->dequeue_one(b);guard.release_token();return notify_handler_->dispatch_notify(b);//直接去分發處理notify消息}//step5 執行handler中對應的回調{ACE_Dev_Poll_Handler_Guard eh_guard(eh);guard.release_token();status=this->upcall(eh, callback,handle);}//step6 檢查回調的返回值status,是否等于0去恢復由于執行回調而暫停的handle//step7 檢查handle對應的回調的返回值,若小于0則需要remove_handler_i }而上述upcall在實際的內聯實現文件中:int ACE_Dev_Poll_Reactor ::upcall(eventhandler,callback,handle) {do {status=(event_handler->*callback)(handle);}while(status>0 && event_handler != this->notify_handler_);return status; }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

上面雖然對于注冊handler和handle關聯以及最終事件觸發后如何調用handler中的回調函數理清了。但是有一點,如何完成多路復用和事件分離,并沒有很清楚。這里就要提到

int ACE_Dev_Poll_Reactor::work_pending_i ( ACE_Time_Value * max_wait_time ) [protected]01071 { 01072 ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending_i"); 01073 01074 if (this->deactivated_) 01075 return 0; 01076 01077 #if defined (ACE_HAS_EVENT_POLL) 01078 if (this->start_pevents_ != this->end_pevents_) 01079 #else 01080 if (this->start_pfds_ != this->end_pfds_) 01081 #endif /* ACE_HAS_EVENT_POLL */ 01082 return 1; // We still have work_pending(). Do not poll for 01083 // additional events. 01084 01085 ACE_Time_Value timer_buf (0); 01086 ACE_Time_Value *this_timeout = 0; 01087 01088 this_timeout = this->timer_queue_->calculate_timeout (max_wait_time, 01089 &timer_buf); 01090 01091 // Check if we have timers to fire. 01092 const int timers_pending = 01093 ((this_timeout != 0 && max_wait_time == 0) 01094 || (this_timeout != 0 && max_wait_time != 0 01095 && *this_timeout != *max_wait_time) ? 1 : 0); 01096 01097 const long timeout = 01098 (this_timeout == 0 01099 ? -1 /* Infinity */ 01100 : static_cast<long> (this_timeout->msec ())); 01101 01102 #if defined (ACE_HAS_EVENT_POLL) 01103 01104 // Wait for events. 01105 const int nfds = ::epoll_wait (this->poll_fd_, 01106 this->events_, 01107 this->size_, 01108 static_cast<int> (timeout)); 01109 01110 if (nfds > 0) 01111 { 01112 this->start_pevents_ = this->events_; 01113 this->end_pevents_ = this->start_pevents_ + nfds; 01114 } 01115 01116 #else 01117 01118 struct dvpoll dvp; 01119 01120 dvp.dp_fds = this->dp_fds_; 01121 dvp.dp_nfds = this->size_; 01122 dvp.dp_timeout = timeout; // Milliseconds 01123 01124 // Poll for events 01125 const int nfds = ACE_OS::ioctl (this->poll_fd_, DP_POLL, &dvp); 01126 01127 // Retrieve the results from the pollfd array. 01128 this->start_pfds_ = dvp.dp_fds; 01129 01130 // If nfds == 0 then end_pfds_ == start_pfds_ meaning that there is 01131 // no work pending. If nfds > 0 then there is work pending. 01132 // Otherwise an error occurred. 01133 if (nfds > -1) 01134 this->end_pfds_ = this->start_pfds_ + nfds; 01135 #endif /* ACE_HAS_EVENT_POLL */ 01136 01137 // If timers are pending, override any timeout from the poll. 01138 return (nfds == 0 && timers_pending != 0 ? 1 : nfds); 01139 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

上述代碼是5.5.2版本而在最新的6.3.3版本中這里的epoll_wait 第三個參數即MaxEvents是直接用魔數1來指定的。也即每次最多返回1個觸發的事件。
其他的線程當然也可以調用handle_events來監測和處理事件,但是在handle_events中對這些其他的線程也做了限制,即必須是token的owner才能執行,否則直接return。
而如何成為owner,則需要調用函數Token_Guard 的函數acquire_quietly,其代碼如下:

01144 { 01145 ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events"); 01146 01147 // Stash the current time 01148 // 01149 // The destructor of this object will automatically compute how much 01150 // time elapsed since this method was called. 01151 ACE_MT (ACE_Countdown_Time countdown (max_wait_time)); 01152 01153 Token_Guard guard (this->token_); 01154 int result = guard.acquire_quietly (max_wait_time); 01155 01156 // If the guard is NOT the owner just return the retval 01157 if (!guard.is_owner ()) 01158 return result; 01159 01160 if (this->deactivated_) 01161 return -1; 01162 01163 // Update the countdown to reflect time waiting for the mutex. 01164 ACE_MT (countdown.update ()); 01165 01166 return this->handle_events_i (max_wait_time, guard); 01167 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

這樣就變成了串行化的使用epoll獲取poll去監測事件,然后監測到線程去處理,而沒有成為token的owner的線程則持續等待。
參考:

http://blog.csdn.net/fullsail/article/details/2901800

總結

以上是生活随笔為你收集整理的ACE_Reactor(二)ACE_Dev_Poll_Reactor的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。