【nodejs原理源码赏析(6)】深度剖析cluster模块源码与node.js多进程(下)
目錄
- 一. 引言
- 二.server.listen方法
- 三.cluster._getServer( )方法
- 四.跨進程通訊工具方法Utils
- 五.act:queryServer消息
- 六.輪詢調度Round-Robin-Handle
- 七. 圖解集群建立過程的邏輯跳轉
示例代碼托管在:http://www.github.com/dashnowords/blogs
博客園地址:《大史住在大前端》原創博文目錄
華為云社區地址:【你要的前端打怪升級指南】
閱讀本章需要先閱讀本系列前兩章內容預熱一下。
一. 引言
前兩篇博文中已經分別介紹了使用cluster模塊建立集群時主進程執行cluster.fork( )方法時的執行邏輯,以及net模塊在不同場景下建立通訊的基本原理。本篇繼續分析cluster模塊,從第一個子進程開始建立服務器講起,cluster基本用法示例代碼再來一遍:
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length;if (cluster.isMaster) {console.log(`主進程 ${process.pid} 正在運行`);// 衍生工作進程。for (let i = 0; i < numCPUs; i++) {cluster.fork();}cluster.on('exit', (worker, code, signal) => {console.log(`工作進程 ${worker.process.pid} 已退出`);}); } else {// 工作進程可以共享任何 TCP 連接。// 在本例子中,共享的是 HTTP 服務器。http.createServer((req, res) => {res.writeHead(200);res.end('你好世界\n');}).listen(8000);console.log(`工作進程 ${process.pid} 已啟動`); }代碼是足夠精簡的,實現過程也確實是很龐大的工程。每一個子進程中執行的邏輯都是http.createServer().listen(),我們來看看它是如何一步一步運作而最終建立通訊機制的,你會發現它和上一節中的簡易模型非常相似。
二.server.listen方法
在http模塊的源碼中很容易找到http.createServer( )方法的邏輯就是透傳參數生成了一個net.Server實例,這個實例在上一節中就已經介紹過,實際上就只是生成了一個server的實例,所以這里跳轉到net.Server.prototype.listen()(net.js文件1306-1404行),基本邏輯如下:
Server.prototype.listen = function(...args){const normalized = normalizeArgs(args);var options = normalized[0];/*..獲取監聽參數中的句柄對象..*/options = options._handle || options.handle || options;//如果options上有句柄,句柄是一個TCP實例if(options instanceof TCP){//......listenInCluster(......);}//如果配置參數中有fd(file descriptor)if(typeof options.fd === 'number' && options.fd >=0){//......listenInCluster(......);}//如果參數中有port端口號if(typeof options.port === 'number' || typeof options.port === 'string'){//.....listenInCluster(......);}//如果參數中有port端口號 或 字符型的pipe名稱if(typeof options.port === 'number' || typeof options.port === 'string'){//.....listenInCluster(......);} }這里不難看出它的邏輯就和net模塊官方文檔中描述的server.listen( )的幾種場景對應,可以監聽帶有非空handle屬性的句柄對象,數字型端口號,字符串型命名管道地址,或者直接傳入配置參數合集options,然后分別根據幾種不同的情況來調用listenInCluster方法(集群功能的邏輯主線是數字型port,假設傳入了12315)。
listenInCluster方法定義如下:
大致可以看出,如果是主進程,就直接調用server._listen2()方法然后return了,否則(也就是在工作進程中的邏輯,敲黑板!!!這里是重點了),構造一個serverQuery的參數集,可以看到里面記錄了以各種不同姿勢調用這個方法時傳入的參數,所以有的參數為null也很正常,然后調用了cluster._getServer( )方法,這就是工作進程在引用cluster模塊時引入的child.js中定義并掛載在cluster上的方法,最后一個參數listenOnMasterHandle是一個回調函數,也是一個錯誤前置風格的函數,可以看到,它接收了一個句柄對象,并把這個句柄對象掛載在了子進程這個server實例的_handle屬性上,接著也調用了server._listen2( )方法,可以看到兩種情況下調用這個方法時傳入的參數是一樣的。接著來到server._listen2( )方法,它綁定了setupListenHandle方法(別抓狂,這是net模塊中相關邏輯的最后一步了),簡化代碼如下:
function setupListenHandle(......){if (this._handle) {//工作進程在執行上一步邏輯時,在cluster._getServer()回調函數中把一個handle傳遞給了server._handledebug('setupListenHandle: have a handle already');} else {//主進程會執行的邏輯debug('setupListenHandle: create a handle');//......rval = createServerHandle(address, port, addressType, fd, flags);//......this._handle = rval; }//......this._handle.onconnection = onconnection;this._handle[owner_symbol] = this;//.... }工作進程通過cluster._getServer( )方法拿到了一個handle,所以不會再生成,而主進程server.listen(port)執行時會走到else分支,然后生成一個新的綁定了端口號的特殊的socket句柄然后掛載到主進程server._handle上,這里對句柄的connection事件回調邏輯進行了修改,相關代碼如下:
這里需要注意的是,server._handle的connection事件和server的connection事件是兩碼事,server._handle指向的是一個綁定了端口的特殊的socket句柄,當客戶端connect一個server時實際上底層是客戶端socket與服務端這個socket的對接,所以需要在server._handle這個的connection回調函數中,將客戶端的socket句柄clientHandle重新包裝,然后再通過觸發server的connection事件將其轉發給server實例。所以在使用server實例時可以直接添加connectionListener:
let server = net.createServer(socket=>{/*這個回調函數就是server的connection事件回調* 這里接收到的socket就是server._handle的connection收到的客戶端句柄clientHandle封裝成的socket實例*/ })無論是主進程還是子進程都會觸發這個邏輯,只需要看成是一種功能性質的封裝即可,并不影響業務邏輯。
三.cluster._getServer( )方法
下面回到cluster模塊繼續,_getServer( )方法只存在于子進程代碼中,源碼位于lib/internal/cluster/child.js,方法定義在54-106行,基本邏輯如下:
cluster._getServer = function(obj, options, cb){/* 這里的obj就是子進程中運行上面listenInCluster方法中傳入的server,* options就是serverQuery,* cb就是最后要把主進程handle傳回去的回調函數listenOnMasterHandler *///先構建index然后進行了一通記錄,就是根據監聽的參數來構建一個識別這個server的索引//然后構建消息const message = {act: 'queryServer',index,data: null,...options};//....../* 發送act:queryServer消息,并傳一個回調函數,* 從形參命名就可以看出,這個回調函數被調用時會被傳入一個句柄,* 最后根據不同的調度策略來執行不同的函數,這里主要看Round-robin*/ send(message, (reply, handle) => {if (typeof obj._setServerData === 'function')obj._setServerData(reply.data);if (handle)shared(reply, handle, indexesKey, cb); // Shared listen socket.elserr(reply, indexesKey, cb); // Round-robin.});//...... }rr方法將響應reply和前一個調用者傳入的回調函數cb進行了透傳,rr的函數體就是實現listen方法偷梁換柱的地方了:
// Round-robin. Master distributes handles across workers. function rr(message, indexesKey, cb) {if (message.errno)return cb(message.errno, null);var key = message.key;function listen(backlog) {return 0;}function close() {if (key === undefined)return;send({ act: 'close', key });handles.delete(key);indexes.delete(indexesKey);key = undefined;}function getsockname(out) {if (key)Object.assign(out, message.sockname);return 0;}const handle = { close, listen, ref: noop, unref: noop };if (message.sockname) {handle.getsockname = getsockname; // TCP handles only.}assert(handles.has(key) === false);handles.set(key, handle);cb(0, handle); //這里的cb其實就是listenInCluster方法中定義的那個listenOnMasterHandler回調 }可以看到rr方法中構建了一個假的handle句柄,并調用cb將它傳了回去,然后執行邏輯回回到net模塊,前文已經提這個handle在回調函數中被掛載在了server._handle上,于是setupListenHandle( )的邏輯中也不會重新構建句柄。
重新梳理一下這部分的邏輯,就是子進程中調用listen方法時,會通過cluster._getServer( )拿到一個假句柄,然后執行一個空的listen方法,這樣就避免了端口的重復監聽。所以我們可以推測,cluster._getServer( )必然會觸發主進程啟動一個監聽端口的服務器,并建立對子進程的調度,進程之間的IPC通訊可以直接通過process對象來完成,不需要再重新構建跨進程通訊管道。
四.跨進程通訊工具方法Utils
繼續進行后續內容前,先來看一個獨立的跨進程通訊工具,源碼放在lib/internal/cluster/utils.js。
它是cluster模塊發送跨進程消息的內部代理,這個模塊對外暴露了消息發送方法sendHelper和內部消息監聽器的預處理方法internal,源碼很短就不貼了。當子進程調用sendHelper發送消息時,utils內部會把這條消息處理完成后需要執行的回調函數先緩存起來,然后給消息添加一些包裝標記,然后再發出去;internal會對傳入的內部消息監聽器進行代理,過濾掉非NODE_CLUSTER類別的消息,如果消息攜帶的message對象沒有ack屬性則最終會執行綁定監聽時傳入的回調函數,否則會從緩存中找出之前暫存的回調函數來執行。
發個消息為什么要搞這么復雜呢?這個ack屬性又是哪來的呢?其實這個utils模塊主要是在跨進程的雙向消息通訊時實現了方法復用,同一個message從工作進程發往主進程時和主進程發回給工作進程時是由同一個事件名internalMessage攜帶的,那如何來區分消息發送的方向呢,就是ack屬性,如果消息帶有ack屬性,就表示它是由主進程發給子進程的,那么就要調用子進程中的后續處理方法,這個方法其實就是子進程發送消息給主進程之前暫存在utils內部callbacks里的方法,也就是child.js中cluster._getServer()中調用send方法時傳入的回調方法,也就是net模塊中listenInCluster( )方法中的listenOnMasterHandle方法,這個方法漂洋過海透傳了N個函數,的確不容易看懂,“回調地獄”也的確不是鬧著玩的。再看看沒有ack屬性的情況,沒有這個屬性時消息是從子進程發給主進程的,自然要調用主進程的方法,從邏輯里不難看出,這種情況下方法引用的就是internal方法執行時傳入的第二個參數(master.js源碼213行執行的internal(worker, onmessage)的onmessage這個函數),源碼中就是利用高階函數這種分步執行的特點實現了引用。
五.act:queryServer消息
故事再回到第三節工作進程中發出act:'queryServer的消息后,來看主進程master.js中的代碼,主進程中在調用cluster.fork( )時就綁定了對worker線程internalMessage的監聽,對于act:queryServer類型的集群消息,主進程已經定義了queryServer這個方法來處理。這段源代碼的主要邏輯如下:
1.根據重要參數組拼接出一個唯一的key 2.1.根據key查詢是否有已經存在的調度句柄round-robin-handle,如果有則直接進行后續邏輯 2.2.如果沒有已經存在的調度句柄,則選擇調度策略,實例化一個調度句柄,并把它添加進記錄里 3.把消息數據message.data掛載在調度句柄的handle.data字段上 4.執行調度句柄的add方法,把子進程和一個回調方法傳進實例,回調方法被執行時會從調度句柄中取得數據,并組裝返回消息(帶有ack屬性和其他數據的消息)發給子進程,子進程收到這個消息后執行的方法,就是前文分析過的返回假句柄給net模塊中的`listenInCluster()`邏輯。從開篇的多進程代碼可以看到,每個子進程中執行的listen方法監聽的端口號都是一樣的,所以每個子進程發送queryServer消息給主進程并執行這段邏輯時,其實對應的key都是一樣的,所以調度對象RoundRobinHandle只會實例化一次,在之后的過程中,每一個子進程會根據key獲取到同一個調度實例,并調用add方法將worker對象和一個回調函數添加進調度實例,可以看到回調函數執行時,就會將原message中的seq屬性的值添加給ack屬性再掛載上處理后的數據并發送給子進程。那么剩下的事情,就剩下調度對象RoundRobinHandle的源碼了。
我們不妨來推測一下,它的主要邏輯就是在主進程中建立真正監聽目標端口的服務器,并添加當客戶端請求到達時對于工作進程的調度代碼,下一節我們就一起來驗證一下。
六.輪詢調度Round-Robin-Handle
調度方法的源碼是internal/cluster/round_robin_handle.js,另一種shared_handle.js是windows下使用的調度策略,先不做分析(主要是沒研究過,不敢瞎說)。先從構造函數開始:
16行,bingo,終于看到主進程啟動服務器了。接著就是根據參數而分流的監聽方法,集群代碼中對應的是20行的帶有有效port參數的情況,所以服務器就在主進程啟動了,最后來看看server開始觸發listening事件時執行的邏輯(此處調用的是once方法,所以只會執行一次):
1.將主進程server的內部_handle句柄,掛載給round-robin-handle實例 2.當這個句柄被連接時(也就是客戶端socket執行connect方法連接后),會觸發它的`connection`事件,回調函數會調用`distribute`方法來分發這個客戶端socket句柄,注意32行后面半句的箭頭函數方法,這里的handle就是指客戶端`socket`實例。 3.將server._handle指向null 4.將server屬性指向null如果你還記得net模塊中listen方法的邏輯的話可能會有印象,_handle的connection事件回調其實原本已經被復寫過一次了,也就是說單進程運行的程序在建立服務器時,server._handle的connection事件會觸發server實例的connection事件,而在集群模式下,主進程中調度實例中服務器句柄server._handle的connection再次被復寫,將邏輯改變為分發socket,而子進程中的server._handle還是保持原來的邏輯。
最后一步指向null的邏輯還涉及到add方法,繼續看主進程中調用的add方法:
這個send形參實際上就是主進程中傳入的最終向子進程發送返回消息的那個回調函數,它被封裝進了done函數,這里需要著重看一下55行的邏輯,this.server === null這個條件實際上對應的就是構造函數中服務器開始監聽的事件,所以55-59行的代碼以及構造函數中添加的listening事件的回調函數需要聯合在一起來理解,也就是每個子進程的send方法都被包裹在一個獨立的done函數中,這個函數會在主進程的server處于listening狀態后觸發執行,并且只觸發一次。當它觸發時,會從實例的handle屬性(也就是server的_handle句柄)上取得socket名稱然后調用send方法,這個特殊socket的名稱在回調函數中對應reply形參,最終掛載在message中發回了子進程。
至此其實主進程和子進程建立服務器的消息已經完成了閉環。最后再看一下RoundRobinHandle中最后兩個方法:
當客戶端socket執行connect方法連接到主進程server的句柄后,主進程會調用round-robin-handle實例的distribute方法,這個方法的邏輯比較簡單,把這個客戶端句柄加入到待處理隊列,然后從空閑進程隊列頭部取出一個worker進程,把它作為參數傳給handoff方法。
handoff方法中,從客戶端請求句柄隊列的頭部取出下一個待處理的socket,如果已經沒有要處理的請求,就把傳進來的worker放回空閑子進程隊列free中。在add方法內部封裝的done方法中也執行了這個handoff方法,現在再回過頭來看這個add方法的作用,就是當主進程處于監聽狀態后,將每一個子進程對象worker依次添加到空閑進程隊列free中。最后夠早了一個新的act:newconn消息,并通過調度選出的worker.process對象實現跨進程通訊來將待處理句柄和【新連接】消息發送給子進程。
七. 圖解集群建立過程的邏輯跳轉
集群建立過程的邏輯大致的跳轉路徑如下,細節部分直接參考前文的講解即可。
轉載于:https://www.cnblogs.com/dashnowords/p/11019089.html
總結
以上是生活随笔為你收集整理的【nodejs原理源码赏析(6)】深度剖析cluster模块源码与node.js多进程(下)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 349元小米中枢网关可以省了,自己做个小
- 下一篇: jquery attr()和prop()