日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

nodejs中使用worker_threads来创建新的线程

發(fā)布時(shí)間:2024/2/28 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 nodejs中使用worker_threads来创建新的线程 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 簡(jiǎn)介
  • worker_threads
    • isMainThread
    • MessageChannel
    • parentPort和MessagePort
    • markAsUntransferable
    • SHARE_ENV
    • workerData
    • worker類
    • receiveMessageOnPort
    • moveMessagePortToContext
  • worker_threads的線程池

簡(jiǎn)介

之前的文章中提到了,nodejs中有兩種線程,一種是event loop用來(lái)相應(yīng)用戶的請(qǐng)求和處理各種callback。另一種就是worker pool用來(lái)處理各種耗時(shí)操作。

nodejs的官網(wǎng)提到了一個(gè)能夠使用nodejs本地woker pool的lib叫做webworker-threads。

可惜的是webworker-threads的最后一次更新還是在2年前,而在最新的nodejs 12中,根本無(wú)法使用。

而webworker-threads的作者則推薦了一個(gè)新的lib叫做web-worker。

web-worker是構(gòu)建于nodejs的worker_threads之上的,本文將會(huì)詳細(xì)講解worker_threads和web-worker的使用。

worker_threads

worker_threads模塊的源代碼源自lib/worker_threads.js,它指的是工作線程,可以開(kāi)啟一個(gè)新的線程來(lái)并行執(zhí)行javascript程序。

worker_threads主要用來(lái)處理CPU密集型操作,而不是IO操作,因?yàn)閚odejs本身的異步IO已經(jīng)非常強(qiáng)大了。

worker_threads中主要有5個(gè)屬性,3個(gè)class和3個(gè)主要的方法。接下來(lái)我們將會(huì)一一講解。

isMainThread

isMainThread用來(lái)判斷代碼是否在主線程中運(yùn)行,我們看一個(gè)使用的例子:

const { Worker, isMainThread } = require('worker_threads');if (isMainThread) {console.log('在主線程中');new Worker(__filename); } else {console.log('在工作線程中');console.log(isMainThread); // 打印 'false'。 }

上面的例子中,我們從worker_threads模塊中引入了Worker和isMainThread,Worker就是工作線程的主類,我們將會(huì)在后面詳細(xì)講解,這里我們使用Worker創(chuàng)建了一個(gè)工作線程。

MessageChannel

MessageChannel代表的是一個(gè)異步雙向通信channel。MessageChannel中沒(méi)有方法,主要通過(guò)MessageChannel來(lái)連接兩端的MessagePort。

class MessageChannel {readonly port1: MessagePort;readonly port2: MessagePort;}

當(dāng)我們使用new MessageChannel()的時(shí)候,會(huì)自動(dòng)創(chuàng)建兩個(gè)MessagePort。

const { MessageChannel } = require('worker_threads');const { port1, port2 } = new MessageChannel(); port1.on('message', (message) => console.log('received', message)); port2.postMessage({ foo: 'bar' }); // Prints: received { foo: 'bar' } from the `port1.on('message')` listener

通過(guò)MessageChannel,我們可以進(jìn)行MessagePort間的通信。

parentPort和MessagePort

parentPort是一個(gè)MessagePort類型,parentPort主要用于worker線程和主線程進(jìn)行消息交互。

通過(guò)parentPort.postMessage()發(fā)送的消息在主線程中將可以通過(guò)worker.on(‘message’)接收。

主線程中通過(guò)worker.postMessage()發(fā)送的消息將可以在工作線程中通過(guò)parentPort.on(‘message’)接收。

我們看一下MessagePort的定義:

class MessagePort extends EventEmitter {close(): void;postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;ref(): void;unref(): void;start(): void;addListener(event: "close", listener: () => void): this;addListener(event: "message", listener: (value: any) => void): this;addListener(event: string | symbol, listener: (...args: any[]) => void): this;emit(event: "close"): boolean;emit(event: "message", value: any): boolean;emit(event: string | symbol, ...args: any[]): boolean;on(event: "close", listener: () => void): this;on(event: "message", listener: (value: any) => void): this;on(event: string | symbol, listener: (...args: any[]) => void): this;once(event: "close", listener: () => void): this;once(event: "message", listener: (value: any) => void): this;once(event: string | symbol, listener: (...args: any[]) => void): this;prependListener(event: "close", listener: () => void): this;prependListener(event: "message", listener: (value: any) => void): this;prependListener(event: string | symbol, listener: (...args: any[]) => void): this;prependOnceListener(event: "close", listener: () => void): this;prependOnceListener(event: "message", listener: (value: any) => void): this;prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;removeListener(event: "close", listener: () => void): this;removeListener(event: "message", listener: (value: any) => void): this;removeListener(event: string | symbol, listener: (...args: any[]) => void): this;off(event: "close", listener: () => void): this;off(event: "message", listener: (value: any) => void): this;off(event: string | symbol, listener: (...args: any[]) => void): this;}

MessagePort繼承自EventEmitter,它表示的是異步雙向通信channel的一端。這個(gè)channel就叫做MessageChannel,MessagePort通過(guò)MessageChannel來(lái)進(jìn)行通信。

我們可以通過(guò)MessagePort來(lái)傳輸結(jié)構(gòu)體數(shù)據(jù),內(nèi)存區(qū)域或者其他的MessagePorts。

從源代碼中,我們可以看到MessagePort中有兩個(gè)事件,close和message。

close事件將會(huì)在channel的中任何一端斷開(kāi)連接的時(shí)候觸發(fā),而message事件將會(huì)在port.postMessage時(shí)候觸發(fā),下面我們看一個(gè)例子:

const { MessageChannel } = require('worker_threads'); const { port1, port2 } = new MessageChannel();// Prints: // foobar // closed! port2.on('message', (message) => console.log(message)); port2.on('close', () => console.log('closed!'));port1.postMessage('foobar'); port1.close();

port.on(‘message’)實(shí)際上為message事件添加了一個(gè)listener,port還提供了addListener方法來(lái)手動(dòng)添加listener。

port.on(‘message’)會(huì)自動(dòng)觸發(fā)port.start()方法,表示啟動(dòng)一個(gè)port。

當(dāng)port有l(wèi)istener存在的時(shí)候,這表示port存在一個(gè)ref,當(dāng)存在ref的時(shí)候,程序是不會(huì)結(jié)束的。我們可以通過(guò)調(diào)用port.unref方法來(lái)取消這個(gè)ref。

接下來(lái)我們看一下怎么通過(guò)port來(lái)傳輸消息:

port.postMessage(value[, transferList])

postMessage可以接受兩個(gè)參數(shù),第一個(gè)參數(shù)是value,這是一個(gè)JavaScript對(duì)象。第二個(gè)參數(shù)是transferList。

先看一個(gè)傳遞一個(gè)參數(shù)的情況:

const { MessageChannel } = require('worker_threads'); const { port1, port2 } = new MessageChannel();port1.on('message', (message) => console.log(message));const circularData = {}; circularData.foo = circularData; // Prints: { foo: [Circular] } port2.postMessage(circularData);

通常來(lái)說(shuō)postMessage發(fā)送的對(duì)象都是value的拷貝,但是如果你指定了transferList,那么在transferList中的對(duì)象將會(huì)被transfer到channel的接受端,并且不再存在于發(fā)送端,就好像把對(duì)象傳送出去一樣。

transferList是一個(gè)list,list中的對(duì)象可以是ArrayBuffer, MessagePort 和 FileHandle。

如果value中包含SharedArrayBuffer對(duì)象,那么該對(duì)象不能被包含在transferList中。

看一個(gè)包含兩個(gè)參數(shù)的例子:

const { MessageChannel } = require('worker_threads'); const { port1, port2 } = new MessageChannel();port1.on('message', (message) => console.log(message));const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]); // post uint8Array的拷貝: port2.postMessage(uint8Array);port2.postMessage(uint8Array, [ uint8Array.buffer ]);//port2.postMessage(uint8Array);

上面的例子將輸出:

Uint8Array(4) [ 1, 2, 3, 4 ] Uint8Array(4) [ 1, 2, 3, 4 ]

第一個(gè)postMessage是拷貝,第二個(gè)postMessage是transfer Uint8Array底層的buffer。

如果我們?cè)俅握{(diào)用port2.postMessage(uint8Array),我們會(huì)得到下面的錯(cuò)誤:

DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.

buffer是TypedArray的底層存儲(chǔ)結(jié)構(gòu),如果buffer被transfer,那么之前的TypedArray將會(huì)變得不可用。

markAsUntransferable

要想避免這個(gè)問(wèn)題,我們可以調(diào)用markAsUntransferable將buffer標(biāo)記為不可transferable. 我們看一個(gè)markAsUntransferable的例子:

const { MessageChannel, markAsUntransferable } = require('worker_threads');const pooledBuffer = new ArrayBuffer(8); const typedArray1 = new Uint8Array(pooledBuffer); const typedArray2 = new Float64Array(pooledBuffer);markAsUntransferable(pooledBuffer);const { port1 } = new MessageChannel(); port1.postMessage(typedArray1, [ typedArray1.buffer ]);console.log(typedArray1); console.log(typedArray2);

SHARE_ENV

SHARE_ENV是傳遞給worker構(gòu)造函數(shù)的一個(gè)env變量,通過(guò)設(shè)置這個(gè)變量,我們可以在主線程與工作線程進(jìn)行共享環(huán)境變量的讀寫。

const { Worker, SHARE_ENV } = require('worker_threads'); new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV }).on('exit', () => {console.log(process.env.SET_IN_WORKER); // Prints 'foo'.});

workerData

除了postMessage(),還可以通過(guò)在主線程中傳遞workerData給worker的構(gòu)造函數(shù),從而將主線程中的數(shù)據(jù)傳遞給worker:

const { Worker, isMainThread, workerData } = require('worker_threads');if (isMainThread) {const worker = new Worker(__filename, { workerData: 'Hello, world!' }); } else {console.log(workerData); // Prints 'Hello, world!'. }

worker類

先看一下worker的定義:

class Worker extends EventEmitter {readonly stdin: Writable | null;readonly stdout: Readable;readonly stderr: Readable;readonly threadId: number;readonly resourceLimits?: ResourceLimits;constructor(filename: string | URL, options?: WorkerOptions);postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;ref(): void;unref(): void;terminate(): Promise<number>;getHeapSnapshot(): Promise<Readable>;addListener(event: "error", listener: (err: Error) => void): this;addListener(event: "exit", listener: (exitCode: number) => void): this;addListener(event: "message", listener: (value: any) => void): this;addListener(event: "online", listener: () => void): this;addListener(event: string | symbol, listener: (...args: any[]) => void): this;... }

worker繼承自EventEmitter,并且包含了4個(gè)重要的事件:error,exit,message和online。

worker表示的是一個(gè)獨(dú)立的 JavaScript 執(zhí)行線程,我們可以通過(guò)傳遞filename或者URL來(lái)構(gòu)造worker。

每一個(gè)worker都有一對(duì)內(nèi)置的MessagePort,在worker創(chuàng)建的時(shí)候就會(huì)相互關(guān)聯(lián)。worker使用這對(duì)內(nèi)置的MessagePort來(lái)和父線程進(jìn)行通信。

通過(guò)parentPort.postMessage()發(fā)送的消息在主線程中將可以通過(guò)worker.on(‘message’)接收。

主線程中通過(guò)worker.postMessage()發(fā)送的消息將可以在工作線程中通過(guò)parentPort.on(‘message’)接收。

當(dāng)然,你也可以顯式的創(chuàng)建MessageChannel 對(duì)象,然后將MessagePort作為消息傳遞給其他線程,我們看一個(gè)例子:

const assert = require('assert'); const {Worker, MessageChannel, MessagePort, isMainThread, parentPort } = require('worker_threads'); if (isMainThread) {const worker = new Worker(__filename);const subChannel = new MessageChannel();worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);subChannel.port2.on('message', (value) => {console.log('接收到:', value);}); } else {parentPort.once('message', (value) => {assert(value.hereIsYourPort instanceof MessagePort);value.hereIsYourPort.postMessage('工作線程正在發(fā)送此消息');value.hereIsYourPort.close();}); }

上面的例子中,我們借助了worker和parentPort本身的消息傳遞功能,傳遞了一個(gè)顯式的MessageChannel中的MessagePort。

然后又通過(guò)該MessagePort來(lái)進(jìn)行消息的分發(fā)。

receiveMessageOnPort

除了port的on(‘message’)方法之外,我們還可以使用receiveMessageOnPort來(lái)手動(dòng)接收消息:

const { MessageChannel, receiveMessageOnPort } = require('worker_threads'); const { port1, port2 } = new MessageChannel(); port1.postMessage({ hello: 'world' });console.log(receiveMessageOnPort(port2)); // Prints: { message: { hello: 'world' } } console.log(receiveMessageOnPort(port2)); // Prints: undefined

moveMessagePortToContext

先了解一下nodejs中的Context的概念,我們可以從vm中創(chuàng)建context,它是一個(gè)隔離的上下文環(huán)境,從而保證不同運(yùn)行環(huán)境的安全性,我們看一個(gè)context的例子:

const vm = require('vm');const x = 1;const context = { x: 2 }; vm.createContext(context); // 上下文隔離化對(duì)象。const code = 'x += 40; var y = 17;'; // `x` and `y` 是上下文中的全局變量。 // 最初,x 的值為 2,因?yàn)檫@是 context.x 的值。 vm.runInContext(code, context);console.log(context.x); // 42 console.log(context.y); // 17console.log(x); // 1; y 沒(méi)有定義。

在worker中,我們可以將一個(gè)MessagePort move到其他的context中。

worker.moveMessagePortToContext(port, contextifiedSandbox)

這個(gè)方法接收兩個(gè)參數(shù),第一個(gè)參數(shù)就是要move的MessagePort,第二個(gè)參數(shù)就是vm.createContext()創(chuàng)建的context對(duì)象。

worker_threads的線程池

上面我們提到了使用單個(gè)的worker thread,但是現(xiàn)在程序中一個(gè)線程往往是不夠的,我們需要?jiǎng)?chuàng)建一個(gè)線程池來(lái)維護(hù)worker thread對(duì)象。

nodejs提供了AsyncResource類,來(lái)作為對(duì)異步資源的擴(kuò)展。

AsyncResource類是async_hooks模塊中的。

下面我們看下怎么使用AsyncResource類來(lái)創(chuàng)建worker的線程池。

假設(shè)我們有一個(gè)task,使用來(lái)執(zhí)行兩個(gè)數(shù)相加,腳本名字叫做task_processor.js:

const { parentPort } = require('worker_threads'); parentPort.on('message', (task) => {parentPort.postMessage(task.a + task.b); });

下面是worker pool的實(shí)現(xiàn):

const { AsyncResource } = require('async_hooks'); const { EventEmitter } = require('events'); const path = require('path'); const { Worker } = require('worker_threads');const kTaskInfo = Symbol('kTaskInfo'); const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');class WorkerPoolTaskInfo extends AsyncResource {constructor(callback) {super('WorkerPoolTaskInfo');this.callback = callback;}done(err, result) {this.runInAsyncScope(this.callback, null, err, result);this.emitDestroy(); // `TaskInfo`s are used only once.} }class WorkerPool extends EventEmitter {constructor(numThreads) {super();this.numThreads = numThreads;this.workers = [];this.freeWorkers = [];for (let i = 0; i < numThreads; i++)this.addNewWorker();}addNewWorker() {const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));worker.on('message', (result) => {// In case of success: Call the callback that was passed to `runTask`,// remove the `TaskInfo` associated with the Worker, and mark it as free// again.worker[kTaskInfo].done(null, result);worker[kTaskInfo] = null;this.freeWorkers.push(worker);this.emit(kWorkerFreedEvent);});worker.on('error', (err) => {// In case of an uncaught exception: Call the callback that was passed to// `runTask` with the error.if (worker[kTaskInfo])worker[kTaskInfo].done(err, null);elsethis.emit('error', err);// Remove the worker from the list and start a new Worker to replace the// current one.this.workers.splice(this.workers.indexOf(worker), 1);this.addNewWorker();});this.workers.push(worker);this.freeWorkers.push(worker);this.emit(kWorkerFreedEvent);}runTask(task, callback) {if (this.freeWorkers.length === 0) {// No free threads, wait until a worker thread becomes free.this.once(kWorkerFreedEvent, () => this.runTask(task, callback));return;}const worker = this.freeWorkers.pop();worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);worker.postMessage(task);}close() {for (const worker of this.workers) worker.terminate();} }module.exports = WorkerPool;

我們給worker創(chuàng)建了一個(gè)新的kTaskInfo屬性,并且將異步的callback封裝到WorkerPoolTaskInfo中,賦值給worker.kTaskInfo.

接下來(lái)我們就可以使用workerPool了:

const WorkerPool = require('./worker_pool.js'); const os = require('os');const pool = new WorkerPool(os.cpus().length);let finished = 0; for (let i = 0; i < 10; i++) {pool.runTask({ a: 42, b: 100 }, (err, result) => {console.log(i, err, result);if (++finished === 10)pool.close();}); }

本文作者:flydean程序那些事

本文鏈接:http://www.flydean.com/nodejs-worker-thread/

本文來(lái)源:flydean的博客

歡迎關(guān)注我的公眾號(hào):「程序那些事」最通俗的解讀,最深刻的干貨,最簡(jiǎn)潔的教程,眾多你不知道的小技巧等你來(lái)發(fā)現(xiàn)!

總結(jié)

以上是生活随笔為你收集整理的nodejs中使用worker_threads来创建新的线程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。