可讀流(Readable stream)接口是對你正在讀取的數據的來源的抽象。換句話說,數據來來自可讀流(Readable stream)不會分發數據,直到你代表準備就緒。
可讀流(Readable stream) 有2種模式: 流動模式(flowing mode) 和 暫停模式(paused mode). 流動模式(flowing mode)時,儘快的從底層系統讀取數據並提供給你的程序。 暫停模式(paused mode)時, 你必須明確的調用 stream.read() 來讀取數據。 暫停模式(paused mode) 是默認模式。
能夠經過下面幾個方法,將流切換到流動模式(flowing mode)。json
let fs = require('fs'); /** * 全部初始工做模式爲 paused 的 Readable 流,能夠經過下面三種途徑切換到 flowing 模式: 監聽 'data' 事件 調用 stream.resume() 方法 調用 stream.pipe() 方法將數據發送到 Writable */ let rs = fs.createReadStream('./1.txt',{ highWaterMark:3 }); /* 269 stream.emit('data', chunk); stream.read(0); rs.on('data',function (data) { console.log(data); }); rs.on('end',function () { console.log('end'); });*/ //當你監聽 readable事件的時候,會進入暫停模式 //當監聽readable事件的時候,可讀流會立刻去向底層讀取文件,而後把讀到文件的文件放在緩存區裏const state = this._readableState; //self.read(0); 只填充緩存,可是並不會發射data事件,可是會發射stream.emit('readable');事件 //this._read(state.highWaterMark); 每次調用底層的方法讀取的時候是讀取3個字節 rs.on('readable',function(){ //length就是指得緩存區數據的大小 // state.length += chunk.length;==3 console.log(rs._readableState.length); //read若是不加參數表示讀取整個緩存區數據 //讀取一個字段,若是可讀流發現你要讀的字節小於等於緩存字節大小,則直接返回 let ch = rs.read(1); console.log(ch); console.log(rs._readableState.length); /* ch = rs.read(1); console.log(ch); console.log(rs._readableState.length);*/ //當你讀完指定的字節後,若是可讀流發現剩下的字節已經比最高水位線小了。則會立馬再次讀取填滿 最高水位線 setTimeout(function(){ console.log(rs._readableState.length); },200) });
這個方法向底層系統寫入數據,並在數據處理完畢後調用所給的回調。返回值表示你是否應該繼續當即寫入。若是數據要緩存在內部,將會返回false。不然返回 true。返回值僅供參考。即便返回 false,你也可能繼續寫。可是寫會緩存在內存裏,因此不要作的太過度。最好的辦法是等待drain 事件後,再寫入數據。緩存
let fs = require('fs'); let ws = fs.createWriteStream('2.txt',{ flags:'w', mode:0o666, start:0, highWaterMark:3 }); let count = 9; function write(){ let flag = true;//緩存區未滿 //寫入方法是同步的,可是寫入文件的過程 異步的。在真正寫入文件後還會執行咱們的回調函數 while(flag && count>0){ console.log('before',count); flag = ws.write((count)+'','utf8',(function (i) { return ()=>console.log('after',i); })(count)); count--; } } write();//987 //監聽緩存區清空事件 ws.on('drain',function () { console.log('drain'); write();//654 321 }); ws.on('error',function (err) { console.log(err); }); //若是已經再也不須要寫入了,能夠調用end方法關閉寫入流,一旦調用end方法以後則不能再寫入 ws.end(); //write after end // ws.write('x');
雙工流(Duplex streams)是同時實現了 Readable and Writable 接口。用法詳見下文異步
let {Duplex} = require('stream'); let index = 0; let s = Duplex({ read(){ if(index++<3) this.push('a'); else this.push(null); }, write(chunk,encoding,cb){ console.log(chunk.toString().toUpperCase()); cb(); } }); //process.stdin 標準輸入流 //proces.stdout標準輸出流 process.stdin.pipe(s).pipe(process.stdout);
它的輸出是從輸入計算得來。 它實現了Readable 和 Writable 接口. 用法詳見下文.函數
let {Transform} = require('stream'); //轉換流是實現數據轉換的 let t = Transform({ transform(chunk,encoding,cb){ this.push(chunk.toString().toUpperCase()); cb(); } }); process.stdin.pipe(t).pipe(process.stdout);
let {Transform} = require('stream'); let fs = require('fs'); let rs = fs.createReadStream('./user.json'); //普通流裏的放的是Buffer,對象流裏放的對象 let toJSON = Transform({ readableObjectMode:true,//就能夠向可讀流裏放對象 transform(chunk,encoding,cb){ //向可讀流裏的緩存區裏放 this.push(JSON.parse(chunk.toString())); } }); let outJSON = Transform({ writableObjectMode:true,//就能夠向可讀流裏放對象 transform(chunk,encoding,cb){ console.log(chunk); cb(); } }); rs.pipe(toJSON).pipe(outJSON);