初探Node中的stream

Stream流有如下四種類型:

  • Readable - 可讀操做
  • Writable - 可寫操做
  • Duplex - 可讀可寫操做
  • Transform - 操做被寫入數據,而後讀出結果

可讀流(Readable stream)

可讀流(Readable stream)接口是對你正在讀取的數據的來源的抽象。換句話說,數據來來自可讀流(Readable stream)不會分發數據,直到你代表準備就緒。
可讀流(Readable stream) 有2種模式: 流動模式(flowing mode) 和 暫停模式(paused mode). 流動模式(flowing mode)時,儘快的從底層系統讀取數據並提供給你的程序。 暫停模式(paused mode)時, 你必須明確的調用 stream.read() 來讀取數據。 暫停模式(paused mode) 是默認模式。
能夠經過下面幾個方法,將流切換到流動模式(flowing mode)。json

let fs = require('fs');
/**
 * 全部初始工做模式爲 paused 的 Readable 流,能夠經過下面三種途徑切換到 flowing 模式:
 監聽 'data' 事件
 調用 stream.resume() 方法
 調用 stream.pipe() 方法將數據發送到 Writable
 */
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
/*
269
 stream.emit('data', chunk);
    stream.read(0);
rs.on('data',function (data) {
    console.log(data);
});
rs.on('end',function () {
    console.log('end');
});*/
//當你監聽 readable事件的時候,會進入暫停模式
//當監聽readable事件的時候,可讀流會立刻去向底層讀取文件,而後把讀到文件的文件放在緩存區裏const state = this._readableState;
//self.read(0); 只填充緩存,可是並不會發射data事件,可是會發射stream.emit('readable');事件
//this._read(state.highWaterMark); 每次調用底層的方法讀取的時候是讀取3個字節
rs.on('readable',function(){
    //length就是指得緩存區數據的大小
    // state.length +=  chunk.length;==3
    console.log(rs._readableState.length);
    //read若是不加參數表示讀取整個緩存區數據
    //讀取一個字段,若是可讀流發現你要讀的字節小於等於緩存字節大小,則直接返回
    let ch = rs.read(1);
    console.log(ch);
    console.log(rs._readableState.length);
   /* ch = rs.read(1);
    console.log(ch);
    console.log(rs._readableState.length);*/
    //當你讀完指定的字節後,若是可讀流發現剩下的字節已經比最高水位線小了。則會立馬再次讀取填滿 最高水位線
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});

可寫流(Writable stream )

這個方法向底層系統寫入數據,並在數據處理完畢後調用所給的回調。返回值表示你是否應該繼續當即寫入。若是數據要緩存在內部,將會返回false。不然返回 true。返回值僅供參考。即便返回 false,你也可能繼續寫。可是寫會緩存在內存裏,因此不要作的太過度。最好的辦法是等待drain 事件後,再寫入數據。緩存

let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
    flags:'w',
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//緩存區未滿
    //寫入方法是同步的,可是寫入文件的過程 異步的。在真正寫入文件後還會執行咱們的回調函數
 while(flag && count>0){
     console.log('before',count);
     flag = ws.write((count)+'','utf8',(function (i) {
         return ()=>console.log('after',i);
     })(count));
     count--;
 }
}
write();//987
//監聽緩存區清空事件
ws.on('drain',function () {
    console.log('drain');
    write();//654 321
});
ws.on('error',function (err) {
    console.log(err);
});
//若是已經再也不須要寫入了,能夠調用end方法關閉寫入流,一旦調用end方法以後則不能再寫入
ws.end();
//write after end
//
ws.write('x');

雙工流(Duplex streams)

雙工流(Duplex streams)是同時實現了 Readable and Writable 接口。用法詳見下文異步

let {Duplex} = require('stream');
let index = 0;
let s = Duplex({
    read(){
        if(index++<3)
          this.push('a'); 
          else 
       this.push(null);   
    },
    write(chunk,encoding,cb){
       console.log(chunk.toString().toUpperCase());
       cb();
    }
});
//process.stdin 標準輸入流
//proces.stdout標準輸出流
process.stdin.pipe(s).pipe(process.stdout);

轉換流(Transform streams)

它的輸出是從輸入計算得來。 它實現了Readable 和 Writable 接口. 用法詳見下文.函數

let {Transform}  = require('stream');
//轉換流是實現數據轉換的
let t = Transform({
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
let {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./user.json');
//普通流裏的放的是Buffer,對象流裏放的對象
let toJSON = Transform({
    readableObjectMode:true,//就能夠向可讀流裏放對象
    transform(chunk,encoding,cb){
        //向可讀流裏的緩存區裏放
      this.push(JSON.parse(chunk.toString()));
    }
});
let outJSON = Transform({
    writableObjectMode:true,//就能夠向可讀流裏放對象
    transform(chunk,encoding,cb){
      console.log(chunk);
      cb();
    }
});
rs.pipe(toJSON).pipe(outJSON);
相關文章
相關標籤/搜索