node那点事(二) -- Writable streams(可写流)、自定义流
可寫流(Writable Stream)
可寫流是對數據寫入'目的地'的一種抽象。
可寫流的原理其實與可讀流類似,當數據過來的時候會寫入緩存池,當寫入的速度很慢或者寫入暫停時候,數據流便會進入到隊列池緩存起來,當然即使緩存池滿了,剩余的數據也是存在內存
可寫流的簡單用法如下代碼
let fs = require('fs'); let path = require('path'); let ws = fs.createWriteStream(path.join(__dirname,'1.txt'),{highWaterMark:3,autoClose:true,flags:'w',encoding:'utf8',mode:0o666,start:0, }); let i = 9; function write(){let flag = true;while(i>0&&flag){flag = ws.write(--i '','utf8',()=>{console.log('ok')});console.log(flag)} } write(); // drain只有當緩存區(qū)充滿后 并且被消費后觸發(fā) ws.on('drain',function(){console.log('抽干')write(); });實現原理
現在就讓我們來實現一個簡單的可寫流,來研究可寫流的內部原理,可寫流有很多方法與可讀流類似,這里不在重復了首先要有一個構造函數來定義一些基本選項屬性,然后調用一個open放法打開文件,并且有一個destroy方法來處理關閉邏輯
let EventEmitter = require('events'); let fs = require('fs');class WriteStream extends EventEmitter {constructor(path,options) {super();this.path = path;this.highWaterMark = options.highWaterMark || 16 * 1024;this.autoClose = options.autoClose || true;this.mode = options.mode;this.start = options.start || 0;this.flags = options.flags || 'w';this.encoding = options.encoding || 'utf8';// 可寫流 要有一個緩存區(qū),當正在寫入文件是,內容要寫入到緩存區(qū)中// 在源碼中是一個鏈表 => []this.buffers = [];// 標識 是否正在寫入this.writing = false;// 是否滿足觸發(fā)drain事件this.needDrain = false;// 記錄寫入的位置this.pos = 0;// 記錄緩存區(qū)的大小this.length = 0;this.open();}destroy() {if (typeof this.fd !== 'number') {return this.emit('close');}fs.close(this.fd, () => {this.emit('close')});}open() {fs.open(this.path, this.flags, this.mode, (err,fd) => {if (err) {this.emit('error', err);if (this.autoClose) {this.destroy();}return;}this.fd = fd;this.emit('open');})} }module.exports = WriteStream;接著我們實現write方法來讓可寫流對象調用,在write方法中我們首先將數據轉化為buffer,接著實現一些事件的觸發(fā)條件的邏輯,如果現在沒有正在寫入的話我們就要真正的進行寫入操作了,這里我們實現一個_write方法來實現寫入操作,否則則代表文件正在寫入,那我們就將流傳來的數據先放在緩存區(qū)中,保證寫入數據不會同時進行。
write(chunk,encoding=this.encoding,callback=()=>{}){chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);// write 返回一個boolean類型 this.length =chunk.length; let ret = this.length<this.highWaterMark; // 比較是否達到了緩存區(qū)的大小this.needDrain = !ret; // 是否需要觸發(fā)needDrain// 判斷是否正在寫入 如果是正在寫入 就寫入到緩存區(qū)中if(this.writing){this.buffers.push({encoding,chunk,callback}); // []}else{// 專門用來將內容 寫入到文件內this.writing = true;this._write(chunk,encoding,()=>{callback();this.clearBuffer();}); // 8}return ret; }_write(chunk,encoding,callback){if(typeof this.fd !== 'number'){return this.once('open',()=>this._write(chunk,encoding,callback));}fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{this.length -= byteWritten;this.pos = byteWritten;callback(); // 清空緩存區(qū)的內容}); }_write寫入之后的回調中我們會調用傳入回調函數clearBuffer,這個方法會去buffers中繼續(xù)遞歸地把數據取出,然后繼續(xù)調用_write方法去寫入,直到全部buffer中的數據取出后,這樣就清空了buffers。
clearBuffer(){let buffer = this.buffers.shift();if(buffer){this._write(buffer.chunk,buffer.encoding,()=>{buffer.callback();this.clearBuffer()});}else{this.writing = false;if(this.needDrain){ // 是否需要觸發(fā)drain 需要就發(fā)射drain事件this.needDrain = false;this.emit('drain');}} }最后附上完整的代碼
let EventEmitter = require('events'); let fs = require('fs'); class WriteStream extends EventEmitter{constructor(path,options){super();this.path = path;this.highWaterMark = options.highWaterMark||16*1024;this.autoClose = options.autoClose||true;this.mode = options.mode;this.start = options.start||0;this.flags = options.flags||'w';this.encoding = options.encoding || 'utf8';// 可寫流 要有一個緩存區(qū),當正在寫入文件是,內容要寫入到緩存區(qū)中// 在源碼中是一個鏈表 => []this.buffers = [];// 標識 是否正在寫入this.writing = false;// 是否滿足觸發(fā)drain事件this.needDrain = false;// 記錄寫入的位置this.pos = 0;// 記錄緩存區(qū)的大小this.length = 0;this.open();}destroy(){if(typeof this.fd !=='number'){return this.emit('close');}fs.close(this.fd,()=>{this.emit('close')})}open(){fs.open(this.path,this.flags,this.mode,(err,fd)=>{if(err){this.emit('error',err);if(this.autoClose){this.destroy();}return}this.fd = fd;this.emit('open');})}write(chunk,encoding=this.encoding,callback=()=>{}){chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);// write 返回一個boolean類型 this.length =chunk.length; let ret = this.length<this.highWaterMark; // 比較是否達到了緩存區(qū)的大小this.needDrain = !ret; // 是否需要觸發(fā)needDrain// 判斷是否正在寫入 如果是正在寫入 就寫入到緩存區(qū)中if(this.writing){this.buffers.push({encoding,chunk,callback}); // []}else{// 專門用來將內容 寫入到文件內this.writing = true;this._write(chunk,encoding,()=>{callback();this.clearBuffer();}); // 8}return ret;}clearBuffer(){let buffer = this.buffers.shift();if(buffer){this._write(buffer.chunk,buffer.encoding,()=>{buffer.callback();this.clearBuffer()});}else{this.writing = false;if(this.needDrain){ // 是否需要觸發(fā)drain 需要就發(fā)射drain事件this.needDrain = false;this.emit('drain');}}}_write(chunk,encoding,callback){if(typeof this.fd !== 'number'){return this.once('open',()=>this._write(chunk,encoding,callback));}fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{this.length -= byteWritten;this.pos = byteWritten;callback(); // 清空緩存區(qū)的內容});} }module.exports = WriteStream;Pipe管道流
前面我們了解了可讀流與可寫流,那么怎么讓二者結合起來使用呢,node給我們提供好了方法--Pipe管道,流顧名思義,就是在可讀流與可寫流中間加入一個管道,實現一邊讀取,一邊寫入,讀一點寫一點。
Pipe的使用方法如下
let fs = require('fs'); let path = require('path'); let ReadStream = require('./ReadStream'); let WriteStream = require('./WriteStream');let rs = new ReadStream(path.join(__dirname, './1.txt'), {highWaterMark: 4 }); let ws = new WriteStream(path.join(__dirname, './2.txt'), {highWaterMark: 1 }); // 4 1 rs.pipe(ws);實現原理
Pipe的原理比較簡單,簡單說監(jiān)聽可讀流的data事件來持續(xù)獲取文件中的數據,然后我們就會去調用寫流的write方法。如果可寫流緩存區(qū)已滿,那么當我們得到調用可讀流的pause方法來暫停讀取,然后等到寫流的緩存區(qū)已經全部寫入并且觸發(fā)drain事件時,我們就會調用resume重新開啟讀取的流程。上代碼
pipe(ws) {this.on('data', (chunk) => {let flag = ws.write(chunk);if (!flag) {this.pause();}});ws.on('drain', () => {this.resume();}) }自定義流
Node允許我們自定義流,讀流繼承于Readable接口,寫流則繼承于Writable接口,所以我們其實是可以自定義一個流模塊,只要繼承stream模塊對應的接口即可。
自定義可讀流
如果我們要自定義讀流的話,那我們就需要繼承Readable,Readable里面有一個read()方法,默認調用_read(),所以我們只要復寫了_read()方法就可實現讀取的邏輯,同時Readable中也提供了一個push方法,調用push方法就會觸發(fā)data事件,push中的參數就是data事件回調函數的參數,當push傳入的參數為null的時候就代表讀流停止,上代碼
let { Readable } = require('stream');// 想實現什么流 就繼承這個流 // Readable里面有一個read()方法,默認掉_read() // Readable中提供了一個push方法你調用push方法就會觸發(fā)data事件 let index = 9; class MyRead extends Readable {_read() {// 可讀流什么時候停止呢? 當push null的時候停止if (index-- > 0) return this.push('123');this.push(null);} }let mr = new MyRead(); mr.on('data', function(data) {console.log(data); });自定義可寫流
與自定義讀流類似,自定義寫流需要繼承Writable接口,并且實現一個_write()方法,這里注意的是_write中可以傳入3個參數,chunk, encoding, callback,chunk就是代表寫入的數據,通常是一個buffer,encoding是編碼類型,通常不會用到,最后的callback要注意,它并不是我們用這個自定義寫流調用write時的回調,而是我們上面講到寫流實現時的clearBuffer函數。
let { Writable } = require('stream');// 可寫流實現_write方法 // 源碼中默認調用的是Writable中的write方法 class MyWrite extends Writable {_write(chunk, encoding, callback) {console.log(chunk.toString());callback(); // clearBuffer} }let mw = new MyWrite(); mw.write('111', 'utf8', () => {console.log(1); }) mw.write('222', 'utf8', () => {console.log(1); });Duplex 雙工流
雙工流其實就是結合了上面我們說的自定義讀流和自定義寫流,它既能讀也能寫,同時可以做到讀寫之間互不干擾
let { Duplex } = require('stream');// 雙工流 又能讀 又能寫,而且讀取可以沒關系(互不干擾) let d = Duplex({read() {this.push('hello');this.push(null);},write(chunk, encoding, callback) {console.log(chunk);callback();} });d.on('data', function(data) {console.log(data); }); d.write('hello');Transform 轉換流
轉換流的本質就是雙工流,唯一不同的是它并不需要像上面提到的雙工流一樣實現read和write,它只需要實現一個transform方法用于轉換
let { Transform } = require('stream');// 它的參數和可寫流一樣 let tranform1 = Transform({transform(chunk, encoding, callback) {this.push(chunk.toString().toUpperCase()); // 將輸入的內容放入到可讀流中callback();} }); let tranform2 = Transform({transform(chunk, encoding, callback){console.log(chunk.toString());callback();} });// 等待你的輸入 // rs.pipe(ws); // 希望將輸入的內容轉化成大寫在輸出出來 process.stdin.pipe(tranform1).pipe(tranform2); // 對象流 可讀流里只能放buffer或者字符串 對象流里可以放對象對象流
默認情況下,流處理的數據是Buffer/String類型的值。對象流的特點就是它有一個objectMode標志,我們可以設置它讓流可以接受任何JavaScript對象。上代碼
const { Transform } = require('stream');let fs = require('fs'); let rs = fs.createReadStream('./users.json');rs.setEncoding('utf8');let toJson = Transform({readableObjectMode: true,transform(chunk, encoding, callback) {this.push(JSON.parse(chunk));callback();} });let jsonOut = Transform({writableObjectMode: true,transform(chunk, encoding, callback) {console.log(chunk);callback();} }); rs.pipe(toJson).pipe(jsonOut);更多專業(yè)前端知識,請上 【猿2048】www.mk2048.com
總結
以上是生活随笔為你收集整理的node那点事(二) -- Writable streams(可写流)、自定义流的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Vue结合HTML5拖放API 实现目录
- 下一篇: node中的缓存机制