渴望力量嗎?少年!流的原理

流(stream),看一我的流不流逼,就看你對流的理解了

學習本無底,前進莫徬徨node

今天跟你們分享的是node.js中的流(stream)。它的做用你們應該都在日常使用node的時候看到過,好比:express

  • gulp中的pipe就是流的一種方法,經過可寫流和可讀流的配合,達到不佔用多餘緩存的一種讀寫方式。
  • express和koa中的res和req也是流,res是可寫流,req是可讀流,他們都是經過封裝node中的net模塊的socket(雙工流,便可寫、可讀流)而來的。
  • 。。。

可能不少時候你們都知道怎麼用,但不瞭解它的原理,很尷尬,就像這樣gulp

何謂流?

  • 流是一組有序的,有起點和終點的字節數據傳輸手段。
  • 它不關心文件的總體內容,只關注是否從文件中讀到了數據,以及讀到數據以後的處理。
  • 流是一個抽象接口,被 Node 中的不少對象所實現。好比HTTP 服務器request和response對象都是流。
  • 流被分爲Readable(可讀流)、Writable(可寫流)、Duplex(雙工流)、Transform(轉換流)

流中的是什麼?

  • 二進制模式:每一個分塊都是buffer、string對象。
  • 對象模式:流內部處理的是一系列普通對象。

可讀流

可讀流分爲flowingpaused兩種模式windows

參數

  • path:讀取的文件的路徑
  • option:
    • highWaterMark:水位線,一次可讀的字節,通常默認是64k
    • flags:標識,打開文件要作的操做,默認是r
    • encoding:編碼,默認爲buffer
    • start:開始讀取的索引位置
    • end:結束讀取的索引位置(包括結束位置)
    • autoClose:讀取完畢是否關閉,默認爲true
let ReadStream = require('./ReadStream')
//讀取的時候默認讀64k 
let rs = new ReadStream('./a.txt',{
  highWaterMark: 2,//一次讀的字節 默認64k
  flags: 'r',      //標示 r爲讀 w爲寫
  autoClose: true, //默認讀取完畢後自動關閉
  start: 0,
  end: 5,          //流是閉合區間包start,也包end 默認是讀完
  encoding: 'utf8' //默認編碼是buffer
})
複製代碼

方法

data:切換到流動模式,能夠流出數據

rs.on('data', function (data) {
    console.log(data);
});
複製代碼

open:流打開文件的時候會觸發此監聽

rs.on('open', function () {
    console.log('文件被打開');
});
複製代碼

error:流出錯的時候,監聽錯誤信息

rs.on('error', function (err) {
    console.log(err);
});
複製代碼

end:流讀取完成,觸發end

rs.on('end', function (err) {
    console.log('讀取完成');
});
複製代碼

close:關閉流,觸發

rs.on('close', function (err) {
    console.log('關閉');
});
複製代碼

pause:暫停流(改變流的flowing,不讀取數據了);resume:恢復流(改變流的flowing,繼續讀取數據)

//流經過一次後,中止流動,過了2s後再動
rs.on('data', function (data) {
    rs.pause();
    console.log(data);
});
setTimeout(function () {
    rs.resume();
},2000);
複製代碼

fs.read():可讀流底層調用的就是這個方法,最原生的讀方法

//fd文件描述符,通常經過fs.open中獲取
//buffer是讀取後的數據放入的緩存目標
//0,從buffer的0位置開始放入
//BUFFER_SIZE,每次放BUFFER_SIZE這麼長的長度
//index,每次從文件的index的位置開始讀
//bytesRead,真實讀到的個數
fs.read(fd,buffer,0,BUFFER_SIZE,index,function(err,bytesRead){

})
複製代碼

那讓咱們本身來實現一個可愛的讀流吧!

let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
  constructor(path,options = {}){
    super()
    this.path = path
    this.highWaterMark = options.highWaterMark || 64*1024
    this.flags = options.flags || 'r'
    this.start = options.start || 0
    this.pos = this.start     //會隨着讀取的位置改變
    this.autoClose = options.autoClose || true
    this.end = options.end || null
    //默認null就是buffer
    this.encoding = options.encoding || null

    //參數的問題
    this.flowing = null //非流動模式
    //建立個buffer用來存儲每次讀出來的數據
    this.buffer = Buffer.alloc(this.highWaterMark)
    //打開這個文件
    this.open()
    //此方法默認同步調用 每次設置on監聽事件時都會調用以前全部的newListener事件
    this.on('newListener',(type)=>{// 等待着他監聽data事件
      if(type === 'data'){
        this.flowing = true
        //開始讀取 客戶已經監聽的data事件
        this.read()
      }
    })
  }
  //默認第一次調用read方法時fd還沒獲取 因此不能直接讀
  read(){
    if(typeof this.fd != 'number'){
      //等待着觸發open事件後fd確定拿到了 再去執行read方法
      return this.once('open',()=>{this.read()})
    }

    //每次讀的時候都要判斷一下下次讀幾個 若是沒有end就根據highWaterMark來(讀全部的) 若是有且大於highWaterMark就根據highWaterMark來 若是小於highWaterMark就根據end來
    let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark
    fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,byteRead)=>{
      this.pos += byteRead
      let b = this.encoding?this.buffer.slice(0,byteRead).toString(this.encoding):this.buffer.slice(0,byteRead)
      this.emit('data',b)
      //若是讀取到的數量和highWaterMark同樣 說明還得繼續讀
      if((byteRead === this.highWaterMark)&&this.flowing){
        this.read()
      }
      if(byteRead < this.highWaterMark){
        this.emit('end')
        this.destory()
      }
    })
  }
  destory(){
    if(typeof this.fd != 'number'){
      return this.emit('close')
    }
    //若是文件被打開過 就關閉文件而且觸發close事件
    fs.close(this.fd,()=>{
      this.emit('close')
    })
  }
  pause(){
    this.flowing = false
  }
  resume(){
    this.flowing = true
    this.read()
  }
  open(){
    //fd表示的就是當前this.path的這個文件,從3開始(number類型)
    fs.open(this.path,this.flags,(err,fd)=>{
      //有可能fd這個文件不存在 須要作處理
      if(err){
        //若是有自動關閉 則幫他銷燬
        if(this.autoClose){
          //銷燬(關閉文件,觸發關閉文件事件)
          this.destory()
        }
        //若是有錯誤 就會觸發error事件
        this.emit('error',err)
        return
      }
      //保存文件描述符
      this.fd = fd
      //當文件打開成功時觸發open事件
      this.emit('open',this.fd)
    })
  }
}
複製代碼

Readable

這個方法是可讀流的一種暫停模式,他的模式能夠參考爲讀流是往水杯倒水的人,Readable是喝水的人,他們之間存在着一種聯繫,只要Readable喝掉一點水,讀流就會繼續往裏倒數組

Readable是什麼?

  • 他會在剛開始監聽Readable的時候就觸發流的,此時流就會讀取一次數據,以後流會監聽,若是有人讀過流(喝過水),而且減小,就會再去讀一次(倒點水)
  • 主要能夠用來作行讀取器(LineReader)
let fs = require('fs')
let read = require('./ReadableStream')
let rs = fs.createReadStream('./a.txt', {
  //每次讀7個
  highWaterMark: 7
})
//若是讀流第一次所有讀下來而且小於highWaterMark,就會再讀一次(再觸發一次readable事件)
//若是rs.read()不加參數,一次性讀完,會從緩存區再讀一次,爲null
//若是readable每次都恰好讀完(即rs.read()的參數恰好和highWaterMark相等),就會一直觸發readable事件,若是最後不足他想喝的數,他就會先觸發一次null,最後把剩下的喝完
//一開始緩存區爲0的時候也會默認調一次readable事件
rs.on('readable', () => {
  let result = rs.read(2)
  console.log(result)
})
複製代碼

實戰:行讀取器(日常咱們的文件可能有回車、換行,此時若是要每次想讀一行的數據,就得用到readable)緩存

let EventEmitter = require('events')
//若是要將內容所有讀出就用on('data'),精確讀取就用on('readable')
class LineReader extends EventEmitter {
  constructor(path) {
    super()
    this.rs = fs.createReadStream(path)
    //回車符的十六進制
    let RETURN = 0x0d
    //換行符的十六進制
    let LINE = 0x0a
    let arr = []
    this.on('newListener', (type) => {
      if (type === 'newLine') {
        this.rs.on('readable', () => {
          let char
          //每次讀一個,當讀完的時候會返回null,終止循環
          while (char = this.rs.read(1)) {
            switch (char[0]) {
              case RETURN:
                break;
              //Mac下只有換行符,windows下是回車符和換行符,須要根據不一樣的轉換。由於我這裏是Mac
              case LINE:
                //若是是換行符就把數組轉換爲字符串
                let r = Buffer.from(arr).toString('utf8')
                //把數組清空
                arr.length = 0
                //觸發newLine事件,把獲得的一行數據輸出
                this.emit('newLine', r)
                break;
              default:
                //若是不是換行符,就放入數組中
                arr.push(char[0])
            }
          }
        })
      }
    })
    //以上只能取出以前的換行符前的代碼,最後一行的後面沒有換行符,因此須要特殊處理。當讀流讀完須要觸發end事件時
    this.rs.on('end', () => {
      //取出最後一行數據,轉成字符串
      let r = Buffer.from(arr).toString('utf8')
      arr.length = 0
      this.emit('newLine', r)
    })
  }
}

let lineReader = new LineReader('./a.txt')
lineReader.on('newLine', function (data) {
  console.log(data)
})
複製代碼

那麼Readable究竟是怎樣的存在呢?咱們接下來實現他的源碼,看看內部到底怎麼回事服務器

let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
  constructor(path,options = {}){
    super()
    this.path = path
    this.highWaterMark = options.highWaterMark || 64*1024
    this.flags = options.flags || 'r'
    this.start = options.start || 0
    this.pos = this.start     //會隨着讀取的位置改變
    this.autoClose = options.autoClose || true
    this.end = options.end || null
    //默認null就是buffer
    this.encoding = options.encoding || null

    //參數的問題
    this.reading = false //非流動模式
    //建立個buffer用來存儲每次讀出來的數據
    this.buffers = []
    //緩存區長度
    this.len = 0
    //是否要觸發readable事件
    this.emittedReadable = false
    //觸發open獲取文件的fd標識符
    this.open()
    //此方法默認同步調用 每次設置on監聽事件時都會調用以前全部的newListener事件
    this.on('newListener',(type)=>{// 等待着他監聽data事件
      if(type === 'readable'){
        //開始讀取 客戶已經監聽的data事件
        this.read()
      }
    })
  }
  //readable真正的源碼中的方法,計算出和n最接近的2的冪次數
  computeNewHighWaterMark(n) {
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
    return n;
  }
  read(n){
    //當讀的數量大於水平線,會經過取2的冪次取比他大和最接近的數
    if(this.len < n){
      this.highWaterMark = this.computeNewHighWaterMark(n)
      //從新觸發readbale的callback,因此第一次會觸發null
      this.emittedReadable = true
      //從新讀新的水位線
      this._read()
    }
    //真正讀取到的
    let buffer = null
    //說明緩存裏有這麼多,取出來
    if(n>0 && n<=this.len){
      //定義一個buffer
      buffer = Buffer.alloc(n)
      let buf
      let flag = true
      let index = 0
      //[buffer<1,2,3,4>,buffer<1,2,3,4>,buffer<1,2,3,4>]
      //每次取出緩存前的第一個buffer
      while(flag && (buf = this.buffers.shift())){
        for(let i=0;i<buf.length;i++){
          //把取出的一個buffer中的數據放入新定義的buffer中
          buffer[index++] = buf[i]
          //當buffer的長度和n(參數)長度同樣時,中止循環
          if(index === n){
            flag = false
            //維護緩存,由於可能緩存中的buffer長度大於n,當取出n的長度時,還會剩下其他的buffer,咱們須要切割buf而且放到緩存數組以前
            this.len -= n
            let r = buf.slice(i+1)
            if(r.length){
              this.buffers.unshift(r)
            }
            break
          }
        }
      }
    }
    //若是緩存區沒有東西,等會讀完須要觸發readable事件
    //這裏會有一種情況,就是若是每次Readable讀取的數量正好等於highWaterMark(流讀取到緩存的長度),就會每次都等於0,每次都觸發Readable事件,就會每次讀,讀到沒有爲止,最後還會觸發一下null
    if(this.len === 0){
      this.emittedReadable = true
    }
    if(this.len < this.highWaterMark){
      //默認,一開始的時候開始讀取
      if(!this.reading){
        this.reading = true
        //真正多讀取操做
        this._read()
      }
    }
    return buffer&&buffer.toString()
  }
  _read(){
    if(typeof this.fd != 'number'){
      //等待着觸發open事件後fd確定拿到了 再去執行read方法
      return this.once('open',()=>{this._read()})
    }
    //先讀這麼多buffer
    let buffer = Buffer.alloc(this.highWaterMark)
    fs.read(this.fd,buffer,0,buffer.length,this.pos,(err,byteRead)=>{
      if(byteRead > 0){
        //當第一次讀到數據後,改變reading的狀態,若是觸發read事件,可能還會在觸發第二次_read
        this.reading = false
        //每次讀到數據增長緩存取得長度
        this.len += byteRead
        //每次讀取以後,會增長讀取的文件的讀取開始位置
        this.pos += byteRead
        //將讀到的buffer放入緩存區buffers中
        this.buffers.push(buffer.slice(0,byteRead))
        //觸發readable
        if(this.emittedReadable){
          this.emittedReadable = false
          //能夠讀取了,默認開始的時候杯子填滿了
          this.emit('readable')
        }
      }else{
        //沒讀到就出發end事件
        this.emit('end')
      }
    })
  }
  destory(){
    if(typeof this.fd != 'number'){
      return this.emit('close')
    }
    //若是文件被打開過 就關閉文件而且觸發close事件
    fs.close(this.fd,()=>{
      this.emit('close')
    })
  }
  open(){
    //fd表示的就是當前this.path的這個文件,從3開始(number類型)
    fs.open(this.path,this.flags,(err,fd)=>{
      //有可能fd這個文件不存在 須要作處理
      if(err){
        //若是有自動關閉 則幫他銷燬
        if(this.autoClose){
          //銷燬(關閉文件,觸發關閉文件事件)
          this.destory()
        }
        //若是有錯誤 就會觸發error事件
        this.emit('error',err)
        return
      }
      //保存文件描述符
      this.fd = fd
      //當文件打開成功時觸發open事件
      this.emit('open',this.fd)
    })
  }
}
複製代碼
  • Readable和讀流的data的區別就是,Readable能夠控制本身從緩存區讀多少和控制讀的次數,而data是每次讀取都清空緩存,讀多少輸出多少
  • 咱們能夠看一下下面這個例子
let rs = fs.createReadStream('./a.txt')
rs.on('data',(data)=>{
  console.log(data)
})
//由於上面的data事件把數據讀了,清空緩存區。因此致使下面的readable讀出爲null
rs.on('readable',()=>{
  let result = r.read(1)
  console.log(result)
})
複製代碼

自定義可讀流

由於createReadStream內部調用了ReadStream類,ReadStream又實現了Readable接口,ReadStream實現了_read()方法,因此咱們經過自定義一個類繼承stream模塊的Readable,並在原型上自定義一個_read()就能夠自定義本身的可讀流koa

let { Readable } = require('stream')

class MyRead extends Readable{
  //流須要一個_read方法,方法中push什麼,外面就接收什麼
  _read(){
    //push方法就是上面_read方法中的push同樣,把數據放入緩存區中
    this.push('100')
    //若是push了null就表示沒有東西可讀了,中止(若是不寫,就會一直push上面的值,死循環)
    this.push(null)
  }
}
複製代碼

可寫流

  • 若是文件不存在會建立,若是有內容會被清空
  • 讀取到highWaterMark的時候就會輸出
  • 第一次是真的寫到文件 後面就是寫入緩存區 再從緩存區裏面去取

參數(和可讀流的相似)

  • path:寫入的文件的路徑
  • option:
    • highWaterMark:水位線,一次可寫入緩存中的字節,通常默認是64k
    • flags:標識,寫入文件要作的操做,默認是w
    • encoding:編碼,默認爲buffer
    • start:開始寫入的索引位置
    • end:結束寫入的索引位置(包括結束位置)
    • autoClose:寫入完畢是否關閉,默認爲true
let ReadStream = require('./ReadStream')
//讀取的時候默認讀64k 
let rs = new ReadStream('./a.txt',{
  highWaterMark: 2,//一次讀的字節 默認64k
  flags: 'r',      //標示 r爲讀 w爲寫
  autoClose: true, //默認讀取完畢後自動關閉
  start: 0,
  end: 5,          //流是閉合區間包start,也包end 默認是讀完
  encoding: 'utf8' //默認編碼是buffer
})
複製代碼

方法

write

let fs = require('fs')
let ws = fs.createWriteStream('./d.txt',{
  flags: 'w',
  encoding: 'utf8',
  start: 0,
  //write的highWaterMark只是用來觸發是否是幹了
  highWaterMark: 3 //寫是默認16k
})
//返回boolean 每當write一次都會在ws中吃下一個饅頭 當吃下的饅頭數量達到highWaterMark時 就會返回false 吃不下了會把其他放入緩存 其他狀態返回true
//write只能放string或者buffer
flag = ws.write('1','utf8',()=>{
  console.log(i)
})
複製代碼

drain

//drain只有嘴塞滿了 吃完(包括內存中的,就是地下的)纔會觸發 這裏是兩個條件 一個是必須是吃下highWaterMark個饅頭 而且在吃完的時候纔會callback
ws.on('drain',()=>{
  console.log('幹了')
})
複製代碼

fs.write():可讀流底層調用的就是這個方法,最原生的讀方法

//wfd文件描述符,通常經過fs.open中獲取
//buffer,要取數據的緩存源
//0,從buffer的0位置開始取
//BUFFER_SIZE,每次取BUFFER_SIZE這麼長的長度
//index,每次寫入文件的index的位置
//bytesRead,真實寫入的個數
fs.write(wfd,buffer,0,bytesRead,index,function(err,bytesWrite){

})
複製代碼

經過代碼實現

let fs = require('fs')
let EventEmitter = require('events')
//只有第一次write的時候直接用_write寫入文件 其他都是放到cache中 可是len超過了highWaterMark就會返回false告知須要drain 很佔緩存
//從第一次的_write開始 回去一直經過clearBuffer遞歸_write寫入文件 若是cache中沒有了要寫入的東西 會根據needDrain來判斷是否觸發乾點
class WriteStream extends EventEmitter{
  constructor(path,options = {}){
    super()
    this.path = path
    this.highWaterMark = options.highWaterMark || 64*1024
    this.flags = options.flags || 'r'
    this.start = options.start || 0
    this.pos = this.start
    this.autoClose = options.autoClose || true
    this.mode = options.mode || 0o666
    //默認null就是buffer
    this.encoding = options.encoding || null

    //打開這個文件
    this.open()

    //寫文件的時候須要哪些參數
    //第一次寫入的時候 是給highWaterMark個饅頭 他會硬着頭皮寫到文件中 以後纔會把多餘吃不下的放到緩存中
    this.writing = false
    //緩存數組
    this.cache = []
    this.callbackList = []
    //數組長度
    this.len = 0
    //是否觸發drain事件
    this.needDrain = false
  }

  clearBuffer(){
    //取緩存中最上面的一個
    let buffer = this.cache.shift()
    if(buffer){
      //有buffer的狀況下
      this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer(),buffer.callback)
    }else{
      //沒有的話 先看看需不須要drain
      if(this.needDrain){
        //觸發drain 並初始化全部狀態
        this.writing = false
        this.needDrain = false
        this.callbackList.shift()()
        this.emit('drain')
        
      }
      this.callbackList.map(v=>{
        v()
      })
      this.callbackList.length = 0
    }
  }
  _write(chunk,encoding,clearBuffer,callback){
    //由於write方法是同步調用的 因此可能還沒獲取到fd
    if(typeof this.fd != 'number'){
      //直接在open的時間對象上註冊一個一次性事件 當open被emit的時候會被調用
      return this.once('open',()=>this._write(chunk,encoding,clearBuffer,callback))
    }
    fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWrite)=>{
      this.pos += byteWrite
      //每次寫完 相應減小內存中的數量
      this.len -= byteWrite
      if(callback) this.callbackList.push(callback)
      //第一次寫完
      clearBuffer()
      
    })
  }

  //寫入方法
  write(chunk,encoding=this.encoding,callback){
    //判斷chunk必須是字符串或者buffer 爲了統一都變成buffer
    chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding)
    //維護緩存的長度 3
    this.len += chunk.length
    let ret = this.len < this.highWaterMark
    if(!ret){
      //表示要觸發drain事件
      this.needDrain = true
    }
    //正在寫入的應該放到內存中
    if(this.writing){
      this.cache.push({
        chunk,
        encoding,
        callback
      })
    }else{
      //這裏是第一次寫的時候
      this.writing = true
      //專門實現寫的方法
      this._write(chunk,encoding,()=>this.clearBuffer(),callback)
    }
    // console.log(ret)
    //能不能繼續寫了 false表明下次寫的時候更佔內存
    return ret
  }

  destory(){
    if(typeof this.fd != 'number'){
      return this.emit('close')
    }
    //若是文件被打開過 就關閉文件而且觸發close事件
    fs.close(this.fd,()=>{
      this.emit('close')
    })
  }
  open(){
    //fd表示的就是當前this.path的這個文件,從3開始(number類型)
    fs.open(this.path,this.flags,(err,fd)=>{
      //有可能fd這個文件不存在 須要作處理
      if(err){
        //若是有自動關閉 則幫他銷燬
        if(this.autoClose){
          //銷燬(關閉文件,出發關閉文件事件)
          this.destory()
        }
        //若是有錯誤 就會觸發error事件
        this.emit('error',err)
        return
      }
      //保存文件描述符
      this.fd = fd
      //當文件打開成功時觸發open事件
      this.emit('open',this.fd)
    })
  }
}
複製代碼

自定義可寫流

由於createWriteStream內部調用了WriteStream類,WriteStream又實現了Writable接口,WriteStream實現了_write()方法,因此咱們經過自定義一個類繼承stream模塊的Writable,並在原型上自定義一個_write()就能夠自定義本身的可寫流socket

let { Writable } = require('stream')

class MyWrite extends Writable{
  _write(chunk,encoding,callback){
    //write()的第一個參數,寫入的數據
    console.log(chunk)
    //這個callback,就至關於咱們上面的clearBuffer方法,若是不執行callback就不會繼續從緩存中取出寫
    callback()
  }
}

let write = new MyWrite()
write.write('1','utf8',()=>{
  console.log('ok')
})
複製代碼

pipe

管道流,是可讀流上的方法,至於爲何放到這裏,主要是由於須要2個流的基礎知識,是可讀流配合可寫流的一種傳輸方式。若是用原來的讀寫,由於寫比較耗時,因此會多讀少寫耗內存,但用了pipe就不會了,始終用規定的內存。學習

用法

let fs = require('fs')
//pipe方法叫管道 能夠控制速率
let rs = fs.createReadStream('./d.txt',{
  highWaterMark: 4
})
let ws = fs.createWriteStream('./e,txt',{
  highWaterMark: 1
})
//會監聽rs的on('data')將讀取到的數據,經過ws.write的方法寫入文件
//調用寫的一個方法 返回boolean類型
//若是返回false就調用rs的pause方法 暫停讀取
//等待可寫流 寫入完畢在監聽drain resume rs
rs.pipe(ws) //會控制速率 防止淹沒可用內存
複製代碼

本身實現一下

let fs = require('fs')
//這兩個是上面本身寫的ReadStream和WriteStream
let ReadStream = require('./ReadStream')
let WriteStream = require('./WriteStream')

//若是用原來的讀寫,由於寫比較耗時,因此會多讀少寫,耗內存
ReadStream.prototype.pipe = function(dest){
  this.on('data',(data)=>{
    let flag = dest.write(data)
    //若是寫入的時候嘴巴吃滿了就不繼續讀了,暫停
    if(!flag){
      this.pause()
    }
  })
  //若是寫的時候嘴巴里的吃完了,就會繼續讀
  dest.on('drain',()=>{
    this.resume()
  })
  this.on('end',()=>{
    this.destory()
    //清空緩存中的數據
    fs.fsync(dest.fd,()=>{
      dest.destory()
    })
  })
}
複製代碼

雙工流

有了雙工流,咱們能夠在同一個對象上同時實現可讀和可寫,就好像同時繼承這兩個接口。 重要的是雙工流的可讀性和可寫性操做徹底獨立於彼此。這僅僅是將兩個特性組合成一個對象。

let { Duplex } = require('stream')
//雙工流,可讀可寫
class MyDuplex extends Duplex{
  _read(){
    this.push('hello')
    this.push(null)
  }
  _write(chunk,encoding,clearBuffer){
    console.log(chunk)
    clearBuffer()
  }
}

let myDuplex = new MyDuplex()
//process.stdin是node自帶的process進程中的可讀流,會監聽命令行的輸入
//process.stdout是node自帶的process進程中的可寫流,會監聽並輸出在命令行中
//因此這裏的意思就是在命令行先輸出hello,而後咱們輸入什麼他就出來對應的buffer(先做爲可讀流出來)
process.stdin.pipe(myDuplex).pipe(process.stdout)
複製代碼

轉換流

轉換流的輸出是從輸入中計算出來的。對於轉換流,咱們沒必要實現readwrite的方法,咱們只須要實現一個transform方法,將二者結合起來。它有write方法的意思,咱們也能夠用它來push數據。

let { Transform } = require('stream')

class MyTransform extends Transform{
  _transform(chunk,encoding,callback){
    console.log(chunk.toString().toUpperCase())
    callback()
  }
}
let myTransform = new MyTransform()


class MyTransform2 extends Transform{
  _transform(chunk,encoding,callback){
    console.log(chunk.toString().toUpperCase())
    this.push('1')
    // this.push(null)
    callback()
  }
}
let myTransform2 = new MyTransform2()

//此時myTransform2被做爲可寫流觸發_transform,輸出輸入的大寫字符後,會經過可讀流push字符到下一個轉換流中
//當寫入的時候纔會觸發transform的值,此時纔會push,因此後面的pipe拿到的chunk是前面的push的值
process.stdin.pipe(myTransform2).pipe(myTransform)
複製代碼

總結

可讀流

  • 在 flowing 模式下, 可讀流自動從系統底層讀取數據,並經過 EventEmitter 接口的事件儘快將數據提供給應用。
  • 在 paused 模式下,必須顯式調用 stream.read() 方法來從流中讀取數據片斷。
  • 全部初始工做模式爲 paused 的 Readable 流,能夠經過下面三種途徑切換到 flowing 模式:
    • 監聽 'data' 事件
    • 調用 stream.resume() 方法
    • 調用 stream.pipe() 方法將數據發送到 Writable
  • 可讀流能夠經過下面途徑切換到 paused 模式:
    • 若是不存在管道目標(pipe destination),能夠經過調用 stream.pause() 方法實現。
    • 若是存在管道目標,能夠經過取消 'data' 事件監聽,並調用 stream.unpipe() 方法移除全部管道目標來實現。

可寫流

  • 須要知道只有在真正的吃滿了,而且等到把嘴裏的和地上的饅頭(緩存中的)都吃下了纔會觸發drain事件
  • 第一次寫入會直接寫入文件中,後面會從緩存中一個個取

雙工流

  • 只是對可寫可讀流的一種應用,既可做爲可讀流,也能做爲可寫流,而且做爲可讀或者可寫時時隔離

轉換流

  • 通常轉換流是邊輸入邊輸出的,並且通常只有觸發了寫入操做時纔會進入_transform方法中。跟雙工流的區別就是,他的可讀可寫是在一塊兒的。

OK,講完收工,今後你就是魔王

相關文章
相關標籤/搜索