node中的Stream-Readable和Writeable解读
在node中,只要涉及到文件IO的場景一般都會涉及到一個類-Stream。Stream是對IO設(shè)備的抽象表示,其在JAVA中也有涉及,主要體現(xiàn)在四個類-InputStream、Reader、OutputStream、Writer,其中InputStream和OutputStream類針對字節(jié)數(shù)據(jù)進行讀寫;Reader和Writer針對字符數(shù)據(jù)讀寫。同時Java中有多種針對這四種類型的擴展類,如節(jié)點流、緩沖流和轉(zhuǎn)換流等。比較而言,node中Stream類型也和Java中的類似,同樣提供了支持字節(jié)和字符讀寫的Readable和Writeable類,也存在轉(zhuǎn)換流Transform類,本文主要分析node中Readable和Writeable的實現(xiàn)機制,從底層的角度更好的理解Readable和Writeable實現(xiàn)機制,解讀在讀寫過程中發(fā)生的一些重要事件。
Readable類
Readable對應(yīng)于Java中的InputStream和Reader兩個類,針對Readable設(shè)置encode編碼可完成內(nèi)部數(shù)據(jù)由Buffer到字符的轉(zhuǎn)換。Readable Stream有兩種模式,即flowing和paused模式。這兩種模式對于用戶而言區(qū)別在于是否需要手動調(diào)用Readable.prototype.read(n),讀取緩沖區(qū)的數(shù)據(jù)。查詢node API文檔可知觸發(fā)flowing模式有三種方式:
- 偵聽data事件
- readable.resume()
- readable.pipe()
而觸發(fā)paused模式同樣有幾種方式: - 移除data事件
- readable.pause()
- readable.unpipe()
可能這樣講解大家仍不明白Readable Stream這兩種模式的區(qū)別,那么下文從更深層次分析兩種模式的機制。
深入Readable的實現(xiàn)
Readable繼承EventEmitter,大家也都知道。但是相信大家應(yīng)該不怎么熟悉Readable的實例屬性**_readableState**。該屬性是一個ReadableState類型的對象,保存了Readable實例的重要信息,如讀取模式(是否為對象模式)、highWaterMark(緩沖區(qū)存放的最大字節(jié)數(shù))、緩沖區(qū)、flowing模式等。在Readable的實現(xiàn)中,處處使用ReadableState對象記錄當前讀取狀態(tài),并設(shè)置緩沖區(qū)保證讀操作的順利進行。
首先需要針對Readable.prototype.read方法進行特別解讀:
if (n === 0 &&state.needReadable &&(state.length >= state.highWaterMark || state.ended)) {debug('read: emitReadable', state.length, state.ended);if (state.length === 0 && state.ended)endReadable(this);elseemitReadable(this);return null;}當讀入的數(shù)據(jù)為0時,執(zhí)行emitReadable操作。這意味著,針對Readable Stream執(zhí)行read(0)方法會觸發(fā)readable事件,但是不會讀當前緩沖區(qū)。因此使用read(0)可以完成一些比較巧妙的事情,如在readable處理函數(shù)中可以使用read(0)觸發(fā)下一次readable事件,可選的操作讀緩沖區(qū)。
繼續(xù)分析代碼,如果讀入的數(shù)據(jù)并不是0,則計算讀取緩沖區(qū)的具體字節(jié)數(shù),
n = howMuchToRead(n, state);function howMuchToRead(n, state) {if (state.length === 0 && state.ended)return 0;if (state.objectMode)return n === 0 ? 0 : 1;if (n === null || isNaN(n)) {// only flow one buffer at a timeif (state.flowing && state.buffer.length)return state.buffer[0].length;// 若是paused狀態(tài),則讀全部的緩沖區(qū)elsereturn state.length;}if (n <= 0)return 0;if (n > state.highWaterMark)state.highWaterMark = computeNewHighWaterMark(n);// don't have that much. return null, unless we've ended.if (n > state.length) {if (!state.ended) {state.needReadable = true;return 0;} else {return state.length;}}return n; }針對對象模式的讀取,每次只讀一個;對于處在flowing模式下的讀取,每次只讀緩沖區(qū)中第一個buffer的長度;在paused模式下則讀取全部緩沖區(qū)的長度;若讀取的字節(jié)數(shù)大于設(shè)置的緩沖區(qū)最大值,則適當擴大緩沖區(qū)的大小(默認為16k,最大為8m);若讀取的長度大于當前緩沖區(qū)的大小,設(shè)置needReadable屬性并準備數(shù)據(jù)等待下一次讀取。
接下來,判斷是否需要準備數(shù)據(jù)。在這里,依賴于needReadable的值,
var doRead = state.needReadable;debug('need readable', doRead);if (state.length === 0 || state.length - n < state.highWaterMark) {doRead = true;debug('length less than watermark', doRead);}// reading, then it's unnecessary.if (state.ended || state.reading) {doRead = false;debug('reading or ended', doRead);}如果當前緩沖區(qū)為空,或者緩沖區(qū)并未超出我們設(shè)定的最大值,那么就可以繼續(xù)準備數(shù)據(jù);如果此時正在準備數(shù)據(jù)或者已經(jīng)結(jié)束讀取,那么就放棄準備數(shù)據(jù)。一旦doRead為true,那么進入準備數(shù)據(jù)階段,
if (doRead) {debug('do read');state.reading = true;state.sync = true;// if the length is currently zero, then we *need* a readable event.if (state.length === 0)state.needReadable = true;// call internal read method// 默認Readable未實現(xiàn)_read,拋出Error// 針對自定義的Readable子類,_read可修改state.buffer的數(shù)量,進行預(yù)處理,// 然后由下面的fromList讀出去緩存中的相關(guān)數(shù)據(jù)this._read(state.highWaterMark);state.sync = false;}接下來設(shè)置相關(guān)的標志位,進行_read處理。針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現(xiàn)類需要實現(xiàn)這個方法,在該方法中手動添加數(shù)據(jù)到Readable對象的讀緩沖區(qū),然后進行Readable的讀取??梢岳斫鉃開read函數(shù)為讀取數(shù)據(jù)前的準備工作(準備數(shù)據(jù)),針對的是流的實現(xiàn)者而言。
if (doRead && !state.reading)n = howMuchToRead(nOrig, state);var ret;if (n > 0)ret = fromList(n, state);elseret = null;if (ret === null) {state.needReadable = true;n = 0;}state.length -= n;if (state.length === 0 && !state.ended)state.needReadable = true;if (nOrig !== n && state.ended && state.length === 0)endReadable(this);// flowing模式下的數(shù)據(jù)讀取依賴于 read函數(shù)// data事件觸發(fā)的次數(shù),依賴于howMuchToRead計算的次數(shù)if (ret !== null)this.emit('data', ret);一旦在_read中更新了緩沖區(qū),那么我們需要重新計算(消費者,即可寫流)讀取的字節(jié)數(shù)。fromList方法完成了讀緩沖區(qū)的slice,如果是objectMode下的讀,則只讀緩沖區(qū)的第一個對象;針對未傳參數(shù)的read方法而言,默認讀取全部緩沖區(qū)等等。從讀緩沖區(qū)讀取完數(shù)據(jù)之后設(shè)置相關(guān)flag,如needReadable,最終,觸發(fā)data事件,結(jié)束!
上節(jié)提到,設(shè)置data事件的執(zhí)行函數(shù)會進入flowing模式的讀,而上文看到正是read方法觸發(fā)了data事件,而默認條件下Readable處于paused狀態(tài),因此在paused狀態(tài)讀取數(shù)據(jù)需要手動執(zhí)行read函數(shù),每次read讀取完畢觸發(fā)一次data事件。從這點看出,flowing和paused狀態(tài)區(qū)別在于是否需要手動執(zhí)行read()來獲取數(shù)據(jù)。flowing狀態(tài)下,我們無需執(zhí)行read,僅需要設(shè)置data事件處理函數(shù)或者設(shè)定導(dǎo)流目標pipe;而在paused狀態(tài)下,不僅僅是簡單的執(zhí)行read方法,因為讀緩沖區(qū)的內(nèi)容時刻在改變,一旦讀緩沖區(qū)又有新數(shù)據(jù),簡單執(zhí)行read()就沒法滿足需求(因為我們無法知道是否又有新數(shù)據(jù)到來),因此需要偵聽讀緩沖區(qū)的相關(guān)事件,即readable事件,在該事件處理函數(shù)中進行read相關(guān)數(shù)據(jù)。
那么,什么情況下會觸發(fā)readable事件呢?在實現(xiàn)_read私有方法中,我們使用stream.push(chunk)或stream.unshift(chunk)方法注入數(shù)據(jù)到讀緩沖區(qū),那么push和unshift方法都實現(xiàn)了下面的邏輯,
if (state.flowing && state.length === 0 && !state.sync) {stream.emit('data', chunk);stream.read(0); } else {// update the buffer info.state.length += state.objectMode ? 1 : chunk.length;if (addToFront)state.buffer.unshift(chunk);elsestate.buffer.push(chunk);if (state.needReadable)emitReadable(stream); }function emitReadable(stream) {var state = stream._readableState;state.needReadable = false;if (!state.emittedReadable) {debug('emitReadable', state.flowing);state.emittedReadable = true;if (state.sync)process.nextTick(emitReadable_, stream);elseemitReadable_(stream);} }function emitReadable_(stream) {debug('emit readable');stream.emit('readable');flow(stream); } // 在flowing狀態(tài)下,自動讀取流(替代paused狀態(tài)下手動read) function flow(stream) {var state = stream._readableState;debug('flow', state.flowing);if (state.flowing) {do {var chunk = stream.read();} while (null !== chunk && state.flowing);} }一旦處于flowing模式并且當前緩沖區(qū)沒有數(shù)據(jù),那么就立即將預(yù)處理的push(unshift)數(shù)據(jù)傳遞給data事件處理函數(shù),并執(zhí)行stream.read(0)。前文已經(jīng)交代過,read(0)僅僅用來觸發(fā)readable事件,并不讀取緩沖區(qū),這就是觸發(fā)readable的第一種情況。
第二種則是第一種情況之外的所有情景,即根據(jù)操作(push、unshift)的不同將數(shù)據(jù)插入讀緩沖區(qū)的不同位置。最后執(zhí)行emitReadable函數(shù),觸發(fā)readable事件。針對emitReadable函數(shù),它的作用就是異步觸發(fā)readable事件,并執(zhí)行flow函數(shù)。flow函數(shù)則針對flowing狀態(tài)的Readable做自適應(yīng)讀取,免去了手動執(zhí)行read函數(shù)和何時執(zhí)行read函數(shù)的苦惱。
這樣,對于Readable的實現(xiàn)者,一旦在_read函數(shù)插入有效數(shù)據(jù)到讀緩沖區(qū),都會觸發(fā)readable事件,在paused狀態(tài)下,設(shè)置readable事件處理函數(shù)并手動執(zhí)行read函數(shù),便可完成數(shù)據(jù)的讀取;而在flowing狀態(tài)下,通過設(shè)置data事件處理函數(shù)或者定義pipe目標流同樣可以實現(xiàn)讀取。
既然pipe同樣可以觸發(fā)Readable進入flowing狀態(tài),那么pipe方法具體做了什么呢?其實pipe針對Readable和Writeable做了限流,首先針對Readable的data事件進行偵聽,并執(zhí)行Writeable的write函數(shù),當Writeable的寫緩沖區(qū)大于一個臨界值(highWaterMark),導(dǎo)致write函數(shù)返回false(此時意味著Writeable無法匹配Readable的速度,Writeable的寫緩沖區(qū)已經(jīng)滿了),此時,pipe修改了Readable模式,執(zhí)行pause方法,進入paused模式,停止讀取讀緩沖區(qū)。而同時Writeable開始刷新寫緩沖區(qū),刷新完畢后異步觸發(fā)drain事件,在該事件處理函數(shù)中,設(shè)置Readable為flowing狀態(tài),并繼續(xù)執(zhí)行flow函數(shù)不停的刷新讀緩沖區(qū),這樣就完成了pipe限流。需要注意的是,Readable和Writeable各自維護了一個緩沖區(qū),在實現(xiàn)的上有區(qū)別:Readable的緩沖區(qū)是一個數(shù)組,存放Buffer、String和Object類型;而Writeable則是一個有向鏈表,依次存放需要寫入的數(shù)據(jù)。
Writeable解讀
Writeable對應(yīng)Java的OutputStream和Writer類,實現(xiàn)字節(jié)和字符數(shù)據(jù)的寫。與Readable類似,Writeable的實例對象同樣維護了一個狀態(tài)對象-WriteableState,記錄了當前輸出流的狀態(tài)信息,如寫緩沖區(qū)的最大值(hightWaterMark)、緩沖區(qū)(有向鏈表)和緩沖區(qū)長度等信息。在本節(jié)中,主要分析輸出流的關(guān)鍵方法write和事件drain,并解析輸出流的實現(xiàn)者需要實現(xiàn)的方法**_write和write**的關(guān)系。
function write ---------------------------- if (state.ended)writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {state.pendingcb++;ret = writeOrBuffer(this, state, chunk, encoding, cb);}return ret;在write方法中,判斷寫入數(shù)據(jù)的格式并執(zhí)行writeOrBuffer函數(shù),并返回執(zhí)行結(jié)果,該返回值標示當前寫緩沖區(qū)是否已滿。真正執(zhí)行寫入邏輯的是writeOrBuffer函數(shù),該函數(shù)的作用在于刷新或者更新寫緩沖區(qū),下面看看主要做了什么,
function writeOrBuffer(stream, state, chunk, encoding, cb) {chunk = decodeChunk(state, chunk, encoding);if (chunk instanceof Buffer)encoding = 'buffer';var len = state.objectMode ? 1 : chunk.length;state.length += len;// 如果緩存的長度大于highWaterMark,需要刷新緩沖,所以設(shè)置needDrain標志var ret = state.length < state.highWaterMark;// we must ensure that previous needDrain will not be reset to false.if (!ret)state.needDrain = true;// 緩存未處理的寫請求,在clearBuffer中執(zhí)行緩存// 由此看出,Readable和Writeable都有緩存,Readable 中緩存的方式是數(shù)組(項為Buffer,字符串或?qū)ο?#xff09;,Writeable的// 緩存則是對象鏈表if (state.writing || state.corked) {var last = state.lastBufferedRequest;state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);if (last) {last.next = state.lastBufferedRequest;} else {state.bufferedRequest = state.lastBufferedRequest;}state.bufferedRequestCount += 1;} else {doWrite(stream, state, false, len, chunk, encoding, cb);}return ret; }writeOrBuffer首先針對數(shù)據(jù)進行編碼,字符串轉(zhuǎn)換成Buffer類型,如果設(shè)置了Writeable的ObjectMode模式則仍為Object類型;接下來更新寫緩沖區(qū)的長度,并判斷寫緩沖區(qū)長度是否超過設(shè)定的Writeable的最大值(默認16k),如果超過超過則ret=false并更新WriteableState的屬性needDrain=true。ret的結(jié)果其實就是write方法返回值,因此一旦write返回值為false,意味著當前寫緩沖區(qū)已滿,需要停止繼續(xù)寫入數(shù)據(jù)。
在Readable的pipe方法中,涉及到了Writeable的drain事件。該事件的觸發(fā)意味著寫緩沖區(qū)已可以繼續(xù)緩存數(shù)據(jù),可見drain事件與寫緩沖區(qū)嚴格相關(guān)。繼續(xù)分析writeOrBuffer函數(shù),若當前輸出流正在寫數(shù)據(jù),那么則當前數(shù)據(jù)緩存至寫緩沖區(qū)(創(chuàng)建WriteReq對象);否則執(zhí)行doWrite函數(shù),刷新緩沖區(qū)。
function doWrite(stream, state, writev, len, chunk, encoding, cb) {state.writelen = len;state.writecb = cb;state.writing = true;state.sync = true;if (writev)stream._writev(chunk, state.onwrite);elsestream._write(chunk, encoding, state.onwrite);state.sync = false; }doWrite函數(shù)設(shè)置了需要寫入數(shù)據(jù)的長度、寫入狀態(tài)等信息,并執(zhí)行輸出流實現(xiàn)者需要實現(xiàn)的_write函數(shù)。在_write函數(shù)中,針對數(shù)據(jù)流向做最后的處理,這里分析_write函數(shù)的具體實現(xiàn)。_write函數(shù)有三個參數(shù),分別為chunk,encoding和state.onwrite回調(diào)函數(shù),對該回調(diào)函數(shù)稍后分析,先著重講解_write函數(shù)的實現(xiàn)。在node的fs模塊中,可以通過fs.createWriteStream創(chuàng)建Writeable實例,通過執(zhí)行
var writeStream = fs.createWriteStream('./output',{decodeStrings: false}); console.log(writeStream._write.toString());-----------------輸出-----------------function (data, encoding, cb) {if (!(data instanceof Buffer))return this.emit('error', new Error('Invalid data'));if (typeof this.fd !== 'number')return this.once('open', function() {this._write(data, encoding, cb);});var self = this;fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {if (er) {self.destroy();return cb(er);}self.bytesWritten += bytes;cb();});if (this.pos !== undefined)this.pos += data.length; }看出,在_write實現(xiàn)中,只接受Buffer類型的數(shù)據(jù),接著執(zhí)行fs.write操作,寫入到對應(yīng)文件描述符fd對應(yīng)的文件中,寫入成功或失敗后執(zhí)行回調(diào)函數(shù),即state.onwrite函數(shù)。
function onwrite(stream, er) {var state = stream._writableState;var sync = state.sync;var cb = state.writecb;onwriteStateUpdate(state);// 默認未重寫_write方法,會收到er值if (er)onwriteError(stream, state, sync, er, cb);else {// Check if we're actually ready to finish, but don't emit yetvar finished = needFinish(state);// 寫緩存的數(shù)據(jù)if (!finished &&!state.corked &&!state.bufferProcessing &&state.bufferedRequest) {clearBuffer(stream, state);}// 異步觸發(fā)drain事件if (sync) {process.nextTick(afterWrite, stream, state, finished, cb);} else {afterWrite(stream, state, finished, cb);}} }在state.onwrite函數(shù)中主要工作有兩個:
- 寫緩沖區(qū)的數(shù)據(jù)
- 寫完緩沖區(qū)的數(shù)據(jù)后,異步觸發(fā)drain事件
第一步,在clearBuffer函數(shù)中,就是取出寫緩沖區(qū)(有向鏈表)的第一個WriteReq對象,執(zhí)行doWrite函數(shù),寫入緩沖區(qū)的第一個數(shù)據(jù);這樣循環(huán)往復(fù)最終清空寫緩沖區(qū),重置一些標志位。
第二步,異步執(zhí)行afterWrite函數(shù),觸發(fā)drain事件,并判斷是否寫操作完畢觸發(fā)“finish”事件。這里之所以強調(diào)異步觸發(fā)drain事件,是因為為了保證先獲得write()返回值為false,給用戶綁定drain處理函數(shù)的時隙,然后再觸發(fā)drain事件。
至此,Writeable的重要流程已全部走通??梢钥闯鰜?#xff0c;在核心的write()中,判斷寫緩沖區(qū)是否已滿并返回該值,在適當條件下緩存數(shù)據(jù)或調(diào)用_write()寫數(shù)據(jù),在Writeable實現(xiàn)者需要實現(xiàn)的** _write() 中,主要任務(wù)是數(shù)據(jù)寫入方向控制,完成最基本的任務(wù)**。
總結(jié)
對比Readable的read()和_read(),我總結(jié)了下這四個函數(shù)在“讀寫過程”中的執(zhí)行順序與關(guān)系,如下圖所示:
轉(zhuǎn)載于:https://www.cnblogs.com/accordion/p/5560531.html
總結(jié)
以上是生活随笔為你收集整理的node中的Stream-Readable和Writeable解读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 进程间的通讯(IPC)方式
- 下一篇: 软件工程博客---团队项目---个人设计