Node.js中不可不精的Stream

1、什麼是Stream(流)

  • 流(stream)在 Node.js 中是處理流數據的抽象接口(abstract interface)。 stream 模塊提供了基礎的API。使用這些API能夠很容易地來構建實現流接口的對象。例如, HTTP 請求 和 process.stdout 就都是流的實例。
  • 流能夠是可讀的、可寫的,或是可讀寫的。注意,全部的流都是 EventEmitter 的實例

2、流的類型

Node.js 中有四種基本的流類型:

  1. Readable - 可讀的流 (例如 fs.createReadStream())。
  2. Writable - 可寫的流 (例如 fs.createWriteStream())。
  3. Duplex - 可讀寫的流(雙工流) (例如 net.Socket)。
  4. Transform - 在讀寫過程當中能夠修改和變換數據的 Duplex 流 (例如 zlib.createDeflate())。
var Stream = require('stream') //stream 模塊引入方式

var Readable = Stream.Readable //可讀的流
var Writable = Stream.Writable //可寫的流
var Duplex = Stream.Duplex //可讀寫的流
var Transform = Stream.Transform //在讀寫過程當中能夠修改和變換數據的 Duplex 流
複製代碼

Node.js中關於流的操做被封裝到了Stream模塊中,這個模塊也被多個核心模塊所引用。例如在fs.createReadStream()和fs.createWriteStream()的源碼實現裏,都調用了Stream模塊提供的抽象接口來實現對流數據的操做。數組

3、爲何使用Stream?

咱們經過兩個例子,瞭解一下爲何要使用Stream。緩存

Exp1:

下面是一個讀取文件內容的例子:bash

const fs = require('fs')

fs.readFile(file, function (err, content) { //讀出來的content是Buffer
  console.log(content)
  console.log(content.toString())
})
複製代碼

但若是文件內容較大,譬如在500M時,執行上述代碼的輸出爲:網絡

<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... >
buffer.js:382
    throw new Error('toString failed');
    ^

Error: toString failed
    at Buffer.toString (buffer.js:382:11)
複製代碼

報錯的緣由是content這個Buffer對象的長度過大,致使toString方法失敗。 可見,這種一次獲取所有內容的作法,不適合操做大文件。異步

能夠考慮使用流來讀取文件內容。socket

var fs = require('fs')

fs.createReadStream(bigFile).pipe(process.stdout) 
複製代碼

fs.createReadStream建立一個可讀流,鏈接了源頭(上游,文件)和消耗方(下游,標準輸出)。ide

執行上面代碼時,流會逐次調用fs.read(ReadStream這個類的源碼裏有一個_read方法,這個_read方法在內部調用了fs.read來實現對文件的讀取),將文件中的內容分批取出傳給下游。函數

在文件看來,它的內容被分塊地連續取走了。大數據

在下游看來,它收到的是一個前後到達的數據序列。優化

若是不須要一次操做所有內容,它能夠處理完一個數據便丟掉。

在流看來,任一時刻它都只存儲了文件中的一部分數據,只是內容在變化而已。

這種狀況就像是用水管去取池子中的水。

每當用掉一點水,水管便會從池子中再取出一點。

不管水池有多大,都只存儲了與水管容積等量的水。

Exp2:

下面是一個在線看視頻的例子,假定咱們經過HTTP請求返回視頻內容給用戶

const http = require('http');
const fs = require('fs');
 
http.createServer((req, res) => {
    fs.readFile(videoPath, (err, data) => {
    res.end(data);
});
}).listen(8080);
複製代碼

但這樣有兩個明顯的問題

  1. 視頻文件須要所有讀取完,才能返回給用戶,這樣等待時間會很長。
  2. 視頻文件一次全放入內存中,內存吃不消。

用流能夠將視頻文件一點一點讀到內存中,再一點一點返回給用戶,讀一部分,寫一部分。(利用了 HTTP 協議的 Transfer-Encoding: chunked 分段傳輸特性),用戶體驗獲得優化,同時對內存的開銷明顯降低。

const http = require('http');
const fs = require('fs');
 
http.createServer((req, res) => {
    fs.createReadStream(videoPath).pipe(res);
}).listen(8080);
複製代碼

經過上述兩個例子,咱們知道,在大數據狀況下必須使用流式處理。

4、可讀流(Readable Stream)

可讀流(Readable streams)是對提供數據的源頭(source)的抽象。

常見的可讀流:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • TCP sockets //sockets是一個雙工流,便可讀可寫的流
  • process.stdin //標準輸入

全部的 Readable Stream 都實現了 stream.Readable 類定義的接口。

可讀流的兩種模式(flowing 和 paused)

  1. 在 flowing 模式下,可讀流自動從系統底層讀取數據,並經過 EventEmitter 接口的事件儘快將數據提供給應用(全部的流都是 EventEmitter 的實例)。

  2. 在 paused 模式下,必須顯式調用 stream.read()方法來從流中讀取數據片斷。

建立流的Readable流,默認是非流動模式(paused模式),默認不會讀取數據。全部初始工做模式爲paused的Readable流,能夠經過下面三種途徑切換爲flowing模式:

  • 監聽’data’事件
  • 調用stream.resume()方法
  • 調用stream.pipe()方法將數據發送到Writable

fs.createReadStream(path[, options])源碼實現

//文件名 ReadStream.js
let fs = require('fs');//讀取文件
let EventEmitter = require('events');
class ReadStream extends EventEmitter {//流操做都是基於事件的
  constructor(path, options = {}) {
    super();
    //須要的參數
    this.path = path;//讀取文件的路徑
    this.highWaterMark = options.highWaterMark || 64 * 1024;//緩衝區大小,默認64KB
    this.autoClose = options.autoClose || true;//是否須要自動關閉文件描述符,默認爲true
    this.start = options.start || 0; //options 能夠包括 start 和 end 值,使其能夠從文件讀取必定範圍的字節而不是整個文件
    this.pos = this.start; // 從文件的那個位置開始讀取內容,pos會隨着讀取的位置而改變
    this.end = options.end || null; // null表示沒傳遞
    this.encoding = options.encoding || null;
    this.flags = options.flags || 'r';//以何種方式操做文件

    // 參數的問題
    this.flowing = null; // 默認爲非流動模式
    // 建一個buffer存放讀出來的數據
    this.buffer = Buffer.alloc(this.highWaterMark);
    this.open(); 
    // {newListener:[fn]}
    // 次方法默認同步調用的
    this.on('newListener', (type) => { // 等待着 它監聽data事件
      if (type === 'data') {//當監聽到data事件時,把流設置爲流動模式
        this.flowing = true;
        this.read();// 開始讀取 客戶已經監聽了data事件
      }
    })
  }
  pause(){//將流從flowing模式切換爲paused模式
    this.flowing = false;
  }
  resume(){//將流從paused模式切換爲flowing模式
    this.flowing =true;
    this.read();//將流從paused模式切換爲flowing模式後,繼續讀取文件內容
  }
  read(){ // 默認第一次調用read方法時尚未獲取fd,文件的打開是異步的,因此不能直接讀
    if(typeof this.fd !== 'number'){ //若是fd不是number類型,證實文件尚未打開,此時須要監聽一次open事件,由於文件一打開,就會觸發open事件,這個在this.open()裏寫了
       return this.once('open',() => this.read()); // 等待着觸發open事件後fd確定拿到了,拿到之後再去執行read方法
    }
    // 當獲取到fd時 開始讀取文件了
    // 第一次應該讀2個 第二次應該讀2個
    // 第二次pos的值是4 end是4
    // 讀取文件裏一共4有個數爲123 4,咱們讀取裏面的123 4
    let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;//規定每次讀取多少個字節
    fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead爲真實的讀到了幾個字節的內容
      // 讀取完畢
      this.pos += byteRead; // 讀出來兩個,pos位置就日後移兩位
      // this.buffer默認就是三個
      let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);//對讀出來的內容進行編碼
      this.emit('data', b);//觸發data事件,將讀到的內容輸出給用戶
      if ((byteRead === this.highWaterMark)&&this.flowing){
        return this.read(); // 繼續讀
      }
      // 這裏就是沒有更多的邏輯了
      if (byteRead < this.highWaterMark){
        // 沒有更多了
        this.emit('end'); // 讀取完畢
        this.destroy(); // 銷燬便可
      }
    });
  }
  // 打開文件用的
  destroy() {
    if (typeof this.fd != 'number') { return this.emit('close'); } //若是文件還沒打開,直接觸發close事件
    fs.close(this.fd, () => {
      // 若是文件打開過了 那就關閉文件而且觸發close事件
      this.emit('close');
    });
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => { //fd是文件描述符,它標識的就是當前this.path這個文件,從3開始(number類型)
      if (err) {
        if (this.autoClose) { // 若是須要自動關閉我再去銷燬fd
          this.destroy(); // 銷燬(關閉文件,觸發關閉事件)
        }
        this.emit('error', err); // 若是有錯誤觸發error事件
        return;
      }
      this.fd = fd; // 保存文件描述符
      this.emit('open', this.fd); // 文件被打開了,觸發文件被打開的方法
    });
  }
  pipe(dest){//管道流的實現 pipe()方法是ReadStream下的方法,它裏面的參數是WritableStream
    this.on('data',(data)=>{
      let flag = dest.write(data);
      if(!flag){//這個flag就是每次調用ws.write()後返回的讀狀態值
        this.pause();// 已經不能繼續寫了,等他寫完了再恢復
      }
    });
    dest.on('drain',()=>{//當讀取緩存區清空後
      console.log('寫一下停一下')
      this.resume();//繼續往dest寫入數據
    });
  }
}
module.exports = ReadStream;//導出可讀流
複製代碼

使用fs.createReadStream()

// 流:有序的有方向的,能夠本身控制速率
// 讀:讀是將內容讀取到內存中 
// 寫:寫是將內存或者文件的內容寫入到文件內
// 讀取的時候默認讀 默認一次讀取64k,encoding 讀取出來的內容默認都是buffer
//let fs = require('fs');
//let rs = fs.createReadStream({...});//原生實現可讀流
let ReadStream = require('./ReadStream');
let rs = new ReadStream('./2.txt', {
  highWaterMark: 3, // 字節
  flags:'r',//讀文件
  autoClose:true, // 默認讀取完畢後自動關閉文件描述符
  start:0,
  //end:3,// 流是閉合區間 包start也包end
  encoding:'utf8'
});
// 默認建立一個流 是非流動模式(上述源碼中有寫的),默認不會讀取數據
// 若是咱們須要接收數據,那咱們要監聽data事件,這樣數據會自動的流出來
rs.on('error',function (err) {// 一般,這會在底層系統內部出錯從而不能產生數據,或當流的實現試圖傳遞錯誤數據時發生。
  console.log(err)
});
rs.on('open',function () {//文件被打開了,獲取到了fd。內部會自動的觸發這個事件 rs.emit('data'); 
  console.log('文件打開了');
});
rs.on('data',function (data) {//有數據流出來了
  console.log(data);
  rs.pause(); // 暫停觸發on('data')事件,將流動模式又轉化成了非流動模式
});
setTimeout(()=>{rs.resume()},3000);//三秒鐘以後再將非流動模式轉化爲流動模式
rs.on('end',function () {// 讀取完畢
  console.log('讀取完畢了');
});
rs.on('close',function () {//close 事件將在流或其底層資源(好比一個文件)關閉後觸發。close 事件觸發後,該流將不會再觸發任何事件。
  //console.log('關閉')
});
複製代碼

4、可寫流(Writable Stream)

可寫流是對數據流向設備的抽象,用來消費上游流過來的數據,經過可寫流程序能夠把數據寫入設備,常見的是本地磁盤文件或者 TCP、HTTP 等網絡響應。

常見的可寫流:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

全部 Writable 流都實現了 stream.Writable 類定義的接口。

可寫流的使用

調用可寫流實例的 write() 方法就能夠把數據寫入可寫流

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
 
rs.setEncoding('utf-8'); // 設置編碼格式
rs.on('data', chunk => {
ws.write(chunk); // 寫入數據
});
複製代碼

監聽了可讀流的data事件就會使可讀流進入流動模式,咱們在回調事件裏調用了可寫流的 write() 方法,這樣數據就被寫入了可寫流抽象的設備destPath中。

write() 方法有三個參數

  • chunk {String| Buffer},表示要寫入的數據
  • encoding 當寫入的數據是字符串的時候能夠設置編碼
  • callback 數據被寫入以後的回調函數

drain事件

若是調用 stream.write(chunk)方法返回false,表示當前緩存區已滿,流將在適當的時機(緩存區清空後)觸發drain事件。

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
 
rs.setEncoding('utf-8'); // 設置編碼格式
rs.on('data', chunk => {
let flag = ws.write(chunk); // 寫入數據
if (!flag) { // 若是緩存區已滿暫停讀取
rs.pause();
}
});
 
ws.on('drain', () => {
rs.resume(); // 緩存區已清空 繼續讀取寫入
});
複製代碼

fs.createWriteStream(path[, options])源碼實現

// 文件 WriteStream.js
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || 'w';
    this.encoding = options.encoding || 'utf8';
    this.start = options.start || 0;
    this.pos = this.start;
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.open(); // fd 異步的  //觸發一個open事件,當觸發open事件後fd確定就存在了

    // 寫文件的時候 須要的參數有哪些
    // 第一次寫入是真的往文件裏寫
    this.writing = false; // 默認第一次就不是正在寫入
    // 用簡單的數組來模擬一下緩存
    this.cache = [];
    // 維護一個變量,表示緩存的長度
    this.len = 0;
    // 是否觸發drain事件
    this.needDrain = false;
  }
  clearBuffer() {
    let buffer = this.cache.shift();
    if (buffer) { // 若是緩存裏有
      this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer());
    } else {// 若是緩存裏沒有了
      if (this.needDrain) { // 須要觸發drain事件
        this.writing = false; // 告訴下次直接寫就能夠了 不須要寫到內存中了
        this.needDrain = false;
        this.emit('drain');
      }
    }
  }
  _write(chunk, encoding, clearBuffer) { // 由於write方法是同步調用的此時fd尚未獲取到,因此等待獲取到再執行write操做
    if (typeof this.fd != 'number') {
      return this.once('open', () => this._write(chunk, encoding, clearBuffer));
    }
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => {
      this.pos += byteWritten;
      this.len -= byteWritten; // 每次寫入後就要在內存中減小一下
      clearBuffer(); // 第一次就寫完了
    })
  }
  write(chunk, encoding = this.encoding) { // 客戶調用的是write方法去寫入內容
    // 要判斷 chunk必須是buffer或者字符串 爲了統一,若是傳遞的是字符串也要轉成buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    this.len += chunk.length; // 維護緩存的長度 3
    let ret = this.len < this.highWaterMark;
    if (!ret) {
      this.needDrain = true; // 表示須要觸發drain事件
    }
    if (this.writing) { // 表示正在寫入,應該放到內存中
      this.cache.push({
        chunk,
        encoding,
      });
    } else { // 第一次
      this.writing = true;
      this._write(chunk, encoding, () => this.clearBuffer()); // 專門實現寫的方法
    }
    return ret; // 能不能繼續寫了,false表示下次的寫的時候就要佔用更多內存了
  }
  destroy() {
    if (typeof this.fd != 'number') {
      this.emit('close');
    } else {
      fs.close(this.fd, () => {
        this.emit('close');
      });
    }
  }
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err);
        if (this.autoClose) {
          this.destroy(); // 若是自動關閉就銷燬文件描述符
        }
        return;
      }
      this.fd = fd;
      this.emit('open', this.fd);
    });
  }
}
module.exports = WriteStream;
複製代碼

使用fs.createWriteStream()

// 可寫流有緩存區的概念
// 1.第一次寫入是真的向文件裏寫,第二次在寫入的時候是放到了緩存區裏
// 2.寫入時會返回一個boolean類型,返回爲false時表示緩存區滿了,不要再寫入了
// 3.當內存和正在寫入的內容消耗完後,會觸發一個drain事件
//let fs = require('fs');
//let rs = fs.createWriteStream({...});//原生實現可寫流
let WS = require('./WriteStream')
let ws = new WS('./2.txt', {
  flags: 'w', // 寫入文件,默認文件不存在會建立
  highWaterMark: 1, // 設置當前緩存區的大小
  encoding: 'utf8', // 文件裏存放的都是二進制
  start: 0,
  autoClose: true, // 自動關閉文件描述符
  mode: 0o666, // 可讀可寫
});
// drain的觸發時機,只有當highWaterMark填滿時,纔可能觸發drain
// 當嘴裏的和地下的都吃完了,就會觸發drain方法
let i = 9;
function write() {
  let flag = true;
  while (flag && i >= 0) {
    i--;
    flag = ws.write('111'); // 987 // 654 // 321 // 0
    console.log(flag)
  }
}
write();
ws.on('drain', function () {
  console.log('dry');
  write();
});
複製代碼

總結

stream(流)分爲可讀流(flowing mode和paused mode)、可寫流、可讀寫流,Node.js 提供了多種流對象。 例如, HTTP 請求 和 process.stdout 就都是流的實例。stream 模塊提供了基礎的 API 。使用這些 API 能夠很容易地來構建實現流接口的對象。它們底層都調用了stream模塊並進行封裝。

相關文章
相關標籤/搜索