Node.js流,這樣的打開方式對不對!

Node.js流,這樣的打開方式對不對!

俗話說的好:「人往高處走,水往低處流」;古語有云:「落花有意,流水無情」。(吃瓜羣衆:what?你特麼這是要弄啥哩!二營長📣)哎呀🤣,各位大佬,這點小事用不着驚動二營長的意大利炮了吧,進錯頻道了,立刻開始正題!node

(一)流究竟是個什麼東西?

Node.js的文件系統(fs核心模塊)在咱們的開發中應該常常用到,在沒有深刻了解學習以前,若是有人問我Node.js流究竟是個什麼東西呢?我當時的表情必定是這樣曬的: git

流究竟是個啥?不給提示讓我怎麼說的出嘴?這種問題,哎呀腦袋痛啊!看來只能去google了(哼~我程序猿早已戒了百度)。github

翻了好幾篇大佬寫的文章,不能說徹底解惑吧,也算是收穫滿滿哦🙃!可是 呢,仍是覺的它有那麼點抽象,又有那麼點難以理解!痛定思痛下,決定把大佬們的理解在小本本上記下來慢慢理解。(麻麻不再用擔憂我下次被問到還蒙圈了)api

  • 流是數據的集合——就像數組或字符串同樣。區別在於流中的數據可能不會馬上就所有可用,而且無需一次性地把這些數據所有放入內存中。這使得流在操做大量數據或者一次來自外部源的數據時變得很是強大。

來自於:Node.js Streams: Everything you need to know數組

  • 流是一組有序的,有起點和終點的字節數據傳輸手段。在應用程序中各類對象之間交換與傳輸數據的時候,老是先將該對象中所包含的數據轉換爲各類形式的流數據(即字節數據),在經過流的傳輸,到達目的對象後再將流數據轉換爲該對象中可使用的數據。

來自於:《Node.js權威指南》緩存

(二)Node.js中流的類型

知道了流是什麼,咱們還須要瞭解一下在Node.js中流的類型。流是一個很抽象接口,可是卻被Node.js中的不少對象所實現。好比HTTP服務器request和response對象都是流。那咱們就先來了解一下Node.js有四種基本的流類型:bash

  • Readable:可讀流。如Node.js的文件系統(fs)中的fs.createReadStream(path,options)就是一個可讀流的例子;
  • Writable:可寫流。如Node.js的文件系統(fs)中的fs.createWriteStream(path,options)就是一個可寫流的例子;
  • Duplex:可讀寫的流,又稱雙向流。如Node.js的網絡(net)中的net.Socket類;
  • Transform:在讀寫過程當中能夠修改和變換數據的 Duplex 流,又稱變換流。如Node.js的壓縮(zlib)中的zlib.createDeflate(options)。

在Node.js中,全部的流的實現都繼承了EventEmitter(用於實現各類事件處理的event模塊)這個類,所以,在讀取或者寫入數據的過程當中,可能會觸發各類事件。服務器

(三)createReadStream和createWriteStream

如今咱們瞭解了流的含義和類型,那麼Node.js中的流是怎麼實現的呢?咱們都知道fs.createReadStream(path,options)建立一個可讀流,fs.createWriteStream(path,options)建立一個可寫流,path是讀取文件的路徑,options是配置參數。(這配置參數也忒多了點,🤔努力記.....)網絡

  • flags: 可讀流默認是'r',可寫流默認是'w';
  • encoding: 編碼格式。可讀流默認是null(其實就是buffer啦),可寫流默認是'utf8';
  • autoClose:是否自動關閉。默認都是true(就是讀完文件或者寫入完以後自動把文件關上啦);
  • mode: 讀取和寫入的模式。默認都是0o666(可讀可寫,八進制);
  • highWaterMark: 最高水位線。可讀流默認是64 kb(每次最多讀取字節數),可寫流默認是16 kb(每次最多寫入的字節數,也能夠理解爲佔用的最大內存);
  • start:開始讀取或者開始寫入的位置。默認是從0開始的(單位是字節數);
  • fd:文件標識符。是Number類型的
  • end:是可讀流獨有的,讀取文件的最終位置,默認是Infinity(單位是字節數)。

其實,參數雖然比較多,可是都很容易理解。這裏須要特別注意的是讀取文件的時候,若是start設置爲0,end設置爲5,那麼實際上最終讀取的結果是6個字符,即至關於包前又包後!(有點小霸道哦😎)異步

在Node.js中,使用fs.createReadStream(path,options)建立可讀流和fs.createWriteStream(path,options)建立可寫流兩個方法很簡單,難道咱們就甘心僅僅停留在能用、會用的層面嗎?No、No、No!咱們不只要會用,還要知道其中的原理,他們是如何實現的?先來建立一個可讀流感覺一下其用法:

// 'a.txt'存放十個數字--> 1234567890 
let rs = fs.createReadStream('a.txt', {
  flags: 'r',
  encoding: 'utf8',
  autoClose: true,
  mode: 0o666,
  start: 0, 
  end:5,
  highWaterMark: 2 
});
rs.on('data',function (data) {
  console.log(data);
  rs.pause();
});
setInterval(() => {
  rs.resume();
}, 1000);
// 最後輸出的結果是 01 23 45 
複製代碼

監聽到'data'事件以後,流切換到流動模式,數據會被儘量快的讀出。pause、resume事件是用來暫停和恢復觸發'data'事件的(意味着讀取文件的操做中止了)。固然還能夠監聽'end'事件、'open'事件、'close'事件、'error'事件。

相比較於可讀流,咱們還應該知道可寫流的如下幾個特色:

  • 可寫流是有緩存區的概念的,第一次會真的往文件裏寫,後面的內容會先寫到緩存中
  • 可寫流寫入時會返回一個boolean類型,當返回爲false時,就不要在寫入了。(但若是返回false以後還有寫入操做,仍是會寫入文件中,由於超過的部分會放到緩存裏,這樣可能會致使內存的浪費);
  • 正在寫入的內容和緩存中的內容都消耗完後,會觸發drain事件
// 若是'a.txt'中有內容,會被寫入的內容覆蓋掉
let ws = fs.createWriteStream('a.txt',{
  flags: 'w',
  mode: 0o666,
  encoding: 'utf8',
  autoClose: true,
  start: 0,
  highWaterMark: 3
})
let flag = ws.write('1');
console.log(flag); // true
flag = ws.write('1');
console.log(flag); // true
flag = ws.write('1');
console.log(flag); // false
複製代碼

上面例子中,最後一次返回的是false,其實跟設置的highWaterMark最高水位線(設置的當前緩存區大小)有關係了。當寫入的內容大於等於highWaterMark時,就會返回flase

那咱們如何去控制寫入的時機,從而不形成內存的浪費呢?請看下面的例子。

// 複用上個例子中的可寫流實例,寫入時只佔用三個字節的內存
let i = 0;
function write(){ // 每次寫入三個字節,而後停住,寫入完成後再繼續寫入
  let flag = true;
  while(i < 9 && flag){
    flag = ws.write(i + '');
    i++
  }
}
ws.on('drain',function(){ // 達到highWaterMark觸發該事件
  console.log('寫入成功');
  write();
})
write(); // a.txt文件中--> 012345678
複製代碼

實現createReadStream和createWriteStream

上面大體瞭解了createReadStream和createWriteStream的用法和特色,若是咱們能本身實現一下可讀流和可寫流,無疑能加深咱們對其的理解。翻看Node.js的源碼,可讀流fs.createReadStream()執行後返回的是ReadStream類的實例,可寫流也是同樣的邏輯,代碼以下:

fs.createReadStream = function(path, options) {
  return new ReadStream(path, options);
};
fs.createWriteStream = function(path, options) {
  return new WriteStream(path, options);
};
複製代碼

是否是感受一會兒明朗了許多,只要咱們可以封裝ReadStream和WriteStream類就能夠了。爲了可以更好的理解,基本每句代碼都有註釋哦😊!爲減小篇幅,這裏只貼出來核心read方法和write方法的實現,所有代碼請移步stream下載:

class ReadStream extends EventEmitter{
  read(){ // 讀取文件
    if (this.finished) { // 讀完以後就再也不讀了
      return;
    }
    // open打開文件是異步的,當咱們讀取的時候可能文件尚未打開
    if(typeof this.fd !== 'number'){
      this.once('open',()=>this.read());
      return;
    }
    // length表明每次讀取的字節數
    let length = this.end ? Math.min(this.highWaterMark, this.end - this.pos + 1) : this.highWaterMark;
    fs.read(this.fd,this.buffer,0,length,this.pos,(err,bytesRead)=>{
      if(err){
        this.emit('error',err);
        this.destroy();
        return;
      }
      if(bytesRead > 0){ // 讀到的字節數 
        this.pos += bytesRead;
        let res = this.buffer.slice(0, bytesRead); // 真實讀取到的bytesRead可能不能填滿this.buffer,須要截取,保留有用的
        res = this.encoding ? res.toString(this.encoding) : res;
        this.emit('data', res);
        if (this.flowing) { // 若是是流動模式,就繼續調用read方法讀取
          this.read();
        }
      }else {
        this.finished = true; // 讀完的標識
        this.emit('end');
        this.destroy();
      }
    })
  }
}
複製代碼

可讀流ReadStream類的封裝,最主要的就是理解read方法的實現,其餘的方法都比較簡單好理解。read方法中最難理解的就是length變量(要讀取的字節數),由於讀到最後,可能文件中的字節數小於了highWaterMark最高水位線,因此要取Math.min()最小值。打個比方:若是this.end = 4;說明總共須要讀取5個字節,this.highWaterMark= 3;說明每次讀取3個字節,第一次讀完後this.pos = 3;此時還須要在讀取2個字節就夠了。

class WriteStream extends EventEmitter {
  // chunk:寫入的內容;encoding:編碼格式;callback:寫入完成後的回調
  write(chunk,encoding=this.encoding,callback){ // 寫入的時候調用的方法
     // 爲了統一,若是傳遞的是字符串也要轉成buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk,encoding);
    this.len += chunk.length; // 維護緩存的長度
    let ret = this.highWaterMark > this.len;
    if(!ret){ 
      this.needDrain = true; // 表示須要觸發drain事件
    }
    if(this.writing){ // true表示正在寫入,應該放在緩存中
      this.buffer.push({
        chunk,
        encoding,
        callback
      });
    }else{ // 第一次寫入
      this.writing = true;
      this._write(chunk,encoding,()=>this.clearBuffer()); // 實現一個寫入的方法
    }
    return ret; // write的返回值必須是true/false
  }
  _write(chunk,encoding,callback){ // 由於write方法是同步調用的,此時fd可能尚未獲取到
    if(typeof this.fd !== 'number'){ // 判斷若是文件尚未打開
      return this.once('open',()=>this._write(chunk,encoding,callback));
    }
    // 參數:fd 文件描述符; chunk是數據; 0:寫入的buffer開始的位置; chunk.length寫入的字節數; this.pos文件開始寫入數據的位置的偏移量
    fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
      this.pos += bytesWritten;
      this.len -= bytesWritten; // 每次寫入後,內存中的也要相應的減小
      callback();
    })
  }
  clearBuffer(){ // 清除緩存中的
    let buf = this.buffer.shift();
    if(buf){
      this._write(buf.chunk,buf.encoding,()=>this.clearBuffer());
    }else{
      if(this.needDrain){ // 若是須要觸發drain
        this.writing = false;
        this.needDrain = false;// 觸發一次drain 再置回false 方便下次繼續判斷
        this.emit('drain');
      }
    }
  }
}
複製代碼

可寫流中最主要的就是write方法,其又依賴_write方法clearBuffer方法。所有代碼請移步stream下載,更好理解哦b( ̄▽ ̄)d!

(四)pipe方法

其實啊,說了這麼多,又是理解含義,又是封裝代碼,都是爲了突出pipe方法導流的重要性啊。pipe方法怎麼使用呢,請注意(前方高能):

let fs = require('fs');
let rs = fs.createReadStream('a.txt', { 
  highWaterMark: 4
});
let ws = fs.createWriteStream('b.txt', { 
  highWaterMark: 1
});
rs.pipe(ws);
複製代碼

用法是否是很簡單,很直接呀!雖然僅僅這一行的代碼,但這正是其神奇之處啊。(說好的高能呢?小板凳都準備好了,你告訴我這些)。 那咱們怎麼去實現一個pipe方法呢?其實基於上面可讀流createReadStream和可寫流createWriteStream的封裝,pipe的實現就顯得很簡單了,在ReadStream類的原型上封裝pipe方法,代碼以下:

class ReadStream extends EventEmitter{
  pipe(dest){
    this.on('data',(data)=>{
      let flag = dest.write(data);
      if(!flag){
        this.pause(); // 不能繼續讀取了,等寫入完成後再繼續讀取
      }
    });
    dest.on('drain',()=>{
      this.resume();
    })
  }
}
複製代碼

再次友情提示:爲了更好的理解,能夠移步這裏stream下載所有代碼哦( ̄▽ ̄)~*!

  • pipe方法又叫管道方法,最大的優勢就是能夠控制速率(防止淹沒可用內存);
  • pipe方法的實現原理:可讀流實例rs監聽on('data')方法,將讀取到的內容調用ws.write方法(ws是可寫流實例),其方法會返回一個boolean類型,若是返回false會調用rs.pause()暫停讀取,等待可寫流寫入完畢後,調用ws.on('drain')在恢復讀取

總結:Node.js流到這裏就告一段落了,感謝你們的閱讀!若是有問題歡迎指出,共同進步哦!若是感受文章有點晦澀難懂,能夠先收藏,方便之後閱讀哦❤️!

參考文章:

相關文章
相關標籤/搜索