muduo网络库学习(四)事件驱动循环EventLoop
muduo的設計采用高并發服務器框架中的one loop per thread模式,即一個線程一個事件循環。
這里的loop,其實就是muduo中的EventLoop,所以到目前為止,不管是Poller,Channel還是TimerQueue都僅僅是單線程下的任務,因為這些都依賴于EventLoop。這每一個EventLoop,其實也就是一個Reactor模型。
而多線程體現在EventLoop的上層,即在EventLoop上層有一個線程池,線程池中每一個線程運行一個EventLoop,也就是Reactor + 線程池的設計模式
梳理一下
- 每個muduo網絡庫有一個事件驅動循環線程池EventLoopThreadPool
- 每個線程池中有多個事件驅動線程EventLoopThread
- 每個線程運行一個EventLoop事件循環
- 每個EventLoop事件循環包含一個io復用Poller,一個計時器隊列TimerQueue
- 每個Poller監聽多個Channel,TimerQueue其實也是一個Channel
- 每個Channel對應一個fd,在Channel被激活后調用回調函數
- 每個回調函數是在EventLoop所在線程執行
- 所有激活的Channel回調結束后EventLoop繼續讓Poller監聽
所以調用回調函數的過程中是同步的,如果回調函數執行時間很長,那么這個EventLoop所在線程就會等待很久之后才會再次調用poll。
整個muduo網絡庫實際上是由Reactor + 線程池實現的,線程池中每一個線程都是一個Reactor模型。在處理大并發的服務器任務上有很大優勢。
簡化的關系圖如下,EventLoop只涉及Poller,Channel(簡單涉及TcpConnection)和TimerQueue。
- 白色三角,繼承
- 黑色菱形,聚合
一個事件驅動循環EventLoop其實就是一個Reactor模型,是一個單線程任務。主要包含io復用函數Poller,定時器隊列TimerQueue以及激活隊列。其他的就是一些輔助變量
typedef std::vector<Channel*> ChannelList;bool looping_; /* atomic */std::atomic<bool> quit_;bool eventHandling_; /* atomic */bool callingPendingFunctors_; /* atomic */int64_t iteration_;/* 創建時保存當前事件循環所在線程,用于之后運行時判斷使用EventLoop的線程是否是EventLoop所屬的線程 */const pid_t threadId_;/* poll返回的時間,用于計算從激活到調用回調函數的延遲 */Timestamp pollReturnTime_;/* io多路復用 */std::unique_ptr<Poller> poller_;/* 定時器隊列 */std::unique_ptr<TimerQueue> timerQueue_;/* 喚醒當前線程的描述符 */int wakeupFd_;// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client./* * 用于喚醒當前線程,因為當前線程主要阻塞在poll函數上* 所以喚醒的方法就是手動激活這個wakeupChannel_,即寫入幾個字節讓Channel變為可讀* 注: 這個Channel也注冊到Poller中*/std::unique_ptr<Channel> wakeupChannel_;boost::any context_;// scratch variables/* * 激活隊列,poll函數在返回前將所有激活的Channel添加到激活隊列中* 在當前事件循環中的所有Channel在Poller中*/ChannelList activeChannels_;/* 當前執行回調函數的Channel */Channel* currentActiveChannel_;/* * queueInLoop添加函數時給pendingFunctors_上鎖,防止多個線程同時添加* * mutable,突破const限制,在被const聲明的函數仍然可以更改這個變量*/mutable MutexLock mutex_;/* * 等待在當前線程調用的回調函數,* 原因是本來屬于當前線程的回調函數會被其他線程調用時,應該把這個回調函數添加到它屬于的線程中* 等待它屬于的線程被喚醒后調用,以滿足線程安全性* * TcpServer::removeConnection是個例子* 當關閉一個TcpConnection時,需要調用TcpServer::removeConnection,但是這個函數屬于TcpServer,* 然而TcpServer和TcpConnection不屬于同一個線程,這就容易將TcpServer暴露給其他線程,* 萬一其他線程析構了TcpServer怎么辦(線程不安全)* 所以會調用EventLoop::runInLoop,如果要調用的函數屬于當前線程,直接調用* 否則,就添加到這個隊列中,等待當前線程被喚醒*/std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_最后一個變量std::vector<Functor> pendingFunctors_;比較不好理解,它是一個任務容器,存放的是將要執行的回調函數。
準備這么一個容器的原因在于
- 某個對象(通常是Channel或者TcpConnection)可能被另一個線程使用(這個線程不是這個對象所在線程),此時這個對象就等于暴露給其他線程了。這是非常不安全的,萬一這個線程不小心析構了這個對象,而這個對象所屬的那個線程正要訪問這個對象(例如調用這個對象的接口),這個線程就會崩潰,因為它訪問了一個本不存在的對象(已經被析構)。
- 為了解決這個問題,就需要盡量將對這個對象的操作移到它所屬的那個線程執行(這里是調用這個對象的接口)以滿足線程安全性。又因為每個對象都有它所屬的事件驅動循環EventLoop,這個EventLoop通常阻塞在poll上??梢员WC的是EventLoop阻塞的線程就是它所屬的那個線程,所以調用poll的線程就是這個對象所屬的線程。這就 可以讓poll返回后再執行想要調用的函數,但是需要手動喚醒poll,否則一直阻塞在那里會耽誤函數的執行。
runInLoop和queueInLoop函數執行的就是上述操作
/** 1.如果事件循環不屬于當前這個線程,就不能直接調用回調函數,應該回到自己所在線程調用* 2.此時需要先添加到自己的隊列中存起來,然后喚醒自己所在線程的io復用函數(poll)* 3.喚醒方法是采用eventfd,這個eventfd只有8字節的緩沖區,向eventfd中寫入數據另poll返回* 4.返回后會調用在隊列中的函數,見EventLoop* * 舉例說明什么時候會出現事件驅動循環不屬于當前線程的情況* 1.客戶端close連接,服務器端某個Channel被激活,原因為EPOLLHUP* 2.Channel調用回調函數,即TcpConnection的handleClose* 3.handleClose調用TcpServer為它提供的回調函數removeConnection* 4.此時執行的是TcpServer的removeConnection函數,* 解釋 * 1.因為TcpServer所在線程和TcpConnection所在的不是同一個線程* 2.這就導致將TcpServer暴露給了TcpConnection所在線程* 3.因為TcpServer需要將這個關閉的TcpConnection從tcp map中刪除* 就需要調用自己的另一個函數removeConnectionInLoop* 4.為了實現線程安全性,也就是為了讓removeConnectionInLoop在TcpServer自己所在線程執行* 需要先把這個函數添加到隊列中存起來,等到回到自己的線程在執行* 5.runInLoop中的queueInLoop就是將這個函數存起來* 6.而此時調用runInLoop的仍然是TcpConnection所在線程* 7.因為自始至終,removeConnection這個函數都還沒有結束* * 如果調用runInLoop所在線程和事件驅動循環線程是一個線程,那么直接調用回調函數就行了* * 在TcpServer所在線程中,EventLoop明明阻塞在poll上,這里為什么可以對它進行修改* 1.線程相當于一個人可以同時做兩件事情,一個EventLoop同時調用兩個函數就很正常了* 2.其實函數調用都是通過函數地址調用的,既然EventLoop可讀,就一定直到內部函數的地址,自然可以調用* 3.而更改成員函數,通過地址訪問,進而修改,也是可以的*/ void EventLoop::runInLoop(Functor cb) {if (isInLoopThread()){cb();}else{queueInLoop(std::move(cb));} }當然了,如果這個對象所屬線程和當前線程相同,就沒有線程安全性的問題,直接調用即可。否則,就需要添加到pendingFunctors_中,這正是queueInLoop的功效
/** 由runInLoop調用,也可直接調用,作用* 1.將相應的回調函數存在事件驅動循環的隊列中,等待回到自己線程再調用它* 2.激活自己線程的事件驅動循環*/ void EventLoop::queueInLoop(Functor cb) {{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();} }此處需要上鎖保護pendingFunctors_以防止多個線程同時向它添加函數。這里的鎖體現了RAII方法,大括號是語句塊,把里面的變量作為臨時變量處理
因為EventLoop通常阻塞在poll上,所以添加到pendingFunctors_后需要手動喚醒它,不然它一直阻塞在poll,會耽誤了函數的執行。喚醒的方法是使用eventfd
函數用于創建一個eventfd文件描述符,這個描述符可用于進程/線程間的等待/喚醒。原因是內核只為eventfd維護一個uint64_t類型的計數器,大小應該在64位。
參數initval是這個計數器的初值
flags是一些標志,可以是下面幾個的或運算結果
- EFD_NONBLOCK,非阻塞
- EFD_CLOEXEC,設置close-on-exec屬性,調用exec時會自動close
- …
eventfd也可以使用write/read等io函數進行讀寫,區別是
write每次只能寫入8字節大小的數據,內核會將這8字節大小的數值加到計數器上
read一次性讀取這個計數器的值,并把緩沖區初始化為0。如果調用read時這個計數器值就是0,那么非阻塞時會返回EAGAIN,阻塞時會等待計數器的值變為非0
可以把這個eventfd添加到poll中,在需要喚醒時寫入8字節數據,此時poll返回,執行回調函數,然后執行在pendingFunctors_中的函數。
loop函數是EventLoop的事件驅動循環,所有的Reactor模型的loop函數都差不多。執行的就是poll和回調函數的回調,以及pendingFunctors_中函數的調用
/* * 事件驅動主循環* * 1.每個TcpServer對應一個事件驅動循環線程池* 2.每個事件驅動循環線程池對應多個事件驅動循環線程* 3.每個事件驅動循環線程對應一個事件驅動主循環* 4.每個事件驅動主循環對應一個io多路復用函數* 5.每個io多路復用函數監聽多個Channel* 6.每個Channel對應一個fd,也就對應一個TcpConnection或者監聽套接字* 7.在poll返回后處理激活隊列中Channel的過程是同步的,也就是一個一個調用回調函數* 8.調用回調函數的線程和事件驅動主循環所在線程是同一個,也就是同步執行回調函數* 9.線程池用在事件驅動循環上層,也就是事件驅動循環是線程池中的一個線程*/ void EventLoop::loop() {assert(!looping_);assertInLoopThread();looping_ = true;quit_ = false; // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){/* 清空激活隊列 */activeChannels_.clear();/* epoll_wait返回后會將所有就緒的Channel添加到激活隊列activeChannel中 */pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priorityeventHandling_ = true;/* 執行所有在激活隊列中的Channel的回調函數 */for (Channel* channel : activeChannels_){currentActiveChannel_ = channel;currentActiveChannel_->handleEvent(pollReturnTime_);}currentActiveChannel_ = NULL;eventHandling_ = false;/* 執行pendingFunctors_中的所有函數 */doPendingFunctors();}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false; }Reactor模式的loop函數大多一個樣子
muduo中多了處理pendingFunctors_中的函數,在自己的線程調用自己的函數是安全的
Channel的回調函數就是根據被激活原因調用不同的回調函數,這些回調函數是在TcpConnection創建之初被設置的。
簡單說一下Channel和TcpConnection的關系
- 每個TcpConnection對象代表一個tcp連接,所以TcpConnection中需要保存用于服務器/客戶端通信的套接字,這個套接字就記錄在Channel中
- TcpConnection在創建之初會為Channel設置回調函數,如果套接字可讀/可寫/錯誤/關閉等就會執行TcpConnection中的函數
- TcpConnection在確定連接已經建立后會向Poller注冊自己的Channel
Channel的handleEvent如下
tie_是TcpConnection的弱引用,在調用TcpConnection的函數之前判斷它是否還存在,如果被析構了,那么提升的shared_ptr會是null
具體可以參考 muduo網絡庫學習(二)對套接字和監聽事件的封裝Channel
EventLoop沒有特別處理定時器任務,原因是定時器任務TimerQueue也被轉換成一個文件描述符添加到Poller中,所以時間一到timerfd變為可讀,poll就會返回,就會調用回調函數。EventLoop只提供了runAt/runAfter/runEveny三個接口用于設置定時任務。這些在 muduo網絡庫學習(三)定時器TimerQueue的設計中有提及
/* * 定時器功能,由用戶調用runAt并提供當事件到了執行的回調函數* 時間在Timestamp設置,絕對時間,單位是微秒*/ TimerId EventLoop::runAt(Timestamp time, TimerCallback cb) {/* std::move,移動語義,避免拷貝 */return timerQueue_->addTimer(std::move(cb), time, 0.0); }/** 如上,單位是微秒,相對時間*/ TimerId EventLoop::runAfter(double delay, TimerCallback cb) {Timestamp time(addTime(Timestamp::now(), delay));return runAt(time, std::move(cb)); }/** 每隔多少微秒調用一次*/ TimerId EventLoop::runEvery(double interval, TimerCallback cb) {Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(std::move(cb), time, interval); }幾個C++方面的知識點
- std::move,移動語義,避免拷貝
- RAII,以鎖為例,構造時上鎖,析構時解鎖(函數返回時局部變量析構)
- 花括號語句塊
- std::unique_ptr,智能指針,不允許拷貝和賦值,獨一無二
- std::shared_ptr,智能指針,可以拷貝賦值,存在引用計數
- std::weak_ptr,弱引用,不增加引用計數,必要時可通過lock函數提升為shared_ptr
總結
以上是生活随笔為你收集整理的muduo网络库学习(四)事件驱动循环EventLoop的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: muduo网络库学习(三)定时器Time
- 下一篇: 每天一道LeetCode-----合并两