流是一種數據傳輸手段,是有順序的,有起點和終點,好比你要把數據從一個地方傳到另一個地方
流很是重要,gulp,webpack,HTTP裏的請求和響應,http裏的socket都是流,包括後面壓縮,加密等流爲何這麼好用還這麼重要呢?javascript
流是一個抽象接口,被Node中不少對象所實現,好比HTTP服務器request和response對象都是流Node.js 中有四種基本的流類型:java
能夠經過 require('stream') 加載 Stream 基類。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基類
可讀流(Readable streams)是對提供數據的 源頭(source)的抽象
可讀流的例子包括:
全部的 Readable 都實現了 stream.Readable 類定義的接口
經過流讀取數據
linux
下面簡單舉個可讀流的例子:
let fs = require('fs'); //經過建立一個可讀流 let rs = fs.createReadStream('./1.txt',{ flags:'r',//咱們要對文件進行何種操做 mode:0o666,//權限位 encoding:'utf8',//不傳默認爲buffer,顯示爲字符串 start:3,//從索引爲3的位置開始讀 //這是個人見過惟一一個包括結束索引的 end:8,//讀到索引爲8結束 highWaterMark:3//緩衝區大小 }); rs.on('open',function () { console.log('文件打開'); }); rs.setEncoding('utf8');//顯示爲字符串 //但願流有一個暫停和恢復觸發的機制 rs.on('data',function (data) { console.log(data); rs.pause();//暫停讀取和發射data事件 setTimeout(function(){ rs.resume();//恢復讀取並觸發data事件 },2000); }); //若是讀取文件出錯了,會觸發error事件 rs.on('error',function () { console.log("error"); }); //若是文件的內容讀完了,會觸發end事件 rs.on('end',function () { console.log('讀完了'); }); rs.on('close',function () { console.log('文件關閉'); }); /** 文件打開 334 455 讀完了 文件關閉 **/
let fs = require('fs'); let ReadStream = require('./ReadStream'); let rs = ReadStream('./1.txt', { flags: 'r', encoding: 'utf8', start: 3, end: 7, highWaterMark: 3 }); rs.on('open', function () { console.log("open"); }); rs.on('data', function (data) { console.log(data); }); rs.on('end', function () { console.log("end"); }); rs.on('close', function () { console.log("close"); }); /** open 456 789 end close **/
let fs = require('fs'); let EventEmitter = require('events'); class ReadStream extends EventEmitter { constructor(path, options) { super(path, options); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.buffer = Buffer.alloc(this.highWaterMark); this.flags = options.flags || 'r'; this.encoding = options.encoding; this.mode = options.mode || 0o666; this.start = options.start || 0; this.end = options.end; this.pos = this.start; this.autoClose = options.autoClose || true; this.bytesRead = 0; this.closed = false; this.flowing; this.needReadable = false; this.length = 0; this.buffers = []; this.on('end', function () { if (this.autoClose) { this.destroy(); } }); this.on('newListener', (type) => { if (type == 'data') { this.flowing = true; this.read(); } if (type == 'readable') { this.read(0); } }); this.open(); } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { if (this.autoClose) { this.destroy(); return this.emit('error', err); } } this.fd = fd; this.emit('open'); }); } read(n) { if (typeof this.fd != 'number') { return this.once('open', () => this.read()); } n = parseInt(n, 10); if (n != n) { n = this.length; } if (this.length == 0) this.needReadable = true; let ret; if (0 < n < this.length) { ret = Buffer.alloc(n); let b; let index = 0; while (null != (b = this.buffers.shift())) { for (let i = 0; i < b.length; i++) { ret[index++] = b[i]; if (index == ret.length) { this.length -= n; b = b.slice(i + 1); this.buffers.unshift(b); break; } } } if (this.encoding) ret = ret.toString(this.encoding); } let _read = () => { let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark; fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => { if (err) { return } let data; if (bytesRead > 0) { data = this.buffer.slice(0, bytesRead); this.pos += bytesRead; this.length += bytesRead; if (this.end && this.pos > this.end) { if (this.needReadable) { this.emit('readable'); } this.emit('end'); } else { this.buffers.push(data); if (this.needReadable) { this.emit('readable'); this.needReadable = false; } } } else { if (this.needReadable) { this.emit('readable'); } return this.emit('end'); } }) } if (this.length == 0 || (this.length < this.highWaterMark)) { _read(0); } return ret; } destroy() { fs.close(this.fd, (err) => { this.emit('close'); }); } pause() { this.flowing = false; } resume() { this.flowing = true; this.read(); } pipe(dest) { this.on('data', (data) => { let flag = dest.write(data); if (!flag) this.pause(); }); dest.on('drain', () => { this.resume(); }); this.on('end', () => { dest.end(); }); } } module.exports = ReadStream;
爲了實現可讀流,引用Readable接口並用它構造新對象
var stream = require('stream'); var util = require('util'); util.inherits(Counter, stream.Readable); function Counter(options) { stream.Readable.call(this, options); this._index = 0; } Counter.prototype._read = function() { if(this._index++<3){ this.push(this._index+''); }else{ this.push(null); } }; var counter = new Counter(); counter.on('data', function(data){ console.log("讀到數據: " + data.toString());//no maybe }); counter.on('end', function(data){ console.log("讀完了"); });
Readable Stream 存在兩種模式(flowing mode 與 paused mode),這兩種模式決定了chunk數據流動的方式---自動流動仍是手工流動。那如何觸發這兩種模式呢:
若是 Readable 切換到 flowing 模式,且沒有消費者處理流中的數據,這些數據將會丟失。 好比, 調用了 readable.resume() 方法卻沒有監聽 'data' 事件,或是取消了 'data' 事件監聽,就有可能出現這種狀況
可讀流的三種狀態
webpack
在任意時刻,任意可讀流應確切處於下面三種狀態之一:
兩種模式取決於可讀流flowing狀態:
flowing mode
web
經過註冊data、pipe、resume能夠自動獲取所須要的數據,咱們來看下源碼的實現
// data事件觸發flowing mode if (ev === 'data') { // Start flowing on next tick if stream isn't explicitly paused if (this._readableState.flowing !== false) this.resume(); } else if (ev === 'readable') { const state = this._readableState; if (!state.endEmitted && !state.readableListening) { state.readableListening = state.needReadable = true; state.emittedReadable = false; if (!state.reading) { process.nextTick(nReadingNextTick, this); } else if (state.length) { emitReadable(this); } } } // resume觸發flowing mode Readable.prototype.resume = function() { var state = this._readableState; if (!state.flowing) { debug('resume'); state.flowing = true; resume(this, state); } return this; } // pipe方法觸發flowing模式 Readable.prototype.resume = function() { if (!state.flowing) { this.resume() } }
flowing mode的三種方法最後均是經過resume方法,將狀態變爲true:state.flowing = true
paused mode
json
在paused mode下,須要手動地讀取數據,而且能夠直接指定讀取數據的長度
能夠經過監聽事件readable,觸發時手工讀取chunk數據:
let fs = require('fs'); let rs = fs.createReadStream('./1.txt',{ highWaterMark:3 }); rs.on('readable',function(){ console.log(rs._readableState.length); //read若是不加參數表示讀取整個緩存區數據 //讀取一個字段,若是可讀流發現你要讀的字節小於等於緩存字節大小,則直接返回 let chunk = rs.read(1); console.log(chunk); console.log(rs._readableState.length); //當你讀完指定的字節後,若是可讀流發現剩下的字節已經比最高水位線小了。則會立馬再次讀取填滿 最高水位線 setTimeout(function(){ console.log(rs._readableState.length); },200) });
注意:一旦註冊了readable事件,必須手工讀取read數據,不然數據就會流失,咱們來看下源碼的實現
function emitReadable(stream) { var state = stream._readableState; state.needReadable = false; if (!state.emittedReadable) { debug('emitReadable', state.flowing); state.emittedReadable = true; process.nextTick(emitReadable_, stream); } } function emitReadable_(stream) { var state = stream._readableState; debug('emit readable'); if (!state.destroyed && (state.length || state.ended)) { stream.emit('readable'); } state.needReadable = !state.flowing && !state.ended; flow(stream); } function flow(stream) { const state = stream._readableState; debug('flow', state.flowing); while (state.flowing && stream.read() !== null); } function endReadable(stream) { var state = stream._readableState; debug('endReadable', state.endEmitted); if (!state.endEmitted) { state.ended = true; process.nextTick(endReadableNT, state, stream); } } Readable.prototype.read = function(n) { debug('read', n); n = parseInt(n, 10); var state = this._readableState; var nOrig = n; if (n !== 0) state.emittedReadable = false; if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { debug('read: emitReadable', state.length, state.ended); if (state.length === 0 && state.ended) endReadable(this); else emitReadable(this); return null; } n = howMuchToRead(n, state); if (n === 0 && state.ended) { if (state.length === 0) endReadable(this); return null; }
flow方法直接read數據,將獲得的數據經過事件data交付出去,然而此處沒有註冊data事件監控,所以,獲得的chunk數據並無交付給任何對象,這樣數據就白白流失了,因此在觸發emit('readable')時,須要提早read數據
可寫流是對數據寫入'目的地'的一種抽象
Writable:可寫流的例子包括了:
下面舉個可寫流的簡單例子
let fs = require('fs'); let ws = fs.createWriteStream('./2.txt',{ flags:'w', mode:0o666, start:3, highWaterMark:3//默認是16K });
let flag = ws.write('1'); console.log(flag);//true flag =ws.write('2'); console.log(flag);//true flag =ws.write('3'); console.log(flag);//false flag =ws.write('4'); console.log(flag);//false
'drain' 事件
gulp
若是調用 stream.write(chunk) 方法返回 false,流將在適當的時機觸發 'drain' 事件,這時才能夠繼續向流中寫入數據當一個流不處在 drain 的狀態, 對 write() 的調用會緩存數據塊, 而且返回 false。 一旦全部當前全部緩存的數據塊都排空了(被操做系統接受來進行輸出), 那麼 'drain' 事件就會被觸發緩存
建議, 一旦 write() 返回 false, 在 'drain' 事件觸發前, 不能寫入任何數據塊服務器
舉個簡單的例子說明一下:異步
let fs = require('fs'); let ws = fs.createWriteStream('2.txt',{ flags:'w', mode:0o666, start:0, highWaterMark:3 }); let count = 9; function write(){ let flag = true;//緩存區未滿 //寫入方法是同步的,可是寫入文件的過程是異步的。在真正寫入文件後還會執行咱們的回調函數 while(flag && count>0){ console.log('before',count); flag = ws.write((count)+'','utf8',(function (i) { return ()=>console.log('after',i); })(count)); count--; } } write();//987 //監聽緩存區清空事件 ws.on('drain',function () { console.log('drain'); write();//654 321 }); ws.on('error',function (err) { console.log(err); }); /** before 9 before 8 before 7 after 9 after 8 after 7 **/
若是已經再也不須要寫入了,能夠調用end方法關閉寫入流,一旦調用end方法以後則不能再寫入
好比在ws.end();
後寫ws.write('x');
,會報錯write after end
'pipe'事件
linux精典的管道的概念,前者的輸出是後者的輸入pipe是一種最簡單直接的方法鏈接兩個stream,內部實現了數據傳遞的整個過程,在開發的時候不須要關注內部數據的流動
pipe方法的原理
var fs = require('fs'); var ws = fs.createWriteStream('./2.txt'); var rs = fs.createReadStream('./1.txt'); rs.on('data', function (data) { var flag = ws.write(data); if(!flag) rs.pause(); }); ws.on('drain', function () { rs.resume(); }); rs.on('end', function () { ws.end(); });
下面舉個簡單的例子說明一下pipe的用法:
let fs = require('fs'); let rs = fs.createReadStream('./1.txt',{ highWaterMark:3 }); let ws = fs.createWriteStream('./2.txt',{ highWaterMark:3 }); rs.pipe(ws); //移除目標可寫流 rs.unpipe(ws);
rs.on('data',function (data) { console.log(data); let flag = ws.write(data); if(!flag){ rs.pause(); } });
ws.on('drain',function () { console.log('drain'); rs.resume(); });
unpipe
readable.unpipe()方法將以前經過stream.pipe()方法綁定的流分離
簡單距離說明下unpipe的用法:
let fs = require('fs'); var from = fs.createReadStream('./1.txt'); var to = fs.createWriteStream('./2.txt'); from.pipe(to); setTimeout(() => { console.log('關閉向2.txt的寫入'); from.unpipe(writable); console.log('手工關閉文件流'); to.end(); }, 1000);
let fs = require('fs'); let ReadStream = require('./ReadStream'); let rs = ReadStream('./1.txt', { flags: 'r', encoding: 'utf8', highWaterMark: 3 }); let FileWriteStream = require('./WriteStream'); let ws = FileWriteStream('./2.txt',{ flags:'w', encoding:'utf8', highWaterMark:3 }); rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) { this.on('data', (data)=>{ let flag = dest.write(data); if(!flag){ this.pause(); } }); dest.on('drain', ()=>{ this.resume(); }); this.on('end', ()=>{ dest.end(); }); } ReadStream.prototype.pause = function(){ this.flowing = false; } ReadStream.prototype.resume = function(){ this.flowing = true; this.read(); }
const stream = require('stream') var index = 0; const readable = stream.Readable({ highWaterMark: 2, read: function () { process.nextTick(() => { console.log('push', ++index) this.push(index+''); }) } }) const writable = stream.Writable({ highWaterMark: 2, write: function (chunk, encoding, next) { console.log('寫入:', chunk.toString()) } }) readable.pipe(writable);
let fs = require('fs'); let FileWriteStream = require('./FileWriteStream'); let ws = FileWriteStream('./2.txt',{ flags:'w', encoding:'utf8', highWaterMark:3 }); let i = 10; function write(){ let flag = true; while(i&&flag){ flag = ws.write("1",'utf8',(function(i){ return function(){ console.log(i); } })(i)); i--; console.log(flag); } } write(); ws.on('drain',()=>{ console.log("drain"); write(); }); /** 10 9 8 drain 7 6 5 drain 4 3 2 drain 1 **/
let EventEmitter = require('events'); let util = require('util'); let fs = require('fs'); util.inherits(WriteStream, EventEmitter); function WriteStream(path, options) { EventEmitter.call(this); if (!(this instanceof WriteStream)) { return new WriteStream(path, options); } this.path = path; this.fd = options.fd; this.encoding = options.encoding||'utf8'; this.flags = options.flags || 'w'; this.mode = options.mode || 0o666; this.autoClose = options.autoClose || true; this.start = options.start || 0; this.pos = this.start;//開始寫入的索引位置 this.open();//打開文件進行操做 this.writing = false;//沒有在寫入過程 中 this.buffers = []; this.highWaterMark = options.highWaterMark||16*1024; //若是監聽到end事件,並且要求自動關閉的話則關閉文件 this.on('end', function () { if (this.autoClose) { this.destroy() } }); } WriteStream.prototype.close = function(){ fs.close(this.fd,(err)=>{ if(err) this.emit('error',err); }); } WriteStream.prototype.open = function () { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) return this.emit('error', err); this.fd = fd;//把文件描述符賦給當前實例的fd屬性 //發射open事件 this.emit('open', fd); }); } /** * 會判斷當前是後臺是否在寫入過程當中,若是在寫入過程當中,則把這個數據放在待處理的緩存中,若是不在寫入過程當中,能夠直接寫。 */ WriteStream.prototype.write = function (chunk, encoding, cb) { chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding); //先把數據放在緩存裏 this.buffers.push({ chunk, encoding, cb }); let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark; //只有當緩存區寫滿了,那麼清空緩存區的時候纔會發射drain事件,不然 不發放 this.needDrain = isFull; //若是說文件尚未打開,則把寫入的方法壓入open事件的監聽函數。等文件一旦打開,馬上執行寫入操做 if (typeof this.fd !== 'number') { this.once('open', () => { this._write(); }); return !isFull; }else{ if(!this.writing){ setImmediate(()=>{ this._write(); this.writing = true; }); } return !isFull; } } WriteStream.prototype._write = function () { let part = this.buffers.shift(); if (part) { fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{ if(err)return this.emit('error',err); part.cb && part.cb(); this._write(); }); }else{ //發射一個緩存區清空的事件 this.emit('drain'); this.writing = false; } } module.exports = WriteStream;
爲了實現可寫流,咱們須要使用流模塊中的Writable構造函數。 咱們只需給Writable構造函數傳遞一些選項並建立一個對象。惟一須要的選項是write函數,該函數揭露數據塊要往哪裏寫
var stream = require('stream'); var util = require('util'); util.inherits(Writer, stream.Writable); let stock = []; function Writer(opt) { stream.Writable.call(this, opt); } Writer.prototype._write = function(chunk, encoding, callback) { setTimeout(()=>{ stock.push(chunk.toString('utf8')); console.log("增長: " + chunk); callback(); },500) }; var w = new Writer(); for (var i=1; i<=5; i++){ w.write("項目:" + i, 'utf8'); } w.end("結束寫入",function(){ console.log(stock); });
Duplex 流是同時實現了 Readable 和 Writable 接口的流
雙工流的可讀性和可寫性操做徹底獨立於彼此,這僅僅是將兩個特性組合成一個對象Duplex 流的實例包括了:
下面簡單實現雙工流:
const {Duplex} = require('stream'); const inoutStream = new Duplex({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); }, read(size) { this.push((++this.index)+''); if (this.index > 3) { this.push(null); } } }); inoutStream.index = 0; process.stdin.pipe(inoutStream).pipe(process.stdout);
變換流(Transform streams) 是一種 Duplex 流。它的輸出與輸入是經過某種方式關聯的。和全部 Duplex 流同樣,變換流同時實現了 Readable 和 Writable 接口轉換流的輸出是從輸入中計算出來的
對於轉換流,咱們沒必要實現read或write的方法,咱們只須要實現一個transform方法,將二者結合起來。它有write方法的意思,咱們也能夠用它來push數據變換流的實例包括:
下面簡單實現轉換流:
const {Transform} = require('stream'); const upperCase = new Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); } }); process.stdin.pipe(upperCase).pipe(process.stdout);
默認狀況下,流處理的數據是Buffer/String類型的值。有一個objectMode標誌,咱們能夠設置它讓流能夠接受任何JavaScript對象
const {Transform} = require('stream'); let fs = require('fs'); let rs = fs.createReadStream('./users.json'); rs.setEncoding('utf8'); let toJson = Transform({ readableObjectMode: true, transform(chunk, encoding, callback) { this.push(JSON.parse(chunk)); callback(); } }); let jsonOut = Transform({ writableObjectMode: true, transform(chunk, encoding, callback) { console.log(chunk); callback(); } }); rs.pipe(toJson).pipe(jsonOut);