可寫流是對數據寫入'目的地'的一種抽象。node
可寫流的原理其實與可讀流相似,當數據過來的時候會寫入緩存池,當寫入的速度很慢或者寫入暫停時候,數據流便會進入到隊列池緩存起來,固然即便緩存池滿了,剩餘的數據也是存在內存json
可寫流的簡單用法以下代碼緩存
let fs = require('fs'); let path = require('path'); let ws = fs.createWriteStream(path.join(__dirname,'1.txt'),{ highWaterMark:3, autoClose:true, flags:'w', encoding:'utf8', mode:0o666, start:0, }); let i = 9; function write(){ let flag = true; while(i>0&&flag){ flag = ws.write(--i+'','utf8',()=>{console.log('ok')}); console.log(flag) } } write(); // drain只有當緩存區充滿後 而且被消費後觸發 ws.on('drain',function(){ console.log('抽乾') write(); });
如今就讓咱們來實現一個簡單的可寫流,來研究可寫流的內部原理,可寫流有不少方法與可讀流相似,這裏不在重複了首先要有一個構造函數來定義一些基本選項屬性,而後調用一個open放法打開文件,而且有一個destroy方法來處理關閉邏輯函數
let EventEmitter = require('events'); let fs = require('fs'); class WriteStream extends EventEmitter { constructor(path,options) { super(); this.path = path; this.highWaterMark = options.highWaterMark || 16 * 1024; this.autoClose = options.autoClose || true; this.mode = options.mode; this.start = options.start || 0; this.flags = options.flags || 'w'; this.encoding = options.encoding || 'utf8'; // 可寫流 要有一個緩存區,當正在寫入文件是,內容要寫入到緩存區中 // 在源碼中是一個鏈表 => [] this.buffers = []; // 標識 是否正在寫入 this.writing = false; // 是否知足觸發drain事件 this.needDrain = false; // 記錄寫入的位置 this.pos = 0; // 記錄緩存區的大小 this.length = 0; this.open(); } destroy() { if (typeof this.fd !== 'number') { return this.emit('close'); } fs.close(this.fd, () => { this.emit('close') }); } open() { fs.open(this.path, this.flags, this.mode, (err,fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy(); } return; } this.fd = fd; this.emit('open'); }) } } module.exports = WriteStream;
接着咱們實現write方法來讓可寫流對象調用,在write方法中咱們首先將數據轉化爲buffer,接着實現一些事件的觸發條件的邏輯,若是如今沒有正在寫入的話咱們就要真正的進行寫入操做了,這裏咱們實現一個_write方法來實現寫入操做,不然則表明文件正在寫入,那咱們就將流傳來的數據先放在緩存區中,保證寫入數據不會同時進行。ui
write(chunk,encoding=this.encoding,callback=()=>{}){ chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding); // write 返回一個boolean類型 this.length+=chunk.length; let ret = this.length<this.highWaterMark; // 比較是否達到了緩存區的大小 this.needDrain = !ret; // 是否須要觸發needDrain // 判斷是否正在寫入 若是是正在寫入 就寫入到緩存區中 if(this.writing){ this.buffers.push({ encoding, chunk, callback }); // [] }else{ // 專門用來將內容 寫入到文件內 this.writing = true; this._write(chunk,encoding,()=>{ callback(); this.clearBuffer(); }); // 8 } return ret; } _write(chunk,encoding,callback){ if(typeof this.fd !== 'number'){ return this.once('open',()=>this._write(chunk,encoding,callback)); } fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{ this.length -= byteWritten; this.pos += byteWritten; callback(); // 清空緩存區的內容 }); }
_write寫入以後的回調中咱們會調用傳入回調函數clearBuffer,這個方法會去buffers中繼續遞歸地把數據取出,而後繼續調用_write方法去寫入,直到所有buffer中的數據取出後,這樣就清空了buffers。this
clearBuffer(){ let buffer = this.buffers.shift(); if(buffer){ this._write(buffer.chunk,buffer.encoding,()=>{ buffer.callback(); this.clearBuffer() }); }else{ this.writing = false; if(this.needDrain){ // 是否須要觸發drain 須要就發射drain事件 this.needDrain = false; this.emit('drain'); } } }
最後附上完整的代碼編碼
let EventEmitter = require('events'); let fs = require('fs'); class WriteStream extends EventEmitter{ constructor(path,options){ super(); this.path = path; this.highWaterMark = options.highWaterMark||16*1024; this.autoClose = options.autoClose||true; this.mode = options.mode; this.start = options.start||0; this.flags = options.flags||'w'; this.encoding = options.encoding || 'utf8'; // 可寫流 要有一個緩存區,當正在寫入文件是,內容要寫入到緩存區中 // 在源碼中是一個鏈表 => [] this.buffers = []; // 標識 是否正在寫入 this.writing = false; // 是否知足觸發drain事件 this.needDrain = false; // 記錄寫入的位置 this.pos = 0; // 記錄緩存區的大小 this.length = 0; this.open(); } destroy(){ if(typeof this.fd !=='number'){ return this.emit('close'); } fs.close(this.fd,()=>{ this.emit('close') }) } open(){ fs.open(this.path,this.flags,this.mode,(err,fd)=>{ if(err){ this.emit('error',err); if(this.autoClose){ this.destroy(); } return } this.fd = fd; this.emit('open'); }) } write(chunk,encoding=this.encoding,callback=()=>{}){ chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding); // write 返回一個boolean類型 this.length+=chunk.length; let ret = this.length<this.highWaterMark; // 比較是否達到了緩存區的大小 this.needDrain = !ret; // 是否須要觸發needDrain // 判斷是否正在寫入 若是是正在寫入 就寫入到緩存區中 if(this.writing){ this.buffers.push({ encoding, chunk, callback }); // [] }else{ // 專門用來將內容 寫入到文件內 this.writing = true; this._write(chunk,encoding,()=>{ callback(); this.clearBuffer(); }); // 8 } return ret; } clearBuffer(){ let buffer = this.buffers.shift(); if(buffer){ this._write(buffer.chunk,buffer.encoding,()=>{ buffer.callback(); this.clearBuffer() }); }else{ this.writing = false; if(this.needDrain){ // 是否須要觸發drain 須要就發射drain事件 this.needDrain = false; this.emit('drain'); } } } _write(chunk,encoding,callback){ if(typeof this.fd !== 'number'){ return this.once('open',()=>this._write(chunk,encoding,callback)); } fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{ this.length -= byteWritten; this.pos += byteWritten; callback(); // 清空緩存區的內容 }); } } module.exports = WriteStream;
前面咱們瞭解了可讀流與可寫流,那麼怎麼讓兩者結合起來使用呢,node給咱們提供好了方法--Pipe管道,流顧名思義,就是在可讀流與可寫流中間加入一個管道,實現一邊讀取,一邊寫入,讀一點寫一點。code
Pipe的使用方法以下orm
let fs = require('fs'); let path = require('path'); let ReadStream = require('./ReadStream'); let WriteStream = require('./WriteStream'); let rs = new ReadStream(path.join(__dirname, './1.txt'), { highWaterMark: 4 }); let ws = new WriteStream(path.join(__dirname, './2.txt'), { highWaterMark: 1 }); // 4 1 rs.pipe(ws);
Pipe的原理比較簡單,簡單說監聽可讀流的data事件來持續獲取文件中的數據,而後咱們就會去調用寫流的write方法。若是可寫流緩存區已滿,那麼當咱們獲得調用可讀流的pause方法來暫停讀取,而後等到寫流的緩存區已經所有寫入而且觸發drain事件時,咱們就會調用resume從新開啓讀取的流程。上代碼對象
pipe(ws) { this.on('data', (chunk) => { let flag = ws.write(chunk); if (!flag) { this.pause(); } }); ws.on('drain', () => { this.resume(); }) }
Node容許咱們自定義流,讀流繼承於Readable接口,寫流則繼承於Writable接口,因此咱們實際上是能夠自定義一個流模塊,只要繼承stream模塊對應的接口便可。
若是咱們要自定義讀流的話,那咱們就須要繼承Readable,Readable裏面有一個read()方法,默認調用_read(),因此咱們只要複寫了_read()方法就可實現讀取的邏輯,同時Readable中也提供了一個push方法,調用push方法就會觸發data事件,push中的參數就是data事件回調函數的參數,當push傳入的參數爲null的時候就表明讀流中止,上代碼
let { Readable } = require('stream'); // 想實現什麼流 就繼承這個流 // Readable裏面有一個read()方法,默認掉_read() // Readable中提供了一個push方法你調用push方法就會觸發data事件 let index = 9; class MyRead extends Readable { _read() { // 可讀流何時中止呢? 當push null的時候中止 if (index-- > 0) return this.push('123'); this.push(null); } } let mr = new MyRead(); mr.on('data', function(data) { console.log(data); });
與自定義讀流相似,自定義寫流須要繼承Writable接口,而且實現一個_write()方法,這裏注意的是_write中能夠傳入3個參數,chunk, encoding, callback,chunk就是表明寫入的數據,一般是一個buffer,encoding是編碼類型,一般不會用到,最後的callback要注意,它並非咱們用這個自定義寫流調用write時的回調,而是咱們上面講到寫流實現時的clearBuffer函數。
let { Writable } = require('stream'); // 可寫流實現_write方法 // 源碼中默認調用的是Writable中的write方法 class MyWrite extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); // clearBuffer } } let mw = new MyWrite(); mw.write('111', 'utf8', () => { console.log(1); }) mw.write('222', 'utf8', () => { console.log(1); });
雙工流其實就是結合了上面咱們說的自定義讀流和自定義寫流,它既能讀也能寫,同時能夠作到讀寫之間互不干擾
let { Duplex } = require('stream'); // 雙工流 又能讀 又能寫,並且讀取能夠不要緊(互不干擾) let d = Duplex({ read() { this.push('hello'); this.push(null); }, write(chunk, encoding, callback) { console.log(chunk); callback(); } }); d.on('data', function(data) { console.log(data); }); d.write('hello');
轉換流的本質就是雙工流,惟一不一樣的是它並不須要像上面提到的雙工流同樣實現read和write,它只須要實現一個transform方法用於轉換
let { Transform } = require('stream'); // 它的參數和可寫流同樣 let tranform1 = Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); // 將輸入的內容放入到可讀流中 callback(); } }); let tranform2 = Transform({ transform(chunk, encoding, callback){ console.log(chunk.toString()); callback(); } }); // 等待你的輸入 // rs.pipe(ws); // 但願將輸入的內容轉化成大寫在輸出出來 process.stdin.pipe(tranform1).pipe(tranform2); // 對象流 可讀流裏只能放buffer或者字符串 對象流裏能夠放對象
默認狀況下,流處理的數據是Buffer/String類型的值。對象流的特色就是它有一個objectMode標誌,咱們能夠設置它讓流能夠接受任何JavaScript對象。上代碼
const { Transform } = require('stream'); let fs = require('fs'); let rs = fs.createReadStream('./users.json'); rs.setEncoding('utf8'); let toJson = Transform({ readableObjectMode: true, transform(chunk, encoding, callback) { this.push(JSON.parse(chunk)); callback(); } }); let jsonOut = Transform({ writableObjectMode: true, transform(chunk, encoding, callback) { console.log(chunk); callback(); } }); rs.pipe(toJson).pipe(jsonOut);