若是你正在學習Node,那麼流必定是一個你須要掌握的概念。若是你想成爲一個Node高手,那麼流必定是武功祕籍中不可缺乏的一個部分。
引用自Stream-Handbook。因而可知,流對於深刻學習Node的重要性。javascript
你能夠把流理解成一種傳輸的能力。經過流,能夠以平緩的方式,無反作用的將數據傳輸到目的地。在Node中,Node Stream建立的流都是專用於String和Buffer上的,通常狀況下使用Buffer。Stream表示的是一種傳輸能力,Buffer是傳輸內容的載體 (能夠這樣理解,Stream:外賣小哥哥, Buffer:你的外賣)。建立流的時候將ObjectMode設置true ,Stream一樣能夠傳輸任意類型的JS對象(除了null,null在流中有特殊用途)。html
如今有個需求,咱們要向客戶端傳輸一個大文件。若是採用下面的方式java
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { fs.readFile('./big.file', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(8000);
每次接收一個請求,就要把這個大文件讀入內存,而後再傳輸給客戶端。經過這種方式可能會產生如下三種後果:git
因此這種方式在傳輸大文件的狀況下,不是一個好的方案。併發量一大,幾百個請求過來很容易就將內存耗盡。github
若是採用流呢?服務器
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { const src = fs.createReadStream('./big.file'); src.pipe(res); }); server.listen(8000);
採用這種方式,不會佔用太多內存,讀取一點就傳輸一點,整個過程平緩進行,很是優雅。若是想在傳輸的過程當中,想對文件進行處理,好比壓縮、加密等等,也很好擴展(後面會具體介紹)。併發
流在Node中無處不在。從下圖中能夠看出:異步
Stream分爲四大類:函數
可讀流中的數據,在如下兩種模式下都能產生數據。學習
兩種模式下,觸發的方式以及消耗的方式不同。
Flowing Mode:數據會源源不斷地生產出來,造成「流動」現象。監聽流的data
事件即可進入該模式。
Non-Flowing Mode下:須要顯示地調用read()
方法,才能獲取數據。
兩種模式能夠互相轉換
流的初始狀態是Null,經過監聽data
事件,或者pipe
方法,調用resume
方法,將流轉爲Flowing Mode
狀態。Flowing Mode
狀態下調用pause
方法,將流置爲Non-Flowing Mode
狀態。Non-Flowing Mode
狀態下調用resume
方法,一樣能夠將流置爲Flowing Mode
狀態。
下面詳細介紹下兩種模式下,Readable流的運行機制。
在Flowing Mode狀態下,建立的myReadable讀流,直接監聽data事件,數據就源源不斷的流出來進行消費了。
myReadable.on('data',function(chunk){ consume(chunk);//消費流 })
一旦監聽data事件以後,Readable內部的流程以下圖所示
核心的方法是流內部的read方法,它在參數n爲不一樣值時,分別觸發不一樣的操做。下面描述中的hightwatermark表示的是流內部的緩衝池的大小。
圖中黃色標識的_read(),是用戶實現流所須要本身實現的方法,這個方法就是實際讀取流的方式(能夠這樣理解,外賣平臺給你提供外賣的能力,那_read()方法就至關於你下單點外賣)。後面會詳細介紹如何實現_read方法。
以上的流程能夠描述爲:監聽data方法,Readable內部就會調用read方法,來進行觸發讀流操做,經過判斷是同步仍是異步讀取,來決定讀取的數據是否放入緩衝區。若是爲異步的,那麼就要調用flow方法,來繼續觸發read方法,來讀取流,同時根據size參數斷定是否emit('data')來消費流,循環讀取。若是是同步的,那就emit('data')來消費流,同時繼續觸發read方法,來讀取流。一旦push方法傳入的是null,整個流就結束了。
從使用者的角度來看,在這種模式下,你能夠經過下面的方式來使用流
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk){ writeFile1.write(chunk); })
相對於Flowing mode,Non-Flowing Mode要相對簡單不少。
消費該模式下的流,須要使用下面的方式
myReadable.on(‘readable’,function(){ const chunk = myReadable.read() consume(chunk);//消費流 })
在Non-Flowing Mode下,Readable內部的流程以下圖:
從這個圖上看出,你要實現該模式的讀流,一樣要實現一個_read方法。
整個流程以下:監聽readable方法,Readable內部就會調用read方法。調用用戶實現的_read方法,來push數據到緩衝池,而後發送emit readable事件,通知用戶端消費。
從使用者的角度來看,你能夠經過下面的方式來使用該模式下的流
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('readable',function(chunk) { while (null !== (chunk = myReadable.read())) { writeFile.write(chunk); } });
相對於讀流,寫流的機制就更容易理解了。
寫流使用下面的方式進行數據寫入
myWrite.write(chunk);
調用write後,內部Writable的流程以下圖所示
相似於讀流,實現一個寫流,一樣須要用戶實現一個_write方法。
整個流程是這樣的:調用write以後,會首先斷定是否要寫入緩衝區。若是不須要,那就調用用戶實現的_write方法,將流寫入到相應的地方,_write會調用一個writeable內部的一個回調函數。
從使用者的角度來看,使用一個寫流,採用下面的代碼所示的方式。
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk) { writeFile.write(chunk); })
能夠看到,使用寫流是很是簡單的。
咱們先講解一下如何實現一個讀流和寫流,再來看Duplex和Transform是什麼,由於瞭解瞭如何實現一個讀流和寫流,再來理解Duplex和Transform就很是簡單了。
實現自定義的Readable,只須要實現一個_read方法便可,須要在_read方法中調用push方法來實現數據的生產。以下面的代碼所示:
const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }); } } // 模擬資源池 const dataSource = { data: new Array(10).fill('-'), makeData() { if (!dataSource.data.length) return null; return dataSource.data.pop(); } }; const myReadable = new MyReadable(dataSource,); myReadable.on('readable', () => { let chunk; while (null !== (chunk = myReadable.read())) { console.log(chunk); } });
實現自定義的writable,只須要實現一個_write方法便可。在_write中消費chunk寫入到相應地方,而且調用callback回調。以下面代碼所示:
const Writable = require('stream').Writable; class Mywritable extends Writable{ constuctor(options){ super(options); } _write(chunk,endcoding,callback){ console.log(chunk); callback && callback(); } } const myWritable = new Mywritable();
雙工流:簡單理解,就是講一個Readable流和一個Writable流綁定到一塊兒,它既能夠用來作讀流,又能夠用來作寫流。
實現一個Duplex流,你須要同時實現_read和_write方法。
有一點須要注意的是:它所包含的 Readable流和Writable流是徹底獨立,互不影響的兩個流,兩個流使用的不是同一個緩衝區。經過下面的代碼能夠驗證
// 模擬資源池1 const dataSource1 = { data: new Array(10).fill('a'), makeData() { if (!dataSource1.data.length) return null; return dataSource1.data.pop(); } }; // 模擬資源池2 const dataSource2 = { data: new Array(10).fill('b'), makeData() { if (!dataSource2.data.length) return null; return dataSource2.data.pop(); } }; const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } } const Writable = require('stream').Writable; class MyWritable extends Writable{ constructor(options){ super(options); } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const Duplex = require('stream').Duplex; class MyDuplex extends Duplex{ constructor(dataSource,options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const myWritable = new MyWritable(); const myReadable = new MyReadable(dataSource1); const myDuplex = new MyDuplex(dataSource1); myReadable.pipe(myDuplex).pipe(myWritable);
打印的結果是
abababababababababab
從這個結果能夠看出,myReadable.pipe(myDuplex)
,myDuplex充當的是寫流,寫入的內容是a;myDuplex.pipe(myWritable)
,myDuplex充當的是讀流,往myWritable寫的倒是b;因此說它所包含的 Readable流和Writable流是徹底獨立的。
理解了Duplex,就更好理解Transform了。Transform是一個轉換流,它既有讀的功能又有寫的功能,可是它和Duplex不一樣的是,它的讀流和寫流共用同一個緩衝區;也就是說,經過它讀入什麼,那它就能寫入什麼。
實現一個Transform,你只須要實現一個_transform方法。好比最簡單的Transform:PassThrough,其源代碼以下所示
PassThrough就是一個Transform,可是這個轉換流,什麼也沒作,至關於一個透明的轉換流。能夠看到_transform中什麼都沒有,只是簡單的將數據進行回調。
若是咱們在這個環節作些擴展,只須要在_transform中直接擴展就好了。好比咱們能夠對流進行壓縮,加密,混淆等等操做。
最後介紹一個流中很是重要的一個概念:背壓。要了解這個,咱們首先來看下pipe和highWaterMaker是什麼。
首先看下下面的代碼
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.pipe(writeFile);
上面的代碼和下面是等價的
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(data){ var flag = ws.write(data); if(!flag){ // 當前寫流緩衝區已滿,暫停讀數據 readFile.pause(); } }) writeFile.on('drain',function()){ readFile.resume();// 當前寫流緩衝區已清空,從新開始讀流 } readFile.on('end',function(data){ writeFile.end();//將寫流緩衝區的數據所有寫入,而且關閉寫入的文件 })
pipe所作的操做就是至關於爲寫流和讀流自動作了速度的匹配。
讀寫流速度不匹配的狀況下,通常狀況下不會形成什麼問題,可是會形成內存增長。內存消耗增長,就有可能會帶來一系列的問題。因此在使用的流的時候,強烈推薦使用pipe。
highWaterMaker說白了,就是定義緩衝區的大小。
背壓的概念能夠理解爲:爲了防止讀寫流速度不匹配而產生的一種調整機制;背壓該調整機制的觸發時機,受限於highWaterMaker設置的大小。
如上面的代碼 var flag = ws.write(data);
,一旦寫流的緩衝區滿了,那flag
就會置爲false,反向促進讀流的速度調整。
主要有如下場景
1.文件操做(複製,壓縮,解壓,加密等)
下面的就很容易就實現了文件複製的功能。
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big_copy.file'); readFile.pipe(writeFile);
那咱們想在複製的過程當中對文件進行壓縮呢?
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big.gz'); const zlib = require('zlib'); readFile.pipe(zlib.createGzip()).pipe(writeFile);
實現解壓、加密也是相似的。
2.靜態文件服務器
好比須要返回一個html,可使用以下代碼。
var http = require('http'); var fs = require('fs'); http.createServer(function(req,res){ fs.createReadStream('./a.html').pipe(res); }).listen(8000);