muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool
muduo是支持多線程的網絡庫,在muduo網絡庫學習(七)用于創建服務器的類TcpServer中也提及了TcpServer中有一個事件驅動循環線程池,線程池中存在大量線程,每個線程運行一個EventLoop::loop。
線程池的作用體現在
- 用戶啟動TcpServer服務器時創建大量子線程,每個子線程創建一個EventLoop并開始執行EventLoop::loop
- 主線程的線程池保存著創建的這些線程和EventLoop
- 當Acceptor接收到客戶端的連接請求后返回TcpServer,TcpServer創建TcpConnection用于管理tcp連接
- TcpServer從事件驅動線程池中取出一個EventLoop,并將這個EventLoop傳給TcpConnection的構造函數
- TcpConnection創建用于管理套接字的Channel并注冊到從線程池中取出的EventLoop的Poller中
- 服務器繼續監聽
這個池子既是一個線程池,又是一個EventLoop池,二者是等價的,一個EventLoop對應一個線程
這種方式稱為one loop per thread即reactor + 線程池
線程池的定義比較簡單,唯一復雜的地方是由主線程創建子線程,子線程創建EventLoop并執行EventLoop::loop,主線程返回創建的EventLoop給線程池并保存起來,比較繞。
線程池EventLoopThreadPool定義如下
成員變量和函數沒什么特別的,其中
- baseLoop_是主線程所在的事件驅動循環,即TcpServer所在的那個主線程,這個事件驅動循環通常只負責監聽客戶端連接請求,即Acceptor的Channel。
- 兩個vector保存著所有子線程即每個子線程對應的EventLoop。事件驅動循環線程被封裝在EventLoopThread中,EventLoopThread中使用的Thread才是真正的線程封裝
線程池是由TcpServer啟動的,在TcpServer::start函數中(由用戶調用)
/* * 開啟事件驅動循環線程池,將Acceptor的Channel添加到EventLoop中,注冊到Poller上* 此時還沒有調用EventLoop::loop(),所以還沒有開啟監聽*/ void TcpServer::start() {if (started_.getAndSet(1) == 0){/* 啟動線程池 */threadPool_->start(threadInitCallback_);assert(!acceptor_->listenning());/* * Acceptor和TcpServer在同一個線程,通常會直接調用 * std::bind只能值綁定,如果傳入智能指針會增加引用計數,這里傳遞普通指針* 因為TcpServer沒有銷毀,所以不用擔心Accepor會銷毀*/loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));} }TcpServer::start中調用threadPool_->start()用于啟動線程池,傳入的參數是創建好所有線程后調用的回調函數,也是由用戶提供
void EventLoopThreadPool::start(const ThreadInitCallback& cb) {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;/* 創建一定數量的線程(事件驅動循環) */for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);/* EventLoopThread,事件驅動循環線程*/EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));/* 創建新線程,返回新線程的事件驅動循環EventLoop */loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){cb(baseLoop_);} }創建線程的過程有些繞,在這里梳理一下
Thread類通過調用pthread_create創建子線程
/* EventLoopThread::startLoop函數中調用,用于創建子線程 */ void Thread::start() {assert(!started_);started_ = true;// FIXME: move(func_)/* 創建子線程為線程函數提供的參數,封裝起來就可以實現傳遞多個參數 */detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);/* 創建線程,子線程調用detail::startThread,主線程繼續向下執行 */if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)){started_ = false;delete data; // or no delete?LOG_SYSFATAL << "Failed in pthread_create";}else{/* 如果線程創建成功,主線程阻塞在這里 */latch_.wait();assert(tid_ > 0);} }創建成功后,主線程阻塞在latch_.wait()上(條件變量),等待子線程執行threadFunc之前被喚醒
這里不太明白原因,主線程為什么需要阻塞在這里
ThreadData是線程數據類,將線程函數用到的所有參數都存在這里面即可,線程函數為detail::startThread,進而調用runInThread
runInLoop調用latch_->countDown,此時會喚醒主線程,主線程回到startLoop中由于loop_為null阻塞在wait上。而此時子線程正在準備調用threadFunc(在EventLoopThread創建之初將EventLoopThread::threadFunc傳遞給Thread,用于在創建完線程后調用)
/* * 創建線程后間接調用的函數,用于執行EventLoopThread::threadFunc* 這個函數在EventLoopThread構造時傳給Thread對象的* EventLoopThread::startLoop函數中調用Thread對象的Thread::start函數* Thread::start中創建子線程,子線程調用detail::startThread,進而調用detail::runInThread* detail::runInLoop調用EventLoopThread::threadFunc,創建EventLoop,喚醒主線程,子線程執行loop循環* 轉了一大圈又回到EventLoopThread中*/void runInThread(){*tid_ = muduo::CurrentThread::tid();tid_ = NULL;latch_->countDown();latch_ = NULL;muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();/* 給當前線程命名 */::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);try{/* EventLoopThread::threadFunc() */func_();muduo::CurrentThread::t_threadName = "finished";}catch (const Exception& ex){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());fprintf(stderr, "stack trace: %s\n", ex.stackTrace());abort();}catch (const std::exception& ex){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());abort();}catch (...){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());throw; // rethrow}}兜兜轉轉又回到了EventLoopThread,此時主線程阻塞在EventLoopThread::startInLoop的wait上,子線程在EventLoopThread::threadFunc中,準備創建一個EventLoop然后喚醒主線程,并開啟事件循環
/* * 傳遞給線程的回調函數,當創建線程后,在detail::runInLoop會回調這個函數* 此函數創建一個事件驅動循環,并開啟事件監聽(loop)*/ void EventLoopThread::threadFunc() {/* 子線程創建事件驅動循環 */EventLoop loop;if (callback_){callback_(&loop);}{/* 上鎖后賦值給loop_ */MutexLockGuard lock(mutex_);loop_ = &loop;/* * pthread_cond_signal(pthread_cond_t&)喚醒一個wait的線程* 此時主線程發現loop_已經不為null,隨后返回到EventLoopThreadPool中*/cond_.notify();}/* 子線程開啟事件監聽,進入無限循環,不返回 */loop.loop();//assert(exiting_);loop_ = NULL; }子線程將一直停留在loop.loop()上,主線程由于被子線程喚醒,發現loop_已經不為null,說明已經創建了一個線程,同時也在那個線程中創建了一個事件驅動循環,所以主線程返回,將創建好的事件驅動循環返回給線程池保存起來,當有新的TcpConnection被創建后取出一個用來監聽tcp連接
void EventLoopThreadPool::start(const ThreadInitCallback& cb) {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;/* 創建一定數量的線程(事件驅動循環) */for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);/* EventLoopThread,事件驅動循環線程*/EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));/* 創建新線程,返回新線程的事件驅動循環EventLoop *//* EventLoopThread主線程返回后,將事件驅動循環保存下來,然后繼續創建線程 */loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){cb(baseLoop_);} }至此線程池的創建工作完成,每一個線程都運行著EventLoop::loop,進行EventLoop::loop -> Poller::poll -> Channel::handleEvent -> TcpConnection::handle* -> EventLoop::doPendingFunctors -> EventLoop::loop的工作。
如果提供了回調函數,在創建完成后也會執行,但通常用戶不會在意線程池的創建工作,所以一般都不提供
創建完成后,線程池唯一的工作就是在新建TcpConnection時從池子中取出一個EventLoop傳給TcpConnection
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {loop_->assertInLoopThread();/* 從事件驅動線程池中取出一個線程給TcpConnection */EventLoop* ioLoop = threadPool_->getNextLoop();/* 為TcpConnection生成獨一無二的名字 */char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();/* * 根據sockfd獲取tcp連接在本地的<地址,端口>* getsockname(int fd, struct sockaddr*, int *size);*/InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessary/* 創建一個新的TcpConnection代表一個Tcp連接 */TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));... }EventLoopThreadPool::getNextLoop函數如下,用于取出一個EventLoop。
如果線程池中沒有線程,就返回主線程的EventLoop,此時只有一個EventLoop在運行,即TcpServer的那個
總結
以上是生活随笔為你收集整理的muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode----位运算实
- 下一篇: 每天一道LeetCode-----给定字