Node Stream運行機制

若是你正在學習Node,那麼流必定是一個你須要掌握的概念。若是你想成爲一個Node高手,那麼流必定是武功祕籍中不可缺乏的一個部分。

引用自Stream-Handbook。因而可知,流對於深刻學習Node的重要性。javascript

流是什麼?

你能夠把流理解成一種傳輸的能力。經過流,能夠以平緩的方式,無反作用的將數據傳輸到目的地。在Node中,Node Stream建立的流都是專用於String和Buffer上的,通常狀況下使用Buffer。Stream表示的是一種傳輸能力,Buffer是傳輸內容的載體 (能夠這樣理解,Stream:外賣小哥哥, Buffer:你的外賣)。建立流的時候將ObjectMode設置true ,Stream一樣能夠傳輸任意類型的JS對象(除了null,null在流中有特殊用途)。html

爲何要使用流?

如今有個需求,咱們要向客戶端傳輸一個大文件。若是採用下面的方式java

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
    res.end(data);
  });
});

server.listen(8000);

每次接收一個請求,就要把這個大文件讀入內存,而後再傳輸給客戶端。經過這種方式可能會產生如下三種後果:git

  • 內存耗盡
  • 拖慢其餘進程
  • 增長垃圾回收器的負載

因此這種方式在傳輸大文件的狀況下,不是一個好的方案。併發量一大,幾百個請求過來很容易就將內存耗盡。github

若是採用流呢?服務器

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

採用這種方式,不會佔用太多內存,讀取一點就傳輸一點,整個過程平緩進行,很是優雅。若是想在傳輸的過程當中,想對文件進行處理,好比壓縮、加密等等,也很好擴展(後面會具體介紹)。併發

流在Node中無處不在。從下圖中能夠看出:異步

20181008153896256368933png

Stream分類

Stream分爲四大類:函數

  • Readable(可讀流)
  • Writable (可寫流)
  • Duplex (雙工流)
  • Transform (轉換流)

Readable

可讀流中的數據,在如下兩種模式下都能產生數據。學習

  • Flowing Mode
  • Non-Flowing Mode

兩種模式下,觸發的方式以及消耗的方式不同。

Flowing Mode:數據會源源不斷地生產出來,造成「流動」現象。監聽流的data事件即可進入該模式。

Non-Flowing Mode下:須要顯示地調用read()方法,才能獲取數據。

兩種模式能夠互相轉換

20181008153896304252946png

流的初始狀態是Null,經過監聽data事件,或者pipe方法,調用resume方法,將流轉爲Flowing Mode狀態。Flowing Mode狀態下調用pause方法,將流置爲Non-Flowing Mode狀態。Non-Flowing Mode狀態下調用resume方法,一樣能夠將流置爲Flowing Mode狀態。

下面詳細介紹下兩種模式下,Readable流的運行機制。

Flowing Mode

在Flowing Mode狀態下,建立的myReadable讀流,直接監聽data事件,數據就源源不斷的流出來進行消費了。

myReadable.on('data',function(chunk){
      consume(chunk);//消費流
})

一旦監聽data事件以後,Readable內部的流程以下圖所示

20181009153904731891066png

核心的方法是流內部的read方法,它在參數n爲不一樣值時,分別觸發不一樣的操做。下面描述中的hightwatermark表示的是流內部的緩衝池的大小。

  • n=undefined(消費數據,並觸發一次可讀流)
  • n=0(觸發一次可讀流,可是不會消費)
  • n>hightwatermark(修改hightwatermark的值)
  • n<buffer的總數據數(直接返回n個字節的數據)
  • n>buffer (能夠返回null,也能夠返回buffer全部的數據(當時最後一次讀取))

圖中黃色標識的_read(),是用戶實現流所須要本身實現的方法,這個方法就是實際讀取流的方式(能夠這樣理解,外賣平臺給你提供外賣的能力,那_read()方法就至關於你下單點外賣)。後面會詳細介紹如何實現_read方法。

以上的流程能夠描述爲:監聽data方法,Readable內部就會調用read方法,來進行觸發讀流操做,經過判斷是同步仍是異步讀取,來決定讀取的數據是否放入緩衝區。若是爲異步的,那麼就要調用flow方法,來繼續觸發read方法,來讀取流,同時根據size參數斷定是否emit('data')來消費流,循環讀取。若是是同步的,那就emit('data')來消費流,同時繼續觸發read方法,來讀取流。一旦push方法傳入的是null,整個流就結束了。

從使用者的角度來看,在這種模式下,你能夠經過下面的方式來使用流

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(chunk){
      writeFile1.write(chunk);
})
Non-Flowing Mode

相對於Flowing mode,Non-Flowing Mode要相對簡單不少。

消費該模式下的流,須要使用下面的方式

myReadable.on(‘readable’,function(){
     const chunk = myReadable.read()
     consume(chunk);//消費流
})

在Non-Flowing Mode下,Readable內部的流程以下圖:

20181010153915295064626.png

從這個圖上看出,你要實現該模式的讀流,一樣要實現一個_read方法。

整個流程以下:監聽readable方法,Readable內部就會調用read方法。調用用戶實現的_read方法,來push數據到緩衝池,而後發送emit readable事件,通知用戶端消費。

從使用者的角度來看,你能夠經過下面的方式來使用該模式下的流

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('readable',function(chunk) {
    while (null !== (chunk = myReadable.read())) {
        writeFile.write(chunk);
    }
});

Writable

相對於讀流,寫流的機制就更容易理解了。

寫流使用下面的方式進行數據寫入

myWrite.write(chunk);

調用write後,內部Writable的流程以下圖所示

20181009153904892352425png

相似於讀流,實現一個寫流,一樣須要用戶實現一個_write方法。

整個流程是這樣的:調用write以後,會首先斷定是否要寫入緩衝區。若是不須要,那就調用用戶實現的_write方法,將流寫入到相應的地方,_write會調用一個writeable內部的一個回調函數。

從使用者的角度來看,使用一個寫流,採用下面的代碼所示的方式。

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('data',function(chunk) {
    writeFile.write(chunk);
})

能夠看到,使用寫流是很是簡單的。

咱們先講解一下如何實現一個讀流和寫流,再來看Duplex和Transform是什麼,由於瞭解瞭如何實現一個讀流和寫流,再來理解Duplex和Transform就很是簡單了。

實現自定義的Readable

實現自定義的Readable,只須要實現一個_read方法便可,須要在_read方法中調用push方法來實現數據的生產。以下面的代碼所示:

const Readable = require('stream').Readable;

class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        });
    }
}

// 模擬資源池
const dataSource = {
    data: new Array(10).fill('-'),
    makeData() {
        if (!dataSource.data.length) return null;
        return dataSource.data.pop();
    }
};

const myReadable = new MyReadable(dataSource,);

myReadable.on('readable', () => {
    let chunk;
    while (null !== (chunk = myReadable.read())) {
        console.log(chunk);
    }
});

實現自定義的writable

實現自定義的writable,只須要實現一個_write方法便可。在_write中消費chunk寫入到相應地方,而且調用callback回調。以下面代碼所示:

const Writable = require('stream').Writable;
class Mywritable extends  Writable{
    constuctor(options){
        super(options);
    }
    _write(chunk,endcoding,callback){
        console.log(chunk);
        callback && callback();
    }
}
const myWritable = new Mywritable();

Duplex

雙工流:簡單理解,就是講一個Readable流和一個Writable流綁定到一塊兒,它既能夠用來作讀流,又能夠用來作寫流。

實現一個Duplex流,你須要同時實現_read_write方法。

有一點須要注意的是:它所包含的 Readable流和Writable流是徹底獨立,互不影響的兩個流,兩個流使用的不是同一個緩衝區。經過下面的代碼能夠驗證

// 模擬資源池1
const dataSource1 = {
    data: new Array(10).fill('a'),
    makeData() {
        if (!dataSource1.data.length) return null;
        return dataSource1.data.pop();
    }
};
// 模擬資源池2
const dataSource2 = {
    data: new Array(10).fill('b'),
    makeData() {
        if (!dataSource2.data.length) return null;
        return dataSource2.data.pop();
    }
};

const Readable = require('stream').Readable;
class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })

    }
}

const Writable = require('stream').Writable;
class MyWritable extends Writable{
    constructor(options){
        super(options);
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const Duplex = require('stream').Duplex;
class MyDuplex extends Duplex{
    constructor(dataSource,options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const myWritable = new MyWritable();
const myReadable = new MyReadable(dataSource1);
const myDuplex = new MyDuplex(dataSource1);
myReadable.pipe(myDuplex).pipe(myWritable);

打印的結果是

abababababababababab

從這個結果能夠看出,myReadable.pipe(myDuplex),myDuplex充當的是寫流,寫入的內容是a;myDuplex.pipe(myWritable),myDuplex充當的是讀流,往myWritable寫的倒是b;因此說它所包含的 Readable流和Writable流是徹底獨立的。

Transform

理解了Duplex,就更好理解Transform了。Transform是一個轉換流,它既有讀的功能又有寫的功能,可是它和Duplex不一樣的是,它的讀流和寫流共用同一個緩衝區;也就是說,經過它讀入什麼,那它就能寫入什麼。

實現一個Transform,你只須要實現一個_transform方法。好比最簡單的Transform:PassThrough,其源代碼以下所示

20181009153905044977496png

PassThrough就是一個Transform,可是這個轉換流,什麼也沒作,至關於一個透明的轉換流。能夠看到_transform中什麼都沒有,只是簡單的將數據進行回調。

若是咱們在這個環節作些擴展,只須要在_transform中直接擴展就好了。好比咱們能夠對流進行壓縮,加密,混淆等等操做。

BackPress

最後介紹一個流中很是重要的一個概念:背壓。要了解這個,咱們首先來看下pipehighWaterMaker是什麼。

pipe

首先看下下面的代碼

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.pipe(writeFile);

上面的代碼和下面是等價的

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(data){
    var flag = ws.write(data);
    if(!flag){ // 當前寫流緩衝區已滿,暫停讀數據
        readFile.pause();
    }
})
writeFile.on('drain',function()){
    readFile.resume();// 當前寫流緩衝區已清空,從新開始讀流
}
readFile.on('end',function(data){
    writeFile.end();//將寫流緩衝區的數據所有寫入,而且關閉寫入的文件
})

pipe所作的操做就是至關於爲寫流和讀流自動作了速度的匹配。

讀寫流速度不匹配的狀況下,通常狀況下不會形成什麼問題,可是會形成內存增長。內存消耗增長,就有可能會帶來一系列的問題。因此在使用的流的時候,強烈推薦使用pipe

highWaterMaker

highWaterMaker說白了,就是定義緩衝區的大小。

  • 默認16Kb(Readable最大8M)
  • 能夠自定義

背壓的概念能夠理解爲:爲了防止讀寫流速度不匹配而產生的一種調整機制;背壓該調整機制的觸發時機,受限於highWaterMaker設置的大小。

如上面的代碼 var flag = ws.write(data);,一旦寫流的緩衝區滿了,那flag就會置爲false,反向促進讀流的速度調整。

Stream的應用場景

主要有如下場景

1.文件操做(複製,壓縮,解壓,加密等)

下面的就很容易就實現了文件複製的功能。

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big_copy.file');
readFile.pipe(writeFile);

那咱們想在複製的過程當中對文件進行壓縮呢?

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big.gz');
const zlib = require('zlib');
readFile.pipe(zlib.createGzip()).pipe(writeFile);

實現解壓、加密也是相似的。

2.靜態文件服務器

好比須要返回一個html,可使用以下代碼。

var http = require('http');
var fs = require('fs');
http.createServer(function(req,res){
    fs.createReadStream('./a.html').pipe(res);
}).listen(8000);
相關文章
相關標籤/搜索