日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool

發布時間:2024/4/19 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

muduo是支持多線程的網絡庫,在muduo網絡庫學習(七)用于創建服務器的類TcpServer中也提及了TcpServer中有一個事件驅動循環線程池,線程池中存在大量線程,每個線程運行一個EventLoop::loop。

線程池的作用體現在

  • 用戶啟動TcpServer服務器時創建大量子線程,每個子線程創建一個EventLoop并開始執行EventLoop::loop
  • 主線程的線程池保存著創建的這些線程和EventLoop
  • 當Acceptor接收到客戶端的連接請求后返回TcpServer,TcpServer創建TcpConnection用于管理tcp連接
  • TcpServer從事件驅動線程池中取出一個EventLoop,并將這個EventLoop傳給TcpConnection的構造函數
  • TcpConnection創建用于管理套接字的Channel并注冊到從線程池中取出的EventLoop的Poller中
  • 服務器繼續監聽

這個池子既是一個線程池,又是一個EventLoop池,二者是等價的,一個EventLoop對應一個線程
這種方式稱為one loop per thread即reactor + 線程池


線程池的定義比較簡單,唯一復雜的地方是由主線程創建子線程,子線程創建EventLoop并執行EventLoop::loop,主線程返回創建的EventLoop給線程池并保存起來,比較繞。
線程池EventLoopThreadPool定義如下

class EventLoop; class EventLoopThread;class EventLoopThreadPool : noncopyable {public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) { numThreads_ = numThreads; }/* 開啟線程池,創建線程 */void start(const ThreadInitCallback& cb = ThreadInitCallback());// valid after calling start()/// round-robin/* 獲取一個線程(事件驅動循環),通常在創建TcpConnection時調用 */EventLoop* getNextLoop();/// with the same hash code, it will always return the same EventLoopEventLoop* getLoopForHash(size_t hashCode);std::vector<EventLoop*> getAllLoops();bool started() const{ return started_; }const string& name() const{ return name_; }private:/* 主線程的事件驅動循環,TcpServer所在的事件驅動循環,創建TcpServer傳入的EventLoop */EventLoop* baseLoop_;string name_;bool started_;/* 線程數 */int numThreads_;/* 標記下次應該取出哪個線程,采用round_robin */int next_;/* 線程池中所有的線程 */std::vector<std::unique_ptr<EventLoopThread>> threads_;/* * 線程池中每個線程對應的事件驅動循環,從線程池取出線程實際上返回的是事件驅動循環* 每個事件驅動循環運行在一個線程中*/std::vector<EventLoop*> loops_; };

成員變量和函數沒什么特別的,其中

  • baseLoop_是主線程所在的事件驅動循環,即TcpServer所在的那個主線程,這個事件驅動循環通常只負責監聽客戶端連接請求,即Acceptor的Channel。
  • 兩個vector保存著所有子線程即每個子線程對應的EventLoop。事件驅動循環線程被封裝在EventLoopThread中,EventLoopThread中使用的Thread才是真正的線程封裝

線程池是由TcpServer啟動的,在TcpServer::start函數中(由用戶調用)

/* * 開啟事件驅動循環線程池,將Acceptor的Channel添加到EventLoop中,注冊到Poller上* 此時還沒有調用EventLoop::loop(),所以還沒有開啟監聽*/ void TcpServer::start() {if (started_.getAndSet(1) == 0){/* 啟動線程池 */threadPool_->start(threadInitCallback_);assert(!acceptor_->listenning());/* * Acceptor和TcpServer在同一個線程,通常會直接調用 * std::bind只能值綁定,如果傳入智能指針會增加引用計數,這里傳遞普通指針* 因為TcpServer沒有銷毀,所以不用擔心Accepor會銷毀*/loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));} }

TcpServer::start中調用threadPool_->start()用于啟動線程池,傳入的參數是創建好所有線程后調用的回調函數,也是由用戶提供

void EventLoopThreadPool::start(const ThreadInitCallback& cb) {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;/* 創建一定數量的線程(事件驅動循環) */for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);/* EventLoopThread,事件驅動循環線程*/EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));/* 創建新線程,返回新線程的事件驅動循環EventLoop */loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){cb(baseLoop_);} }

創建線程的過程有些繞,在這里梳理一下

  • 線程池所在線程是主線程,所有通過pthread_create創建的線程為子線程
  • 創建線程首先構造EventLoopThread對象,這里保存著一個事件驅動循環loop_和線程Thread,但是事件驅動循環初始為null,Thread初始時設置了回調函數,在創建完線程后執行
  • 執行EventLoopThread::startLoop函數啟動Thread創建子線程,主線程返回阻塞在loop_上因為它為null,子線程執行回調函數創建EventLoop并賦值給loops_,同時喚醒主線程
  • 主線程返回loops_,子線程執行EventLoop::loop開始監聽事件
  • /* * 線程池所在線程在每創建一個EventLoopThread后會調用相應對象的startLoop函數,注意主線程和子線程之分* 主線程是TcpServer所在線程,也是線程池所在線程* 子線程是由線程池通過pthread_create創建的線程,每一個子線程運行一個EventLoop::loop * * 1.主線程EventLoopThreadPool創建EventLoopThread對象* 2.主線程EventLoopThread構造函數中初始化線程類Thread并傳遞回調函數EventLoopThread::threadFunc* 3.主線程EventLoopThreadPool創建完EventLoopThread后,調用EventLoopThread::startLoop函數* 4.主線程EventLoopThread::startLoop函數開啟線程類Thread,即調用Thread::start* 5.主線程Thread::start函數中使用pthread_create創建線程后* 子線程調用回調函數EventLoopThread::threadFunc,主線程返回到EventLoopThread::startLoop* 6.主線程EventLoopThread::startLoop由于當前事件驅動循環loop_為null(構造時初始化為null)導致wait* 7.子線程EventLoopThread::threadFunc創建EventLoop并賦值給loop_,然后喚醒阻塞在cond上的主線程* 8.主線程EventLoopThread::startLoop被喚醒后,返回loop_給EventLoopThreadPool* 9.主線程EventLoopThreadPool保存返回的loop_,存放在成員變量std::vector<EventLoop*> loops_中* 10.子線程仍然在threadFunc中,調用EventLoop::loop函數,無限循環監聽*/EventLoop* EventLoopThread::startLoop() {assert(!thread_.started());/* 主線程調用線程類的start函數,創建線程 */thread_.start();{/* 加鎖,原因是loop_可能會被子線程更改 */MutexLockGuard lock(mutex_);/** 如果loop_仍然為null,說明子線程那邊還沒有進入threadFunc創建EventLoop,wait等待* pthread_cond_wait(pthread_cond_t&, pthread_mutex_t&);會自動解鎖,然后睡眠* 等待某個線程使用pthread_cond_signal(&pthread_cond_t&);或pthread_cond_boardcast(pthread_cond_t&)* 喚醒wait的一個/全部線程* 當主線程從wait中返回后,子線程已經創建了EventLoop,主線程返回到EventLoopThreadPool中* 子線程執行EventLoop::loop函數監聽事件*/while (loop_ == NULL){cond_.wait();}}return loop_; }

    Thread類通過調用pthread_create創建子線程

    /* EventLoopThread::startLoop函數中調用,用于創建子線程 */ void Thread::start() {assert(!started_);started_ = true;// FIXME: move(func_)/* 創建子線程為線程函數提供的參數,封裝起來就可以實現傳遞多個參數 */detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);/* 創建線程,子線程調用detail::startThread,主線程繼續向下執行 */if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)){started_ = false;delete data; // or no delete?LOG_SYSFATAL << "Failed in pthread_create";}else{/* 如果線程創建成功,主線程阻塞在這里 */latch_.wait();assert(tid_ > 0);} }

    創建成功后,主線程阻塞在latch_.wait()上(條件變量),等待子線程執行threadFunc之前被喚醒
    這里不太明白原因,主線程為什么需要阻塞在這里
    ThreadData是線程數據類,將線程函數用到的所有參數都存在這里面即可,線程函數為detail::startThread,進而調用runInThread

    /* 創建線程后調用的函數,data是參數 */ void* startThread(void* obj) {ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL; }

    runInLoop調用latch_->countDown,此時會喚醒主線程,主線程回到startLoop中由于loop_為null阻塞在wait上。而此時子線程正在準備調用threadFunc(在EventLoopThread創建之初將EventLoopThread::threadFunc傳遞給Thread,用于在創建完線程后調用)

    /* * 創建線程后間接調用的函數,用于執行EventLoopThread::threadFunc* 這個函數在EventLoopThread構造時傳給Thread對象的* EventLoopThread::startLoop函數中調用Thread對象的Thread::start函數* Thread::start中創建子線程,子線程調用detail::startThread,進而調用detail::runInThread* detail::runInLoop調用EventLoopThread::threadFunc,創建EventLoop,喚醒主線程,子線程執行loop循環* 轉了一大圈又回到EventLoopThread中*/void runInThread(){*tid_ = muduo::CurrentThread::tid();tid_ = NULL;latch_->countDown();latch_ = NULL;muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();/* 給當前線程命名 */::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);try{/* EventLoopThread::threadFunc() */func_();muduo::CurrentThread::t_threadName = "finished";}catch (const Exception& ex){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());fprintf(stderr, "stack trace: %s\n", ex.stackTrace());abort();}catch (const std::exception& ex){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());abort();}catch (...){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());throw; // rethrow}}

    兜兜轉轉又回到了EventLoopThread,此時主線程阻塞在EventLoopThread::startInLoop的wait上,子線程在EventLoopThread::threadFunc中,準備創建一個EventLoop然后喚醒主線程,并開啟事件循環

    /* * 傳遞給線程的回調函數,當創建線程后,在detail::runInLoop會回調這個函數* 此函數創建一個事件驅動循環,并開啟事件監聽(loop)*/ void EventLoopThread::threadFunc() {/* 子線程創建事件驅動循環 */EventLoop loop;if (callback_){callback_(&loop);}{/* 上鎖后賦值給loop_ */MutexLockGuard lock(mutex_);loop_ = &loop;/* * pthread_cond_signal(pthread_cond_t&)喚醒一個wait的線程* 此時主線程發現loop_已經不為null,隨后返回到EventLoopThreadPool中*/cond_.notify();}/* 子線程開啟事件監聽,進入無限循環,不返回 */loop.loop();//assert(exiting_);loop_ = NULL; }

    子線程將一直停留在loop.loop()上,主線程由于被子線程喚醒,發現loop_已經不為null,說明已經創建了一個線程,同時也在那個線程中創建了一個事件驅動循環,所以主線程返回,將創建好的事件驅動循環返回給線程池保存起來,當有新的TcpConnection被創建后取出一個用來監聽tcp連接

    void EventLoopThreadPool::start(const ThreadInitCallback& cb) {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;/* 創建一定數量的線程(事件驅動循環) */for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);/* EventLoopThread,事件驅動循環線程*/EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));/* 創建新線程,返回新線程的事件驅動循環EventLoop *//* EventLoopThread主線程返回后,將事件驅動循環保存下來,然后繼續創建線程 */loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){cb(baseLoop_);} }

    至此線程池的創建工作完成,每一個線程都運行著EventLoop::loop,進行EventLoop::loop -> Poller::poll -> Channel::handleEvent -> TcpConnection::handle* -> EventLoop::doPendingFunctors -> EventLoop::loop的工作。
    如果提供了回調函數,在創建完成后也會執行,但通常用戶不會在意線程池的創建工作,所以一般都不提供


    創建完成后,線程池唯一的工作就是在新建TcpConnection時從池子中取出一個EventLoop傳給TcpConnection

    void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {loop_->assertInLoopThread();/* 從事件驅動線程池中取出一個線程給TcpConnection */EventLoop* ioLoop = threadPool_->getNextLoop();/* 為TcpConnection生成獨一無二的名字 */char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();/* * 根據sockfd獲取tcp連接在本地的<地址,端口>* getsockname(int fd, struct sockaddr*, int *size);*/InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessary/* 創建一個新的TcpConnection代表一個Tcp連接 */TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));... }

    EventLoopThreadPool::getNextLoop函數如下,用于取出一個EventLoop。
    如果線程池中沒有線程,就返回主線程的EventLoop,此時只有一個EventLoop在運行,即TcpServer的那個

    /* 從線程池中取出一個線程,挨著取 */ EventLoop* EventLoopThreadPool::getNextLoop() {baseLoop_->assertInLoopThread();assert(started_);/* 線程池所在線程,TcpServer的主線程 */EventLoop* loop = baseLoop_;/* * 如果不為空,取出一個* 如果為空,說明線程池中沒有創建線程,是單線程程序,返回主線程*/if (!loops_.empty()){// round-robin/* loops_保存所有的線程 */loop = loops_[next_];++next_;if (implicit_cast<size_t>(next_) >= loops_.size()){next_ = 0;}}return loop; }

    總結

    以上是生活随笔為你收集整理的muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。