nodejs-stream部分
參考:
https://blog.csdn.net/eeewwwddd/article/details/81042225
http://nodejs.cn/api/stream.html#stream_writable_write_chunk_encoding_callback
?
流(stream)是 Node.js 中處理流式數(shù)據(jù)的抽象接口。?stream?模塊提供了一些 API,用于構(gòu)建實現(xiàn)了流接口的對象。
Node.js 提供了多種流對象。 例如,HTTP 服務(wù)器的請求和?process.stdout?都是流的實例。
流可以是可讀的、可寫的、或者可讀可寫的。 所有的流都是?EventEmitter?的實例,即可以通過事件的監(jiān)聽得以觸發(fā)事件并執(zhí)行一定的操作,如:
stream?模塊可以通過以下方式使用:
const stream = require('stream');盡管理解流的工作方式很重要,但是?stream?模塊本身主要用于開發(fā)者創(chuàng)建新類型的流實例。
對于以消費(fèi)流對象為主的開發(fā)者,極少需要直接使用?stream?模塊。
?
Node.js 中有四種基本的流類型:
Writable - 可寫入數(shù)據(jù)的流(例如 fs.createWriteStream())。Readable - 可讀取數(shù)據(jù)的流(例如 fs.createReadStream())。Duplex - 可讀又可寫的流(例如 net.Socket)。Transform - 在讀寫過程中可以修改或轉(zhuǎn)換數(shù)據(jù)的 Duplex 流(例如 zlib.createDeflate())?
兩種模式
?二進(jìn)制模式
每個分塊都是buffer、string對象
對象模式
Node.js 創(chuàng)建的流都是運(yùn)作在字符串和?Buffer(或?Uint8Array)上。 當(dāng)然,流的實現(xiàn)也可以使用其它類型的 JavaScript 值(除了?null)。 這些流會以“對象模式”進(jìn)行操作。
當(dāng)創(chuàng)建流時,可以使用?objectMode?選項把流實例切換到對象模式。 將已存在的流切換到對象模式是不安全的。
?
?比如如果想創(chuàng)建一個的可以壓入任意形式數(shù)據(jù)的可讀流,只要在創(chuàng)建流的時候設(shè)置參數(shù)objectMode為true即可,例如:Readable({ objectMode: true })
如果readable stream寫入的是字符串,那么字符串會默認(rèn)轉(zhuǎn)換為Buffer,如果在創(chuàng)建流的時候設(shè)置Writable({ decodeStrings: false })參數(shù),那么不會做轉(zhuǎn)換。
如果readable stream寫入的數(shù)據(jù)是對象,那么需要這樣創(chuàng)建writable stream,Writable({ objectMode: true })
??就是如果輸入的數(shù)據(jù)并不是Buffer(或?Uint8Array)格式的時候,那么在創(chuàng)建這個流的時候就要將其設(shè)置為對象模式,即設(shè)置其的objectMode: true,舉例:
const DuplexStream = require('readable-stream').Duplex const inherits = require('util').inheritsmodule.exports = PostMessageStreaminherits(PostMessageStream, DuplexStream)function PostMessageStream (opts) {DuplexStream.call(this, { objectMode: true, }) ... }?
緩沖
可寫流和可讀流都會在內(nèi)部的緩沖器中存儲數(shù)據(jù),可以分別使用的?writable.writableBuffer?或?readable.readableBuffer?來獲取。
可緩沖的數(shù)據(jù)大小取決于傳入流構(gòu)造函數(shù)的?highWaterMark?選項。 對于普通的流,highWaterMark?指定了字節(jié)的總數(shù)。 對于對象模式的流,highWaterMark?指定了對象的總數(shù)。
當(dāng)調(diào)用?stream.push(chunk)?時,數(shù)據(jù)會被緩沖在可讀流中。 如果流的消費(fèi)者沒有調(diào)用?stream.read(),則數(shù)據(jù)會保留在內(nèi)部隊列中直到被消費(fèi)。
一旦內(nèi)部的可讀緩沖的總大小達(dá)到?highWaterMark?指定的閾值時,流會暫時停止從底層資源讀取數(shù)據(jù),直到當(dāng)前緩沖的數(shù)據(jù)被消費(fèi) (也就是說,流會停止調(diào)用內(nèi)部的用于填充可讀緩沖的?readable._read())。
當(dāng)調(diào)用?writable.write(chunk)?時,數(shù)據(jù)會被緩沖在可寫流中。 當(dāng)內(nèi)部的可寫緩沖的總大小小于?highWaterMark?設(shè)置的閾值時,調(diào)用?writable.write()?會返回?true。 一旦內(nèi)部緩沖的大小達(dá)到或超過?highWaterMark?時,則會返回?false。
stream?API 的主要目標(biāo),特別是?stream.pipe(),是為了限制數(shù)據(jù)的緩沖到可接受的程度,也就是讀寫速度不一致的源頭與目的地不會壓垮內(nèi)存。
因為?Duplex?和?Transform?都是可讀又可寫的,所以它們各自維護(hù)著兩個相互獨(dú)立的內(nèi)部緩沖器用于讀取和寫入, 這使得它們在維護(hù)數(shù)據(jù)流時,讀取和寫入兩邊可以各自獨(dú)立地運(yùn)作。 例如,net.Socket?實例是?Duplex?流,它的可讀端可以消費(fèi)從 socket 接收的數(shù)據(jù),而可寫端則可以將數(shù)據(jù)寫入到 socket。 因為數(shù)據(jù)寫入到 socket 的速度可能比接收數(shù)據(jù)的速度快或者慢,所以在讀寫兩端獨(dú)立地進(jìn)行操作(或緩沖)就顯得很重要了。
?
【1】用于消費(fèi)流的 API(即讀取流中數(shù)據(jù))
test.js
const http = require('http');const server = http.createServer((req, res) => {// req 是一個 http.IncomingMessage 實例,它是可讀流。// res 是一個 http.ServerResponse 實例,它是可寫流。 let body = '';// 接收數(shù)據(jù)為 utf8 字符串,// 如果沒有設(shè)置字符編碼,則會接收到 Buffer 對象。req.setEncoding('utf8');// 如果添加了監(jiān)聽器,則可讀流會觸發(fā) 'data' 事件。req.on('data', (chunk) => {body += chunk;});// 'end' 事件表明整個請求體已被接收。 req.on('end', () => {try {const data = JSON.parse(body);// 響應(yīng)信息給用戶。res.write(typeof data);res.end();//end()表示寫結(jié)束} catch (er) {// json 解析失敗。res.statusCode = 400;return res.end(`錯誤: ${er.message}`);}}); });server.listen(1337);然后在終端使用node test.js運(yùn)行該服務(wù)器
然后在另一個終端使用curl localhost:1337 -d "{}" 連接服務(wù)器localhost:1337 ,-d即post數(shù)據(jù)data為{} ,返回object
curl localhost:1337 -d "{}" 返回object curl localhost:1337 -d "\"foo\"" 返回string curl localhost:1337 -d "not json" 返回 錯誤: Unexpected token o in JSON at position 1?
可寫流(比如例子中的?res)會暴露了一些方法,比如?write()?和?end()?用于寫入數(shù)據(jù)到流。
當(dāng)數(shù)據(jù)可以從流讀取時,可讀流會使用?EventEmitter?API 來通知應(yīng)用程序。 從流讀取數(shù)據(jù)的方式有很多種。
可寫流和可讀流都通過多種方式使用?EventEmitter?API 來通訊流的當(dāng)前狀態(tài)。
Duplex?流和?Transform?流都是可寫又可讀的。
對于只需寫入數(shù)據(jù)到流或從流消費(fèi)數(shù)據(jù)的應(yīng)用程序,并不需要直接實現(xiàn)流的接口,通常也不需要調(diào)用?require('stream')
?
《1》可寫流
可寫流是對數(shù)據(jù)要被寫入的目的地的一種抽象。
可寫流的例子包括:
- 客戶端的 HTTP 請求
- 服務(wù)器的 HTTP 響應(yīng)
- fs 的寫入流
- zlib 流
- crypto 流
- TCP socket
- 子進(jìn)程 stdin
- process.stdout、process.stderr
上面的一些例子事實上是實現(xiàn)了可寫流接口的?Duplex?流。
所有可寫流都實現(xiàn)了?stream.Writable?類定義的接口。
盡管可寫流的具體實例可能略有差別,但所有的可寫流都遵循同一基本的使用模式,如以下例子所示:
const myStream = getWritableStreamSomehow(); myStream.write('一些數(shù)據(jù)'); myStream.write('更多數(shù)據(jù)'); myStream.end('完成寫入數(shù)據(jù)');//說明完成寫入?
stream.Writable 類
下面介紹幾類事件:
'close' 事件
當(dāng)流或其底層資源(比如文件描述符)被關(guān)閉時觸發(fā)。 表明不會再觸發(fā)其他事件,也不會再發(fā)生操作。
不是所有可寫流都會觸發(fā)?'close'?事件。
'drain' 事件
如果調(diào)用?stream.write(chunk)?返回?false,可能緩沖區(qū)已滿,需要等待,則當(dāng)有空間可以繼續(xù)寫入數(shù)據(jù)到流時會觸發(fā)?'drain'?事件。
// 向可寫流中寫入數(shù)據(jù)一百萬次。 // 留意背壓(back-pressure)。 function writeOneMillionTimes(writer, data, encoding, callback) {let i = 1000000;write();function write() {let ok = true;do {i--;if (i === 0) {// 最后一次寫入。 writer.write(data, encoding, callback);} else {// 檢查是否可以繼續(xù)寫入。 // 不要傳入回調(diào),因為寫入還沒有結(jié)束。ok = writer.write(data, encoding);}} while (i > 0 && ok);if (i > 0) {// 被提前中止。// 當(dāng)觸發(fā) 'drain' 事件時繼續(xù)寫入,繼續(xù)運(yùn)行write()函數(shù)。writer.once('drain', write);}} }'error' 事件
當(dāng)寫入數(shù)據(jù)發(fā)生錯誤時觸發(fā)。
當(dāng)觸發(fā)?'error'?事件時,流還未被關(guān)閉
'finish' 事件
調(diào)用?stream.end()?且緩沖數(shù)據(jù)都已傳給底層系統(tǒng)之后觸發(fā)。
const http = require('http');const server = http.createServer((req, res) => {// req 是一個 http.IncomingMessage 實例,它是可讀流。// res 是一個 http.ServerResponse 實例,它是可寫流。 let body = '';// 接收數(shù)據(jù)為 utf8 字符串,// 如果沒有設(shè)置字符編碼,則會接收到 Buffer 對象。req.setEncoding('utf8');// 如果添加了監(jiān)聽器,則可讀流會觸發(fā) 'data' 事件。req.on('data', (chunk) => {body += chunk;});// 'end' 事件表明整個請求體已被接收。 req.on('end', () => {try {const data = JSON.parse(body);// 響應(yīng)信息給用戶。res.write(typeof data);res.end();//會觸發(fā)finish事件 res.on('finish', () => {console.error('寫入已完成');});} catch (er) {// json 解析失敗。res.statusCode = 400;return res.end(`錯誤: ${er.message}`);}}); });server.listen(1337);運(yùn)行結(jié)果:
'pipe' 事件
- src?<stream.Readable>?通過管道流入到可寫流的來源流。
當(dāng)在可讀流上調(diào)用?stream.pipe()?時觸發(fā)。
var assert = require('assert'); const writer = process.stdout; const reader = process.stdin; writer.on('pipe', (src) => {console.error('有數(shù)據(jù)正通過管道流入寫入器');assert.equal(src,reader);//兩者相等console.log(src); }); reader.pipe(writer);返回:
有數(shù)據(jù)正通過管道流入寫入器 ReadStream {connecting: false,_hadError: false,_handle:TTY { owner: [Circular], onread: [Function: onread], reading: false },_parent: null,_host: null,_readableState:ReadableState {objectMode: false,//非對象模式highWaterMark: 0,buffer: BufferList { length: 0 },length: 0,pipes:WriteStream {connecting: false,_hadError: false,_handle: [TTY],_parent: null,_host: null,_readableState: [ReadableState],readable: false,_events: [Object],_eventsCount: 7,_maxListeners: undefined,_writableState: [WritableState],writable: true,allowHalfOpen: false,_sockname: null,_writev: null,_pendingData: null,_pendingEncoding: '',server: null,_server: null,columns: 80,rows: 24,_type: 'tty',fd: 1,_isStdio: true,destroySoon: [Function: destroy],_destroy: [Function],[Symbol(asyncId)]: 2,[Symbol(lastWriteQueueSize)]: 0,[Symbol(timeout)]: null,[Symbol(kBytesRead)]: 0,[Symbol(kBytesWritten)]: 0 },pipesCount: 1,flowing: true,ended: false,endEmitted: false,reading: false,sync: false,needReadable: true,emittedReadable: false,readableListening: false,resumeScheduled: true,emitClose: false,destroyed: false,defaultEncoding: 'utf8',awaitDrain: 0,readingMore: false,decoder: null,encoding: null },readable: true,_events:{ end: [ [Function: onReadableStreamEnd], [Function] ],pause: [Function],data: [Function: ondata] },_eventsCount: 3,_maxListeners: undefined,_writableState:WritableState {objectMode: false,highWaterMark: 0,finalCalled: false,needDrain: false,ending: false,ended: false,finished: false,destroyed: false,decodeStrings: false,defaultEncoding: 'utf8',length: 0,writing: false,corked: 0,sync: true,bufferProcessing: false,onwrite: [Function: bound onwrite],writecb: null,writelen: 0,bufferedRequest: null,lastBufferedRequest: null,pendingcb: 0,prefinished: false,errorEmitted: false,emitClose: false,bufferedRequestCount: 0,corkedRequestsFree:{ next: null,entry: null,finish: [Function: bound onCorkedFinish] } },writable: false,allowHalfOpen: false,_sockname: null,_writev: null,_pendingData: null,_pendingEncoding: '',server: null,_server: null,isRaw: false,isTTY: true,fd: 0,[Symbol(asyncId)]: 5,[Symbol(lastWriteQueueSize)]: 0,[Symbol(timeout)]: null,[Symbol(kBytesRead)]: 0,[Symbol(kBytesWritten)]: 0 } View Code?
'unpipe' 事件
- src?<stream.Readable>?被移除可寫流管道的來源流。
當(dāng)在可讀流上調(diào)用?stream.unpipe()?時觸發(fā)。
當(dāng)可讀流通過管道流向可寫流發(fā)生錯誤時,也會觸發(fā)?'unpipe'?事件。
var assert = require('assert'); const writer = process.stdout; const reader = process.stdin; writer.on('pipe', (src) => {console.error('有數(shù)據(jù)正通過管道流入寫入器');assert.equal(src,reader);// console.log(src); }); writer.on('unpipe', (src) => {console.error('已移除可寫流管道');assert.equal(src, reader); }); reader.pipe(writer);//觸發(fā)'pipe'事件 reader.unpipe(writer);//觸發(fā)'unpipe'事件返回:
userdeMacBook-Pro:stream-learning user$ node test.js 有數(shù)據(jù)正通過管道流入寫入器 已移除可寫流管道?
?
下面是可使用的方法:
writable.write(chunk[, encoding][, callback])
- chunk?<string>?|?<Buffer>?|?<Uint8Array>?|?<any>?要寫入的數(shù)據(jù)。 ?對于非對象模式的流chunk?必須是字符串、Buffer?或?Uint8Array。 對于對象模式的流,chunk?可以是任何 JavaScript 值,除了?null。
- encoding?<string>?如果?chunk?是字符串,則指定字符編碼。
- callback?<Function>?當(dāng)數(shù)據(jù)塊被輸出到目標(biāo)后的回調(diào)函數(shù)。
- 返回:?<boolean>?如果流需要等待?'drain'?事件觸發(fā)才能繼續(xù)寫入更多數(shù)據(jù),則返回?false,否則返回?true。
writable.write()?寫入數(shù)據(jù)到流,并在數(shù)據(jù)被完全處理之后調(diào)用?callback。 如果發(fā)生錯誤,則?callback?可能被調(diào)用也可能不被調(diào)用。 為了可靠地檢測錯誤,可以為?'error'?事件添加監(jiān)聽器。
在接收了?chunk?后,如果內(nèi)部的緩沖小于創(chuàng)建流時配置的?highWaterMark,則返回?true?。 如果返回?false?,則應(yīng)該停止向流寫入數(shù)據(jù),直到?'drain'?事件被觸發(fā)。
當(dāng)流還未被排空時,調(diào)用?write()?會緩沖?chunk,并返回?false。 一旦所有當(dāng)前緩沖的數(shù)據(jù)塊都被排空了(被操作系統(tǒng)接收并傳輸),則觸發(fā)?'drain'?事件。 建議一旦?write()?返回 false,則不再寫入任何數(shù)據(jù)塊,直到?'drain'?事件被觸發(fā)。 當(dāng)流還未被排空時,也是可以調(diào)用?write(),Node.js 會緩沖所有被寫入的數(shù)據(jù)塊,直到達(dá)到最大內(nèi)存占用,這時它會無條件中止。 甚至在它中止之前, 高內(nèi)存占用將會導(dǎo)致垃圾回收器的性能變差和 RSS 變高(即使內(nèi)存不再需要,通常也不會被釋放回系統(tǒng))。 如果遠(yuǎn)程的另一端沒有讀取數(shù)據(jù),TCP 的 socket 可能永遠(yuǎn)也不會排空,所以寫入到一個不會排空的 socket 可能會導(dǎo)致遠(yuǎn)程可利用的漏洞。?
對于?Transform, 寫入數(shù)據(jù)到一個不會排空的流尤其成問題,因為?Transform?流默認(rèn)會被暫停,直到它們被 pipe 或者添加了?'data'?或?'readable'?事件句柄。?
如果要被寫入的數(shù)據(jù)可以根據(jù)需要生成或者取得,建議將邏輯封裝為一個可讀流并且使用?stream.pipe()。 如果要優(yōu)先調(diào)用?write(),則可以使用?'drain'?事件來防止背壓與避免內(nèi)存問題:
var assert = require('assert'); const writer = process.stdout; // const reader = process.stdin; function write(data, cb) { if (!writer.write(data)) { writer.once('drain', cb); } else { process.nextTick(cb); } } // 在回調(diào)函數(shù)被執(zhí)行后再進(jìn)行其他的寫入。 write('hello', () => { console.log('完成寫入,可以進(jìn)行更多的寫入'); });返回:
node test.js hello完成寫入,可以進(jìn)行更多的寫入?
舉一個例子說明write和drain:
參考https://blog.csdn.net/eeewwwddd/article/details/81042225
- 如果文件不存在會創(chuàng)建,如果有內(nèi)容會被清空
- 讀取到highWaterMark的時候就會輸出
- 第一次是真的寫到文件 后面就是寫入緩存區(qū) 再從緩存區(qū)里面去取
?
let fs = require('fs') let ws = fs.createWriteStream('./foo1.txt',{flags: 'w',encoding: 'utf8',start: 0,//write的highWaterMark只是用來觸發(fā)是不是干了highWaterMark: 19 //寫是默認(rèn)16k,當(dāng)這里設(shè)置的長度小于或者等于我一下子要寫入的字符串長度時,會觸發(fā)一次drain,也僅觸發(fā)一次,然后將剩余部分的所有內(nèi)容放入緩存,后面將不會再觸發(fā)drain了 }) //返回boolean 每當(dāng)write一次都會在ws中吃下一個饅頭 當(dāng)吃下的饅頭數(shù)量達(dá)到highWaterMark時 就會返回false 吃不下了會把其余放入緩存 其余狀態(tài)返回true //write只能放string或者buffer var flag = ws.write('today is a good day','utf8',()=>{console.log('write'); }); ws.on('drain', ()=>{console.log('drain'); });返回:
node test.js drain write如果改為highWaterMark: 20,大于輸入內(nèi)容,則不會觸發(fā)drain
則返回:
?
node test.js write?
?
?
writable.end([chunk][, encoding][, callback])
- chunk?<string>?|?<Buffer>?|?<Uint8Array>?|?<any>?要寫入的數(shù)據(jù)。 對于非對象模式的流chunk?必須是字符串、Buffer、或?Uint8Array。 對于對象模式的流,?chunk?可以是任何 JavaScript 值,除了?null。
- encoding?<string>?如果?chunk?是字符串,則指定字符編碼。
- callback?<Function>?當(dāng)流結(jié)束時的回調(diào)函數(shù)。
- 返回:?<this>
調(diào)用?writable.end()?表明已沒有數(shù)據(jù)要被寫入可寫流。 可選的?chunk?和?encoding?參數(shù)可以在關(guān)閉流之前再寫入一塊數(shù)據(jù)。 如果傳入了?callback?函數(shù),則會做為監(jiān)聽器添加到?'finish'?事件。
調(diào)用?stream.end()?之后再調(diào)用?stream.write()?會導(dǎo)致錯誤
writable.cork()
強(qiáng)制把所有寫入的數(shù)據(jù)都緩沖到內(nèi)存中。 當(dāng)調(diào)用?stream.uncork()?或?stream.end()?時,緩沖的數(shù)據(jù)才會被輸出。
當(dāng)寫入大量小塊數(shù)據(jù)到流時,內(nèi)部緩沖可能失效,從而導(dǎo)致性能下降,writable.cork()?主要用于避免這種情況。 對于這種情況,實現(xiàn)了?writable._writev()?的流可以用更優(yōu)的方式對寫入的數(shù)據(jù)進(jìn)行緩沖。
writable.uncork()
將調(diào)用?stream.cork()?后緩沖的所有數(shù)據(jù)輸出到目標(biāo)。
當(dāng)使用?writable.cork()?和?writable.uncork()?來管理流的寫入緩沖時,建議使用?process.nextTick()?來延遲調(diào)用?writable.uncork()。 通過這種方式,可以對單個 Node.js 事件循環(huán)中調(diào)用的所有?writable.write()?進(jìn)行批處理。
?
擴(kuò)展:?process.nextTick()
process.nextTick(callback[, ...args])
- callback?<Function>
- ...args?<any>?調(diào)用?callback時傳遞給它的額外參數(shù)
process.nextTick()方法將?callback?添加到"next tick 隊列"。 一旦當(dāng)前事件輪詢隊列的任務(wù)全部完成,在next tick隊列中的所有callbacks會被依次調(diào)用。
這種方式不是setTimeout(fn, 0)的別名。它更加有效率。事件輪詢隨后的ticks 調(diào)用,會在任何I/O事件(包括定時器)之前運(yùn)行。
舉例:
console.log('start'); process.nextTick(() => {console.log('nextTick callback'); }); console.log('scheduled'); // Output: // start // scheduled // nextTick callback?
回到writable.uncork(),舉例:
var assert = require('assert'); const writer = process.stdout;writer.cork(); writer.write('一些 '); writer.write('數(shù)據(jù) '); process.nextTick(() => writer.uncork());如果沒有這一句,運(yùn)行時沒有輸出結(jié)果的返回:
node test.js 一些 數(shù)據(jù)如果一個流上多次調(diào)用?writable.cork(),則必須調(diào)用同樣次數(shù)的?writable.uncork()?才能輸出緩沖的數(shù)據(jù)。
var assert = require('assert'); const writer = process.stdout; writer.cork(); writer.write('一些 '); writer.cork(); writer.write('數(shù)據(jù) '); process.nextTick(() => {writer.uncork();// 數(shù)據(jù)不會被輸出,直到第二次調(diào)用 uncork()。writer.uncork();//注釋掉這一句就不會有輸出,正確輸出為一些 數(shù)據(jù) });writable.destroy([error])
- error?<Error>
- 返回:?<this>
銷毀流,并觸發(fā)?'error'?事件且傳入?error?參數(shù)。 調(diào)用該方法后,可寫流就結(jié)束了,之后再調(diào)用?write()?或?end()?都會導(dǎo)致?ERR_STREAM_DESTROYED?錯誤。 實現(xiàn)流時不應(yīng)該重寫這個方法,而是重寫?writable._destroy()
?
writable.setDefaultEncoding(encoding)
- encoding?<string>?默認(rèn)的字符編碼。
- 返回:?<this>
為可寫流設(shè)置默認(rèn)的?encoding。
?
轉(zhuǎn)自https://blog.csdn.net/eeewwwddd/article/details/81042225
let fs = require('fs') let EventEmitter = require('events') //只有第一次write的時候直接用_write寫入文件 其余都是放到cache中 但是len超過了highWaterMark就會返回false告知需要drain 很占緩存 //從第一次的_write開始 回去一直通過clearBuffer遞歸_write寫入文件 如果cache中沒有了要寫入的東西 會根據(jù)needDrain來判斷是否觸發(fā)干點 class WriteStream extends EventEmitter{constructor(path,options = {}){super()this.path = paththis.highWaterMark = options.highWaterMark || 64*1024this.flags = options.flags || 'r'this.start = options.start || 0this.pos = this.startthis.autoClose = options.autoClose || truethis.mode = options.mode || 0o666//默認(rèn)null就是bufferthis.encoding = options.encoding || null//打開這個文件this.open()//寫文件的時候需要哪些參數(shù)//第一次寫入的時候 是給highWaterMark個饅頭 他會硬著頭皮寫到文件中 之后才會把多余吃不下的放到緩存中this.writing = false//緩存數(shù)組this.cache = []this.callbackList = []//數(shù)組長度this.len = 0//是否觸發(fā)drain事件this.needDrain = false}clearBuffer(){//取緩存中最上面的一個let buffer = this.cache.shift()if(buffer){//有buffer的情況下this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer(),buffer.callback)}else{//沒有的話 先看看需不需要drainif(this.needDrain){//觸發(fā)drain 并初始化所有狀態(tài)this.writing = falsethis.needDrain = falsethis.callbackList.shift()()this.emit('drain')}this.callbackList.map(v=>{v()})this.callbackList.length = 0}}_write(chunk,encoding,clearBuffer,callback){//因為write方法是同步調(diào)用的 所以可能還沒獲取到fdif(typeof this.fd != 'number'){//直接在open的時間對象上注冊一個一次性事件 當(dāng)open被emit的時候會被調(diào)用return this.once('open',()=>this._write(chunk,encoding,clearBuffer,callback))}fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWrite)=>{this.pos += byteWrite//每次寫完 相應(yīng)減少內(nèi)存中的數(shù)量this.len -= byteWriteif(callback) this.callbackList.push(callback)//第一次寫完 clearBuffer()})}//寫入方法write(chunk,encoding=this.encoding,callback){//判斷chunk必須是字符串或者buffer 為了統(tǒng)一都變成bufferchunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding)//維護(hù)緩存的長度 3this.len += chunk.lengthlet ret = this.len < this.highWaterMarkif(!ret){//表示要觸發(fā)drain事件this.needDrain = true}//正在寫入的應(yīng)該放到內(nèi)存中if(this.writing){this.cache.push({chunk,encoding,callback})}else{//這里是第一次寫的時候this.writing = true//專門實現(xiàn)寫的方法this._write(chunk,encoding,()=>this.clearBuffer(),callback)}// console.log(ret)//能不能繼續(xù)寫了 false代表下次寫的時候更占內(nèi)存return ret}destory(){if(typeof this.fd != 'number'){return this.emit('close')}//如果文件被打開過 就關(guān)閉文件并且觸發(fā)close事件fs.close(this.fd,()=>{this.emit('close')})}open(){//fd表示的就是當(dāng)前this.path的這個文件,從3開始(number類型)fs.open(this.path,this.flags,(err,fd)=>{//有可能fd這個文件不存在 需要做處理if(err){//如果有自動關(guān)閉 則幫他銷毀if(this.autoClose){//銷毀(關(guān)閉文件,出發(fā)關(guān)閉文件事件)this.destory()}//如果有錯誤 就會觸發(fā)error事件this.emit('error',err)return}//保存文件描述符this.fd = fd//當(dāng)文件打開成功時觸發(fā)open事件this.emit('open',this.fd)})} }?
自定義可寫流
因為createWriteStream內(nèi)部調(diào)用了WriteStream類,WriteStream又實現(xiàn)了Writable接口,WriteStream實現(xiàn)了_write()方法,所以我們通過自定義一個類繼承stream模塊的Writable,并在原型上自定義一個_write()就可以自定義自己的可寫流
返回:
node test.js <Buffer 79 65 73> ok?
?
《2》可讀流
可讀流是對提供數(shù)據(jù)的來源的一種抽象。
可讀流的例子包括:
- 客戶端的 HTTP 響應(yīng)
- 服務(wù)器的 HTTP 請求
- fs 的讀取流
- zlib 流
- crypto 流
- TCP socket
- 子進(jìn)程 stdout 與 stderr
- process.stdin
所有可讀流都實現(xiàn)了?stream.Readable?類定義的接口。
?
兩種讀取模式
可讀流運(yùn)作于兩種模式之一:流動模式(flowing)或暫停模式(paused)。
- 在流動模式中,數(shù)據(jù)自動從底層系統(tǒng)讀取,并通過?EventEmitter?接口的事件盡可能快地被提供給應(yīng)用程序。
- 在暫停模式中,必須顯式調(diào)用?stream.read()?讀取數(shù)據(jù)塊。
所有可讀流都開始于暫停模式,可以通過以下方式切換到流動模式:
- 添加?'data'?事件句柄。
- 調(diào)用?stream.resume()。
- 調(diào)用?stream.pipe()。
可讀流可以通過以下方式切換回暫停模式:
- 如果沒有管道目標(biāo),則調(diào)用?stream.pause()。
- 如果有管道目標(biāo),則移除所有管道目標(biāo)。調(diào)用?stream.unpipe()?可以移除多個管道目標(biāo)。
只有提供了消費(fèi)或忽略數(shù)據(jù)的機(jī)制后,可讀流才會產(chǎn)生數(shù)據(jù)。 如果消費(fèi)的機(jī)制被禁用或移除,則可讀流會停止產(chǎn)生數(shù)據(jù)。
為了向后兼容,移除?'data'?事件句柄不會自動地暫停流。 如果有管道目標(biāo),一旦目標(biāo)變?yōu)?span id="ozvdkddzhkzd" class="Apple-converted-space">?drain?狀態(tài)并請求接收數(shù)據(jù)時,則調(diào)用?stream.pause()?也不能保證流會保持暫停模式。
如果可讀流切換到流動模式,且沒有可用的消費(fèi)者來處理數(shù)據(jù),則數(shù)據(jù)將會丟失。 例如,當(dāng)調(diào)用?readable.resume()?時,沒有監(jiān)聽?'data'?事件或?'data'?事件句柄已移除。
添加?'readable'?事件句柄會使流自動停止流動,并通過?readable.read()?消費(fèi)數(shù)據(jù)。 如果?'readable'?事件句柄被移除,且存在?'data'?事件句柄,則流會再次開始流動。
?
三種狀態(tài)
可讀流的兩種模式是對發(fā)生在可讀流中更加復(fù)雜的內(nèi)部狀態(tài)管理的一種簡化的抽象。
在任意時刻,可讀流會處于以下三種狀態(tài)之一:
- readable.readableFlowing === null
- readable.readableFlowing === false
- readable.readableFlowing === true
當(dāng)?readable.readableFlowing?為?null?時,沒有提供消費(fèi)流數(shù)據(jù)的機(jī)制,所以流不會產(chǎn)生數(shù)據(jù)。 在這個狀態(tài)下,監(jiān)聽?'data'?事件、調(diào)用?readable.pipe()、或調(diào)用?readable.resume()?都會使?readable.readableFlowing?切換到?true,可讀流開始主動地產(chǎn)生數(shù)據(jù)并觸發(fā)事件。
調(diào)用?readable.pause()、readable.unpipe()、或接收到背壓,則?readable.readableFlowing?會被設(shè)為?false,暫時停止事件流動但不會停止數(shù)據(jù)的生成。 在這個狀態(tài)下,為?'data'?事件綁定監(jiān)聽器不會使?readable.readableFlowing?切換到?true。
const { PassThrough, Writable } = require('stream'); const pass = new PassThrough(); const writable = new Writable(); pass.pipe(writable); pass.unpipe(writable); // readableFlowing 現(xiàn)在為 false。 pass.on('data', (chunk) => { console.log(chunk.toString()); }); pass.write('ok'); // 不會觸發(fā) 'data' 事件。 pass.resume(); // 必須調(diào)用它才會觸發(fā) 'data' 事件。如果注釋掉它則不會返回結(jié)果ok當(dāng)?readable.readableFlowing?為?false?時,數(shù)據(jù)可能會堆積在流的內(nèi)部緩沖中。
?
選擇一種接口風(fēng)格
可讀流的 API 貫穿了多個 Node.js 版本,且提供了多種方法來消費(fèi)流數(shù)據(jù)。 ??開發(fā)者通常應(yīng)該選擇其中一種方法來消費(fèi)數(shù)據(jù),不要在單個流使用多種方法來消費(fèi)數(shù)據(jù)。 混合使用?on('data')、on('readable')、pipe()?或異步迭代器,會導(dǎo)致不明確的行為。
對于大多數(shù)用戶,建議使用?readable.pipe(),因為它是消費(fèi)流數(shù)據(jù)最簡單的方式。 如果開發(fā)者需要精細(xì)地控制數(shù)據(jù)的傳遞與產(chǎn)生,可以使用?EventEmitter、readable.on('readable')/readable.read()?或?readable.pause()/readable.resume()。
?
stream.Readable 類
下面是事件的介紹:
'error' 事件
- <Error>
當(dāng)流因底層內(nèi)部出錯而不能產(chǎn)生數(shù)據(jù)、或推送無效的數(shù)據(jù)塊時觸發(fā)。
'close' 事件
當(dāng)流或其底層資源(比如文件描述符)被關(guān)閉時觸發(fā)。 表明不會再觸發(fā)其他事件,也不會再發(fā)生操作。
不是所有可讀流都會觸發(fā)?'close'?事件。
'data' 事件
- chunk?<Buffer>?|?<string>?|?<any>?數(shù)據(jù)塊。 對于非對象模式的流,?chunk?可以是字符串或?Buffer。 對于對象模式的流,chunk?可以是任何 JavaScript 值,除了?null。
當(dāng)流將數(shù)據(jù)塊傳送給消費(fèi)者后觸發(fā)。 當(dāng)調(diào)用?readable.pipe(),readable.resume()?或綁定監(jiān)聽器到?'data'?事件時,流會轉(zhuǎn)換到流動模式。 當(dāng)調(diào)用?readable.read()?且有數(shù)據(jù)塊返回時,也會觸發(fā)?'data'?事件。
如果使用?readable.setEncoding()?為流指定了默認(rèn)的字符編碼,則監(jiān)聽器回調(diào)傳入的數(shù)據(jù)為字符串,否則傳入的數(shù)據(jù)為?Buffer。
const fs = require('fs'); const rr = fs.createReadStream('data.txt');//hello data rr.on('data', (chunk) => {//readable不行,報錯TypeError: Cannot read property 'length' of undefined console.log(`接收到 ${chunk.length} 個字節(jié)的數(shù)據(jù)`); //chunk為undefined });返回:
node test.js 接收到 10 個字節(jié)的數(shù)據(jù)?
process.stdin.setEncoding('utf8'); process.stdin.on('data', (chunk) => {//readable不行,會閃退??????? console.log(`接收到 ${chunk.length} 個字節(jié)的數(shù)據(jù)`); });返回:
node test.js 今天天氣好 //自己輸入并回車,這個內(nèi)容就會被process.stdin收到 接收到 6 個字節(jié)的數(shù)據(jù)?
之前有試一個例子一直沒有成功:
process.stdin.setEncoding('utf8'); // process.stdout.write("請輸入用戶名:"); process.stdin.on('data', (chunk) => { // var chunk = process.stdin.read(); console.log(chunk); if (chunk !== null) { process.stdout.write(`data: ${chunk}`); } }); process.stdin.on('end', () => { process.stdout.write('end'); });?
'end' 事件
當(dāng)流中沒有數(shù)據(jù)可供消費(fèi)時觸發(fā)。
'end'?事件只有在數(shù)據(jù)被完全消費(fèi)掉后才會觸發(fā)。 要想觸發(fā)該事件,可以將流轉(zhuǎn)換到流動模式,或反復(fù)調(diào)用?stream.read()?直到數(shù)據(jù)被消費(fèi)完。
'readable' 事件
當(dāng)流中有數(shù)據(jù)可供讀取時觸發(fā)
當(dāng)?shù)竭_(dá)流數(shù)據(jù)尾部時,?'readable'?事件也會觸發(fā)。觸發(fā)順序在?'end'?事件之前。
事實上,?'readable'?事件表明流有了新的動態(tài):要么是有了新的數(shù)據(jù),要么是到了流的尾部。 對于前者,?stream.read()?將返回可用的數(shù)據(jù)。而對于后者,?stream.read()?將返回?null。 例如,下面的例子中的?foo.txt?是一個空文件:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt'); rr.on('readable', () => { console.log(`讀取的數(shù)據(jù): ${rr.read()}`); }); rr.on('end', () => { console.log('結(jié)束'); });返回:
node test.js 讀取的數(shù)據(jù): null 結(jié)束?
有問題:
const fs = require('fs'); const rr = fs.createReadStream('data.txt'); rr.on('readable', function(){//不能是'data'事件,為什么,如果是data,返回只有null和end,明天好好查查這兩者的對比 var chunk = rr.read(); // 獲取到輸入的信息 console.log(chunk); if(chunk === ''){ rr.emit('end'); // 觸發(fā)end事件 return } if (chunk !== null) { process.stdout.write('data: '+ chunk +'\n'); } // rr.emit('end'); }); rr.on('end', function() { process.stdout.write('end'+'\n'); //也輸出了,只是被擋住了,加上+'\n'就看出來了 });返回:
node test.js <Buffer 68 65 6c 6c 6f 20 64 61 74 61> data: hello data null end?上面標(biāo)明的錯誤都是因為一開始沒能弄清楚data和readable的區(qū)別,看了博客https://blog.csdn.net/eeewwwddd/article/details/81042225?utm_source=copy后終于明白
參數(shù)
path:讀取的文件的路徑
?
data與readable的區(qū)別:
- readable和讀流的data的區(qū)別就是,readable可以控制自己從緩存區(qū)讀多少和控制讀的次數(shù),而data是每次讀取都清空緩存,讀多少輸出多少
- readable是暫停模式,data是流動模式;就是readable需要使用read()來讀取數(shù)據(jù),data則是從回調(diào)中就能夠得到數(shù)據(jù)
//因為上面的data事件把數(shù)據(jù)讀了,清空緩存區(qū)。所以導(dǎo)致下面的readable讀出為null rs.on('readable',() => {console.log('readable');console.log(rs.read()); });
返回:
node test.jsdata
he
data
ll
data
o
readable
null
如果把'data'監(jiān)聽去掉,那么返回結(jié)果就是:
node test.js readable he readable ll readable o readable null?
舉例說明readable的使用情況:
(1)
let rs = fs.createReadStream('./foo.txt', {//內(nèi)容為 Today is a good day.i want to go out for fun.//每次讀7個highWaterMark: 7,encoding: 'utf8' }) //如果讀流第一次全部讀下來并且小于highWaterMark,就會再讀一次(再觸發(fā)一次readable事件) rs.on('readable', () => {let result = rs.read(2);console.log(result) })返回:
node test.js To da(2)
//如果rs.read()不加參數(shù),一次性讀完,會從緩存區(qū)再讀一次,為null rs.on('readable', () => {let result = rs.read();console.log(result) })返回:
node test.js Today i s a goo d day.iwant t o go ou t for f un. null(3)
//如果readable每次都剛好讀完(即rs.read()的參數(shù)剛好和highWaterMark相等),就會一直觸發(fā)readable事件,如果最后不足他想喝的數(shù),他就會先觸發(fā)一次null,最后把剩下的喝完 rs.on('readable', () => {let result = rs.read();console.log(result) })返回:
node test.js Today i s a goo d day.iwant t o go ou t for f null un.(4)
//一開始緩存區(qū)為0的時候也會默認(rèn)調(diào)一次readable事件,將foo.txt內(nèi)容清零 rs.on('readable', () => {let result = rs.read();console.log(result) })返回:
node test.js null?
實戰(zhàn):行讀取器(平常我們的文件可能有回車、換行,此時如果要每次想讀一行的數(shù)據(jù),就得用到readable)
let EventEmitter = require('events') //如果要將內(nèi)容全部讀出就用on('data'),精確讀取就用on('readable') class LineReader extends EventEmitter {constructor(path) {super()this.rs = fs.createReadStream(path)//回車符的十六進(jìn)制let RETURN = 0x0d//換行符的十六進(jìn)制let LINE = 0x0alet arr = []this.on('newListener', (type) => {//每次使用 on 監(jiān)聽事件時觸發(fā)'newListener'事件if (type === 'newLine') {//自定義的一個事件'newLine',觸發(fā)后就調(diào)用'readable',然后自行設(shè)定一次讀取一行的操作this.rs.on('readable', () => {let char//每次讀一個,當(dāng)讀完的時候會返回null,終止循環(huán)while (char = this.rs.read(1)) {//讀到文件最后char = nullswitch (char[0]) {case RETURN:break;//Mac下只有換行符,windows下是回車符和換行符,需要根據(jù)不同的轉(zhuǎn)換。因為我這里是Maccase LINE://如果是換行符就把數(shù)組轉(zhuǎn)換為字符串let r = Buffer.from(arr).toString('utf8')//把數(shù)組清空arr.length = 0//觸發(fā)newLine事件,把得到的一行數(shù)據(jù)輸出this.emit('newLine', r)break;default://如果不是換行符,就放入數(shù)組中arr.push(char[0])}}})}})//以上只能取出換行符之前的代碼,最后一行的后面沒有換行符,所以需要特殊處理。當(dāng)讀流讀完需要觸發(fā)end事件時this.rs.on('end', () => {//取出最后一行數(shù)據(jù),轉(zhuǎn)成字符串let r = Buffer.from(arr).toString('utf8')arr.length = 0this.emit('newLine', r)})} }let lineReader = new LineReader('./foo.txt') lineReader.on('newLine', function (data) {console.log('a line');console.log(data); })返回:
node test.js //可見一次是只讀取一行的 a line if the truth is : a line I a line Am a line A a line boy一般是將整個文件讀取完的:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.on('readable', () => {console.log('one time');console.log(rr.read()); }); rr.on('end', () => {console.log('結(jié)束'); });返回:
node test.js one time if the truth is : I Am A boy one time null 結(jié)束?
下面接著方法的介紹:
readable.destroy([error])
銷毀流,并且觸發(fā)error事件。然后,可讀流將釋放所有的內(nèi)部資源。
開發(fā)者不應(yīng)該覆蓋這個方法,應(yīng)該覆蓋readable._destroy方法。
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.on('open', function () {console.log('文件被打開'); }); rr.destroy('something wrong');//有參數(shù)則為出現(xiàn)的錯誤,會觸發(fā)error事件 rr.on('data', function (data) {console.log('data');console.log(data);}); rr.on('error', function (err) {console.log('error');console.log(err); }); rr.on('close', function (err) {console.log('close'); }); rr.on('end', () => {console.log('end'); });返回:
node test.js 文件被打開 error something wrong如果rr.destroy();參數(shù)為空,則不會觸發(fā)error事件,而是觸發(fā)close事件,那么返回為:
?
node test.js 文件被打開 closereadable.isPaused()
- 返回:?<boolean>
readable.isPaused()?方法返回可讀流的當(dāng)前操作狀態(tài)。 該方法主要是在?readable.pipe()?方法的底層機(jī)制中用到。大多數(shù)情況下,沒有必要直接使用該方法
readable.pause()
- 返回:?this
readable.pause()?方法將會使 flowing 模式的流停止觸發(fā)?'data'?事件, 進(jìn)而切出 flowing 模式。任何可用的數(shù)據(jù)都將保存在內(nèi)部緩存中。
?
readable.read([size])
- size?<number>?可選參數(shù),確定讀取數(shù)據(jù)的大小.
- 返回?<string>?|?<Buffer>?|?<null>
readable.read()方法從內(nèi)部緩沖區(qū)中抽出并返回一些數(shù)據(jù)。 如果沒有可讀的數(shù)據(jù),返回null。readable.read()方法默認(rèn)數(shù)據(jù)將作為“Buffer”對象返回 ,除非已經(jīng)使用readable.setEncoding()方法設(shè)置編碼或流運(yùn)行在對象模式。
可選的size參數(shù)指定要讀取的特定數(shù)量的字節(jié)。如果size字節(jié)不可讀,將返回null除非流已經(jīng)結(jié)束,在這種情況下所有保留在內(nèi)部緩沖區(qū)的數(shù)據(jù)將被返回。
如果沒有指定size參數(shù),則內(nèi)部緩沖區(qū)包含的所有數(shù)據(jù)將返回。
readable.read()方法只應(yīng)該在暫停模式下的可讀流上運(yùn)行。在流模式下,readable.read()自動調(diào)用直到內(nèi)部緩沖區(qū)的數(shù)據(jù)完全耗盡。
一般來說,建議開發(fā)人員避免使用'readable'事件和readable.read()方法,使用readable.pipe()或'data'事件代替。
無論size參數(shù)的值是什么,對象模式中的可讀流將始終返回調(diào)用readable.read(size)的單個項目。
注意:如果readable.read()方法返回一個數(shù)據(jù)塊,那么一個'data'事件也將被發(fā)送。
注意:在已經(jīng)被發(fā)出的'end'事件后調(diào)用stream.read([size])事件將返回null。不會拋出運(yùn)行時錯誤。
?
//fd文件描述符,一般通過fs.open中獲取 //buffer是讀取后的數(shù)據(jù)放入的緩存目標(biāo) //0,從buffer的0位置開始放入 //BUFFER_SIZE,每次放BUFFER_SIZE這么長的長度 //index,每次從文件的index的位置開始讀 //bytesRead,真實讀到的個數(shù) fs.read(fd,buffer,0,BUFFER_SIZE,index,function(err,bytesRead){})?
?
?
?
readable.resume()
- 返回:?this
readable.resume()?方法會重新觸發(fā)?'data'?事件, 將暫停模式切換到流動模式。
readable.resume()?方法可以用來充分使用流中的數(shù)據(jù),而不用實際處理任何數(shù)據(jù),如以下示例所示:
getReadableStreamSomehow().resume().on('end', () => {console.log('Reached the end, but did not read anything.');});readable.setEncoding(encoding)
- encoding?<string>?要使用的編碼
- Returns:?this
readble.setEncoding()?方法會為從可讀流讀入的數(shù)據(jù)設(shè)置字符編碼
默認(rèn)返回Buffer對象。設(shè)置編碼會使得該流數(shù)據(jù)返回指定編碼的字符串而不是Buffer對象。例如,調(diào)用readable.setEncoding('utf8')會使得輸出數(shù)據(jù)作為UTF-8數(shù)據(jù)解析,并作為字符串返回。調(diào)用readable.setEncoding('hex')使得數(shù)據(jù)被編碼成16進(jìn)制字符串格式。
可讀流會妥善處理多字節(jié)字符,如果僅僅直接從流中取出Buffer對象,很可能會導(dǎo)致錯誤解碼。
?
?
舉例說明上面的事件和方法的使用:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.on('open', function () {//1 先響應(yīng)openconsole.log('文件被打開'); }); rr.on('data', function (data) {//2 console.log('data');console.log(rr.isPaused()); //falserr.pause();//3 改為暫停模式,不讀取數(shù)據(jù)了console.log(rr.isPaused());//true console.log(data);}); setTimeout(function () {//7 兩秒后恢復(fù)成流動模式繼續(xù)讀取數(shù)據(jù)console.log('resume');console.log(rr.isPaused());//true rr.resume();console.log(rr.isPaused());//true,因為添加 'readable' 事件句柄會使流自動停止流動,并通過 readable.read() 消費(fèi)數(shù)據(jù)。 如果 'readable' 事件句柄被移除,且存在 'data' 事件句柄,則流會再次開始流動 },1000); //注釋掉readable后,結(jié)果就為false rr.on('error', function (err) {console.log(err); }); rr.on('readable', () => {//4 因為data將所有數(shù)據(jù)都讀完并將緩存清空,所以readable只輸出nullconsole.log('readable');console.log(rr.read()); }); rr.on('close', function (err) {//6 關(guān)閉console.log('close'); }); rr.on('end', () => {//5 結(jié)束console.log('end'); });返回:
node test.js 文件被打開 data false true if the truth is : I Am A boy readable null end close resume true true注釋掉readable返回:
node test.js 文件被打開 data false true if the truth is : I Am A boy resume true false end close?
readable.pipe(destination[, options])
- destination?<stream.Writable>?數(shù)據(jù)寫入目標(biāo)
-
options?<Object>?Pipe 選項
- end?<boolean>?在 reader 結(jié)束時結(jié)束 writer 。默認(rèn)為?true。
readable.pipe()?綁定一個 [Writable][] 到?readable?上, 將可寫流自動切換到 flowing 模式并將所有數(shù)據(jù)傳給綁定的 [Writable][]。數(shù)據(jù)流將被自動管理。這樣,即使是可讀流較快,目標(biāo)可寫流也不會超負(fù)荷(overwhelmed)。
下面例子將?readable?中的所有數(shù)據(jù)通過管道傳遞給名為?foo.txt?的文件:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.pipe(process.stdout);返回:
node test.js if the truth is : I Am A boy可以在單個可讀流上綁定多個可寫流。
readable.pipe()?方法返回?目標(biāo)流?的引用,這樣就可以對流進(jìn)行鏈?zhǔn)降毓艿啦僮?#xff1a;
const fs = require('fs'); const zlib = require('zlib'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); const z = zlib.createGzip(); const w = fs.createWriteStream('foo.txt.gz'); rr.pipe(z).pipe(w); //運(yùn)行后,文件夾中果然出現(xiàn)了一個壓縮文件默認(rèn)情況下,當(dāng)源可讀流(the source Readable stream)觸發(fā)?'end'?事件時,目標(biāo)流也會調(diào)用?stream.end()?方法從而結(jié)束寫入。要禁用這一默認(rèn)行為,?end?選項應(yīng)該指定為?false, 這將使目標(biāo)流保持打開, 如下面例子所示:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); const writer = fs.createWriteStream('foo2.txt'); rr.pipe(writer,{end:false}); rr.on('end', () => {console.log('end reader'); }); setTimeout(function(){writer.write('請輸入num1的值:');writer.end(); },2000);返回:
node test.jsend reader
且foo2.txt文件中內(nèi)容為:
if the truth is : I Am A boy請輸入num1的值:如果去掉{ end: false },則出錯:
node test.js end reader events.js:167throw er; // Unhandled 'error' event^Error [ERR_STREAM_WRITE_AFTER_END]: write after end //這就是因為當(dāng)源可讀流觸發(fā)?'end'?事件時,目標(biāo)流也會調(diào)用?stream.end()?方法從而結(jié)束寫入這里有一點要警惕,如果可讀流在處理時發(fā)生錯誤,目標(biāo)可寫流?不會?自動關(guān)閉。 如果發(fā)生錯誤,需要?手動?關(guān)閉所有流以避免內(nèi)存泄漏。
注意:不管對?process.stderr?和?process.stdout?指定什么選項,它們都是直到 Node.js 進(jìn)程退出才關(guān)閉。
?
readable.unpipe([destination])
- destination?<stream.Writable>?可選的,指定需要分離的目標(biāo)流
readable.unpipe()?方法將之前通過stream.pipe()方法綁定的流分離
如果?destination?沒有傳入, 則所有綁定的流都會被分離.
如果傳入?destination, 但它沒有被pipe()綁定過,則該方法不作為.
const readable = getReadableStreamSomehow(); const writable = fs.createWriteStream('file.txt'); // All the data from readable goes into 'file.txt', // but only for the first second readable.pipe(writable); setTimeout(() => {console.log('Stop writing to file.txt');readable.unpipe(writable);console.log('Manually close the file stream');writable.end(); }, 1000);?
readable源碼實現(xiàn),轉(zhuǎn)自https://blog.csdn.net/eeewwwddd/article/details/81042225
let fs = require('fs') let EventEmitter = require('events') class ReadStream extends EventEmitter{constructor(path,options = {}){super()this.path = paththis.highWaterMark = options.highWaterMark || 64*1024this.flags = options.flags || 'r'this.start = options.start || 0this.pos = this.start //會隨著讀取的位置改變this.autoClose = options.autoClose || truethis.end = options.end || null//默認(rèn)null就是bufferthis.encoding = options.encoding || null//參數(shù)的問題this.reading = false //非流動模式//創(chuàng)建個buffer用來存儲每次讀出來的數(shù)據(jù)this.buffers = []//緩存區(qū)長度this.len = 0//是否要觸發(fā)readable事件this.emittedReadable = false//觸發(fā)open獲取文件的fd標(biāo)識符this.open()//此方法默認(rèn)同步調(diào)用 每次設(shè)置on監(jiān)聽事件時都會調(diào)用之前所有的newListener事件this.on('newListener',(type)=>{// 等待著他監(jiān)聽data事件if(type === 'readable'){//開始讀取 客戶已經(jīng)監(jiān)聽的data事件this.read()}})}//readable真正的源碼中的方法,計算出和n最接近的2的冪次數(shù) computeNewHighWaterMark(n) {n--;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;n++;return n;}read(n){//當(dāng)讀的數(shù)量大于水平線,會通過取2的冪次取比他大和最接近的數(shù)if(this.len < n){this.highWaterMark = this.computeNewHighWaterMark(n)//重新觸發(fā)readbale的callback,所以第一次會觸發(fā)nullthis.emittedReadable = true//重新讀新的水位線this._read()}//真正讀取到的let buffer = null//說明緩存里有這么多,取出來if(n>0 && n<=this.len){//定義一個bufferbuffer = Buffer.alloc(n)let buflet flag = truelet index = 0//[buffer<1,2,3,4>,buffer<1,2,3,4>,buffer<1,2,3,4>]//每次取出緩存前的第一個bufferwhile(flag && (buf = this.buffers.shift())){for(let i=0;i<buf.length;i++){//把取出的一個buffer中的數(shù)據(jù)放入新定義的buffer中buffer[index++] = buf[i]//當(dāng)buffer的長度和n(參數(shù))長度一樣時,停止循環(huán)if(index === n){flag = false//維護(hù)緩存,因為可能緩存中的buffer長度大于n,當(dāng)取出n的長度時,還會剩下其余的buffer,我們需要切割buf并且放到緩存數(shù)組之前this.len -= nlet r = buf.slice(i+1)if(r.length){this.buffers.unshift(r)}break}}}}//如果緩存區(qū)沒有東西,等會讀完需要觸發(fā)readable事件//這里會有一種狀況,就是如果每次Readable讀取的數(shù)量正好等于highWaterMark(流讀取到緩存的長度),就會每次都等于0,每次都觸發(fā)Readable事件,就會每次讀,讀到?jīng)]有為止,最后還會觸發(fā)一下nullif(this.len === 0){this.emittedReadable = true}if(this.len < this.highWaterMark){//默認(rèn),一開始的時候開始讀取if(!this.reading){this.reading = true//真正多讀取操作this._read()}}return buffer&&buffer.toString()}_read(){if(typeof this.fd != 'number'){//等待著觸發(fā)open事件后fd肯定拿到了 再去執(zhí)行read方法return this.once('open',()=>{this._read()})}//先讀這么多bufferlet buffer = Buffer.alloc(this.highWaterMark)fs.read(this.fd,buffer,0,buffer.length,this.pos,(err,byteRead)=>{if(byteRead > 0){//當(dāng)?shù)谝淮巫x到數(shù)據(jù)后,改變reading的狀態(tài),如果觸發(fā)read事件,可能還會在觸發(fā)第二次_readthis.reading = false//每次讀到數(shù)據(jù)增加緩存取得長度this.len += byteRead//每次讀取之后,會增加讀取的文件的讀取開始位置this.pos += byteRead//將讀到的buffer放入緩存區(qū)buffers中this.buffers.push(buffer.slice(0,byteRead))//觸發(fā)readableif(this.emittedReadable){this.emittedReadable = false//可以讀取了,默認(rèn)開始的時候杯子填滿了this.emit('readable')}}else{//沒讀到就出發(fā)end事件this.emit('end')}})}destory(){if(typeof this.fd != 'number'){return this.emit('close')}//如果文件被打開過 就關(guān)閉文件并且觸發(fā)close事件fs.close(this.fd,()=>{this.emit('close')})}open(){//fd表示的就是當(dāng)前this.path的這個文件,從3開始(number類型)fs.open(this.path,this.flags,(err,fd)=>{//有可能fd這個文件不存在 需要做處理if(err){//如果有自動關(guān)閉 則幫他銷毀if(this.autoClose){//銷毀(關(guān)閉文件,觸發(fā)關(guān)閉文件事件)this.destory()}//如果有錯誤 就會觸發(fā)error事件this.emit('error',err)return}//保存文件描述符this.fd = fd//當(dāng)文件打開成功時觸發(fā)open事件this.emit('open',this.fd)})} }?
自定義可讀流
因為createReadStream內(nèi)部調(diào)用了ReadStream類,ReadStream又實現(xiàn)了Readable接口,ReadStream實現(xiàn)了_read()方法,所以我們通過自定義一個類繼承stream模塊的Readable,并在原型上自定義一個_read()就可以自定義自己的可讀流
返回:
node test.js 100?
?
pipe——管道 可以控制速率,因為讀快寫慢
let fs = require('fs') //pipe方法叫管道 可以控制速率 let rs = fs.createReadStream('./foo.txt',{highWaterMark: 4 }) let ws = fs.createWriteStream('./foo1.txt',{highWaterMark: 1 }) //會監(jiān)聽rs的on('data')將讀取到的數(shù)據(jù),通過ws.write的方法寫入文件 //調(diào)用寫的一個方法 返回boolean類型 //如果返回false就調(diào)用rs的pause方法 暫停讀取 //等待可寫流 寫入完畢在監(jiān)聽drain resume rs rs.pipe(ws) //會控制速率 防止淹沒可用內(nèi)存?
let fs = require('fs') //這兩個是上面自己寫的ReadStream和WriteStream let { Readable } = require('stream');class MyRead extends Readable{//流需要一個_read方法,方法中push什么,外面就接收什么 _read(){//push方法就是上面_read方法中的push一樣,把數(shù)據(jù)放入緩存區(qū)中this.push('100');//如果push了null就表示沒有東西可讀了,停止(如果不寫,就會一直push上面的值,死循環(huán))this.push(null);} }let writer = fs.createWriteStream('./foo1.txt',{highWaterMark: 1 });//如果用原來的讀寫,因為寫比較耗時,所以會多讀少寫,耗內(nèi)存 MyRead.prototype.pipe = function(dest){this.on('data',(data)=>{let flag = dest.write(data)//如果寫入的時候嘴巴吃滿了就不繼續(xù)讀了,暫停if(!flag){this.pause()}});//如果寫的時候嘴巴里的吃完了,就會繼續(xù)讀dest.on('drain',()=>{this.resume()});this.on('end',()=>{this.destroy()//銷毀ReadStream//清空緩存中的數(shù)據(jù)fs.fsync(1,()=>{//fs.fsync作用是同步磁盤緩存,1代表的是文件描述符,0,1,2 文件描述符代表標(biāo)準(zhǔn)輸入設(shè)備(比如鍵盤),標(biāo)準(zhǔn)輸出設(shè)備(顯示器)和標(biāo)準(zhǔn)錯誤dest.destroy()//銷毀WriteStream,之前dest設(shè)的是,但是報錯process.stdout cannot be closed });}); } var reader = new MyRead(); reader.pipe(writer);//結(jié)果就是將100寫到了文件foo1.txt中上面的文件描述符處本來寫的是dest.fd,但是報錯:
TypeError [ERR_INVALID_ARG_TYPE]: The "fd" argument must be of type number. Received type object
查看writer的fd為null,不知原因,待查明???????????
?
stream.pipeline(...streams[, callback])
- ...streams?<Stream>?兩個或多個要用管道連接的流
- callback?<Function>?一個回調(diào)函數(shù),可以帶有一個錯誤信息參數(shù)
該模塊方法用于在多個流之間架設(shè)管道,可以自動傳遞錯誤和完成掃尾工作,并且可在管道架設(shè)完成時提供一個回調(diào)函數(shù):
const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib');// 使用 pipeline API 輕松連接多個流 // 并在管道完成時獲得通知// 使用pipeline高效壓縮一個可能很大的tar文件: pipeline(fs.createReadStream('foo.txt'),zlib.createGzip(),fs.createWriteStream('foo.tar.gz'),//運(yùn)行后成功壓縮并返回 管道架設(shè)成功 信息(err) => {if (err) {console.error('管道架設(shè)失敗', err);} else {console.log('管道架設(shè)成功');}} );pipeline?API 也可做成承諾:
const util = require('util'); const stream = require('stream'); const fs = require('fs'); const zlib = require('zlib'); const pipeline = util.promisify(stream.pipeline);async function run() {await pipeline(fs.createReadStream('foo.txt'),zlib.createGzip(),fs.createWriteStream('foo.tar.gz')////運(yùn)行后成功壓縮并返回 管道架設(shè)成功 信息 );console.log('管道架設(shè)成功'); }run().catch(console.error);?
?
用于實現(xiàn)流的 API
其實就是覆寫下面的這些方法來實現(xiàn)自己的流操作:
新的流類必須實現(xiàn)一個或多個特定的方法,根據(jù)所創(chuàng)建的流類型,如下圖所示:
| 只讀流 | Readable | _read |
| 只寫流 | writable | _write?,_writev,_final |
| 可讀可寫流 | Duplex | _read?,_write?,_writev,_final |
| 操作寫數(shù)據(jù),然后讀結(jié)果 | Transform | _transform,_flush,_final |
注意:實現(xiàn)流的代碼里面不應(yīng)該出現(xiàn)調(diào)用“public”方法的地方因為這些方法是給使用者使用的(流使用者部分的API所述)。這樣做可能會導(dǎo)致使用流的應(yīng)用程序代碼產(chǎn)生不利的副作用。
const { Writable } = require('stream');class MyWritable extends Writable {constructor(options) {super(options);// ... } }?
?
雙工流
有了雙工流,我們可以在同一個對象上同時實現(xiàn)可讀和可寫,就好像同時繼承這兩個接口。 重要的是雙工流的可讀性和可寫性操作完全獨(dú)立于彼此。這僅僅是將兩個特性組合成一個對象。
let { Duplex } = require('stream') //雙工流,可讀可寫 class MyDuplex extends Duplex{_read(){this.push('hello Duplex')this.push(null)}_write(chunk,encoding,clearBuffer){console.log(chunk)clearBuffer()} }let myDuplex = new MyDuplex() //process.stdin是node自帶的process進(jìn)程中的可讀流,會監(jiān)聽命令行的輸入 //process.stdout是node自帶的process進(jìn)程中的可寫流,會監(jiān)聽并輸出在命令行中 //所以這里的意思就是在命令行先輸出hello,然后我們輸入什么他就出來對應(yīng)的buffer(先作為可讀流出來) process.stdin.pipe(myDuplex).pipe(process.stdout)返回:
node test.js hello Duplex?
?
轉(zhuǎn)換流
在讀寫過程中可以修改或轉(zhuǎn)換數(shù)據(jù)的?Duplex?流(例如?zlib.createDeflate())
轉(zhuǎn)換流的輸出是從輸入中計算出來的。對于轉(zhuǎn)換流,我們不必實現(xiàn)read或write的方法,我們只需要實現(xiàn)一個transform方法,將兩者結(jié)合起來。它有write方法的意思,我們也可以用它來push數(shù)據(jù)。
let { Transform } = require('stream');class MyTransform extends Transform{_transform(chunk,encoding,callback){//5 myTransform2 push時則觸發(fā)myTransform的_transformconsole.log(chunk.toString().toUpperCase());//6 然后輸出from MyTransform2的大寫內(nèi)容callback();} } let myTransform = new MyTransform();class MyTransform2 extends Transform{_transform(chunk,encoding,callback){//2 觸發(fā)myTransform2的_transformconsole.log(chunk.toString().toUpperCase());//3 輸出input的大寫內(nèi)容INPUTthis.push('from MyTransform2');//4 將from MyTransform2內(nèi)容寫入myTransformthis.push(null);callback();} } let myTransform2 = new MyTransform2();//此時myTransform2被作為可寫流觸發(fā)_transform,輸出輸入的大寫字符后,會通過可讀流push字符到下一個轉(zhuǎn)換流中 //當(dāng)寫入的時候才會觸發(fā)transform的值,此時才會push,所以后面的pipe拿到的chunk是前面的push的值 process.stdin.pipe(myTransform2).pipe(myTransform);返回:
node test.js input //1 輸入回車 INPUTFROM MYTRANSFORM2?
總結(jié)
可讀流
在 flowing 模式下, 可讀流自動從系統(tǒng)底層讀取數(shù)據(jù),并通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應(yīng)用。
在 paused 模式下,必須顯式調(diào)用 stream.read() 方法來從流中讀取數(shù)據(jù)片段。
所有初始工作模式為 paused 的 Readable 流,可以通過下面三種途徑切換到 flowing 模式:
- 監(jiān)聽 ‘data’ 事件
- 調(diào)用 stream.resume() 方法
- 調(diào)用 stream.pipe() 方法將數(shù)據(jù)發(fā)送到 Writable
可讀流可以通過下面途徑切換到 paused 模式:
- 如果不存在管道目標(biāo)(pipe destination),可以通過調(diào)用 stream.pause() 方法實現(xiàn)。
- 如果存在管道目標(biāo),可以通過取消 ‘data’ 事件監(jiān)聽,并調(diào)用 stream.unpipe() 方法移除所有管道目標(biāo)來實現(xiàn)。
可寫流
需要知道只有在嘴真正的吃滿了,并且等到把嘴里的和地上的饅頭(緩存中的)都吃下了才會觸發(fā)drain事件
第一次寫入會直接寫入文件中,后面會從緩存中一個個取
雙工流
只是對可寫可讀流的一種應(yīng)用,既可作為可讀流,也能作為可寫流,并且作為可讀或者可寫時是隔離的
轉(zhuǎn)換流
一般轉(zhuǎn)換流是邊輸入邊輸出的,而且一般只有觸發(fā)了寫入操作時才會進(jìn)入_transform方法中。跟雙工流的區(qū)別就是,他的可讀可寫是在一起的
?
轉(zhuǎn)載于:https://www.cnblogs.com/wanghui-garcia/p/9798158.html
總結(jié)
以上是生活随笔為你收集整理的nodejs-stream部分的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Ghost配置1——删除社交Link
- 下一篇: kafka channle的应用案例