javascript
js 获取father_(原创)Node.JS实战26:强大的工作池。收藏吧!你一定会用的到。...
在實際項目中,如果遇到需要大計算量的操作,按需fork(分叉)其實不是一個好的選擇。
因為fork的子進(jìn)程也是V8(NodeJS的核心引擎)的新實例,每創(chuàng)建一個新實例,需要約30毫秒啟動時間,和至少10MB的初始內(nèi)存。
也就是說,創(chuàng)建進(jìn)程是有代價的,你不能創(chuàng)建太多,也不能頻繁創(chuàng)建。那樣,達(dá)不到提高進(jìn)程效率的目的。
那么,該如何高效優(yōu)雅的使用子進(jìn)程呢?工作池!
工作池!
合理的辦法是創(chuàng)建一個可用的工作池,在池中存放足夠多的進(jìn)程,并可以隨時分配使用。
我們對上一節(jié)講的內(nèi)容進(jìn)行升級:當(dāng)父進(jìn)程發(fā)送一個任務(wù)給子進(jìn)程時,子進(jìn)程執(zhí)行任務(wù)。并將結(jié)果向主進(jìn)程反饋。
在父進(jìn)程中,需要的代碼會是這樣的:
function doWork(job,cb){var child = cp.fork("./worker");//發(fā)送工作給子進(jìn)程child.send(job);//希望子進(jìn)程返回一個確切的消息child.once("message",function(result){cb(null,result);}) }嗯...這樣講有些凌亂,這一章比較復(fù)雜,最好的辦法,還是寫一個完整的代碼,做為例子:
1、father.js,主進(jìn)程
var http = require("http"); var makePool = require("./pooler"); var runJob = makePool("./worker");http.createServer(function(req,res){runJob("some dummy job",function(er,data){console.log("father callback get:",data);if(er){return res.end("get an error:"+er.message)}res.end("work pool");})}).listen(8000)當(dāng)有客端訪問時,觸發(fā)runjob,開始啟行工作。
2、worker.js
process.on("message",function(job){console.log("worker get msg:",job);for(var i=0;i<10;i++){console.log("worker send:",job,i);process.send("finish job:"+job+i);}})收到father主進(jìn)程發(fā)來的消息時,使用process.send()方法調(diào)用子進(jìn)程,向工作池發(fā)出工作任務(wù)。
3、pool.js(工作池)
接收worker消息,用工作池完成操作,并反饋給主程序。
代碼中做了詳細(xì)的注釋 ,就不單獨對代碼做解析了:
var cp = require("child_process"); //獲取CPU數(shù)量,有幾個CPU就創(chuàng)建幾個子進(jìn)程,這樣就可以最大化的利用機(jī)器性能 var cpus = require("os").cpus().length;//模塊導(dǎo)出函數(shù) module.exports = function(workModule){//等待任務(wù)隊列,當(dāng)工作任務(wù)被下發(fā),但沒有閑工作進(jìn)程時,放到此隊列var awaiting = [];//存放準(zhǔn)備就緒的工作進(jìn)程var readyPool = [];//當(dāng)前的工作子進(jìn)程數(shù)量(工作池的大小)var poolSize = 0;return function doWork(job,cb){//如果工作池數(shù)量已經(jīng)最大,并且沒有準(zhǔn)備就緒的工作子進(jìn)程,也就是所有工作子進(jìn)程都在工作中,那么:排隊等待if(!readyPool.length && poolSize >cpus){//壓入到等待隊列,等待后續(xù)處理return awaiting.push([dowork,job.cb]);}//取得一個可用的工作子進(jìn)程,或fork(分叉)一個新的子進(jìn)程(增加工作池的大小)var child = readyPool.length ? readyPool.shift() : (poolSize++, cp.fork(workModule));{//子進(jìn)程是否完成回調(diào)的標(biāo)記var cbTriggered = false;//初始階段,移除子進(jìn)程上的監(jiān)聽,確保每個子進(jìn)程只擁有一次監(jiān)聽child.removeAllListeners();//錯誤child.once("error",function(err){//未回調(diào)if(!cbTriggered){//回調(diào)返回為錯誤cb(err);//回調(diào)標(biāo)識改為true:已回調(diào)cbTriggered = true;} //結(jié)束子進(jìn)程child.kill();//這里不用操作工作池poolSize--,因為kill會觸發(fā)exit事件,在exit事件中操作工作池});//子進(jìn)程退出了(不明原因的意外退出、被kill()等都觸發(fā))child.once("exit",function(code,signal){//未回調(diào)if(!cbTriggered){//回調(diào),返回信息cb(new Error("Child exited with code:"+code))}//工作池(正在工作的子進(jìn)程數(shù))大小減一poolSize --;//退出的子進(jìn)程,是否在準(zhǔn)備好的子進(jìn)程數(shù)組中var childIdx = readyPool.indexOf(child);if(childIdx > -1){//從準(zhǔn)備好的子進(jìn)程數(shù)組中移除readyPool.splice(childIdx,1);}})//獲取父進(jìn)程發(fā)來的消息child.on("message",function(msg){console.log("pool get msg:",msg);cb(null,msg);cbTriggered = true;readyPool.push(child);//如何等待區(qū)有內(nèi)容,處理之if(awaiting.length){setImmediate.apply(null,awaiting.shift());}//向父進(jìn)程發(fā)送消息}).send(job);}//child區(qū)域結(jié)束} }執(zhí)行效果
圖中展示的是工作流程,可見此種方法可以達(dá)到我們的預(yù)期,工作池很OK。
對于實際編程中遇到的消耗比較大的情況,使用此種方法可以極大的提高效率,且本文已經(jīng)將工作池寫成了模塊(pooler.js)
建議收藏,nodejs開發(fā),在某個時候一定會遇到適合的場景的。
總結(jié)
以上是生活随笔為你收集整理的js 获取father_(原创)Node.JS实战26:强大的工作池。收藏吧!你一定会用的到。...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 渭南治疗子宫发育不良最好的医院推荐
- 下一篇: springboot2 使用hikari