若是你對NodeJs系列感興趣,歡迎關注微信公衆號:前端神盾局或 github NodeJs系列文章html
流從早先的unix初出茅廬,在過去的幾十年的時間裏,它被證實是一種可依賴的編程方式,它能夠將一個大型的系統拆成一些很小的部分,而且讓這些部分之間完美地進行合做。前端
在node中,流的身影幾乎無處不在,不管是操做文件、建立本地服務器仍是簡單的console
,都極有可能涉及到流。node
Node.js 中有四種基本的流類型:git
假設咱們須要使用node來實現一個簡單的靜態文件服務器:github
const http = require('http'); const fs = require('fs'); http.createServer((req,res)=>{ fs.readFile('./test.html',function(err,data){ if(err){ res.statusCode = 500; res.end(); }else{ res.end(data); } }) }).listen(3000)
上述代碼簡單實現了靜態文件的讀取和發送,邏輯上是徹底可行的。可是因爲readFile
是一次性將讀取的文件存放在內存中的,假設test.html
文件很是大或者訪問量增多的狀況下,服務器內存頗有可能耗盡。這時咱們就須要使用流的方式進行改進:編程
const http = require('http'); const fs = require('fs'); http.createServer((req,res)=>{ fs.createReadStream('./test.html').pipe(res); }).listen(3000);
fs.createReadStream
建立一個可讀流,逐次讀取文件內容供給下游消費,這種逐步讀取和消費的方式,有效減緩了內存的消耗。api
咱們能夠把 Readable Stream拆分紅兩個階段:push階段和pull階段,在push階段,經過實現_read
方法將數據從底層數據資源池中推送到緩存池中,這是數據的生產階段,而pull階段,則是將緩存池的數據拉出,供下游使用,這是數據的消費階段。緩存
在開始進一步講解以前,咱們先來介紹幾個字段,這些字段來源於node源碼:服務器
state.buffer
: Array
緩存池,每一個元素對應push(data)中的datastate.length
: Number
緩存池中的數據量,在objectMode
模式下,state.length === state.buffer.length
,不然,其值是state.buffer
中數據字節數的總和state.ended
: Boolean
表示底層數據池沒有可讀數據了(this.pull(null)
)state.flowing
: Null|Boolean
表示當前流的模式,其值有三種狀況:null
(初始狀態)、true
(流動模式)、false
(暫停模式)state.needReadable
: Boolean
是否須要觸發readable
事件state.reading
: Boolean
是否正在讀取底層數據state.sync
: Boolean
是否當即觸發data
/readable
事件,false
爲當即觸發、true
下一個tick再觸發(process.nextTick
)可讀流存在兩種模式:流動模式(flowing)和暫停模式(paused),在源碼中使用state.flowing
來標識。微信
兩種模式其基本流程都遵循上圖中的push和pull階段,區別在於pull階段的自主性。對於流動模式而言,只要緩存池還有未消耗的數據,那麼數據便會不斷的被提取,咱們能夠把它想象成一個自動的水泵,只要通電了,不抽乾水池的水它是不會停下來的。而對於暫停模式,它更像是打水桶,須要的時候再從水池裏面打點水出來。
全部可讀流都開始於暫停模式,能夠經過如下方式切換到流動模式:
data
事件句柄(前提是state.flowing === null
)stream.resume()
stream.pipe()
可讀流也能夠經過如下方式切換回暫停模式:
readable
事件句柄stream.pause()
。stream.unpipe()
能夠移除多個管道目標。read
開始對於可讀流而言,消費驅動生產,只有經過調用pull階段的read
函數,才能喚醒push階段的數據產生,從而帶動整個流的運動。因此對於可讀流而言read
是一切的起點。
這是根據源碼整理的一個簡單的流程圖,後面將對一些環節加以說明。
howMuchToRead
調用read(n)
過程當中,node會根據實際狀況調整讀取的數量,實際值由howMuchRead
決定
function howMuchToRead(n,state){ // 若是size <= 0或者不存在可讀數據 if (n <= 0 || (state.length === 0 && state.ended)) return 0; // objectMode模式下 每次制度一個單位長度的數據 if (state.objectMode) return 1; // 若是size沒有指定 if (Number.isNaN(n)) { // 執行read()時,因爲流動模式下數據會不斷輸出, // 因此每次只輸出緩存中第一個元素輸出,而非流動模式則會將緩存讀空 if (state.flowing && state.length) return state.buffer.head.data.length; else return state.length; } if (n > state.highWaterMark) // 更新highWaterMark state.highWaterMark = computeNewHighWaterMark(n); // 若是緩存中的數據量夠用 if (n <= state.length) return n; // 若是緩存中的數據不夠用, // 且資源池還有可讀取的數據,那麼這一次先不讀取緩存數據 // 留着下一次數據量足夠的時候再讀取 // 不然讀空緩存池 if (!state.ended) { state.needReadable = true; return 0; } return state.length; }
end
事件在read
函數調用過程當中,node會擇機斷定是否觸發end
事件,斷定標準主要是如下兩個條件:
if (state.length === 0 && state.ended) endReadable(this);
state.ended
爲true
,
經過調用
pull(null)
表示底層數據當前已經沒有可讀數據了
state.length === 0
本事件在調用read([size])
時觸發(知足上述條件時)
doRead
doRead
用於判斷是否讀取底層數據
// 若是當前是暫停模式`state.needReadable` var doRead = state.needReadable; // 若是當前緩存池是空的或者沒有足夠的緩存 if (state.length === 0 || state.length - n < state.highWaterMark){ doRead = true; } if (state.ended || state.reading) { doRead = false; } else if (doRead) { // ... this._read(state.highWaterMark); // ... }
state.reading
標誌上次從底層取數據的操做是否已完成,一旦push
方法被調用,就會設置爲false
,表示這次_read()
結束
data
事件在官方文檔中提到:添加data
事件句柄,可使Readable Stream的模式切換到流動模式,但官方沒有提到的是這一結果成立的條件-state.flowing
的值不爲null
,即只有在初始狀態下,監聽data事件,會使流進入流動模式。舉個例子:
const { Readable } = require('stream'); class ExampleReadable extends Readable{ constructor(opt){ super(opt); this._time = 0; } _read(){ this.push(String(++this._time)); } } const exampleReadable = new ExampleReadable(); // 暫停 state.flowing === false exampleReadable.pause(); exampleReadable.on('data',(chunk)=>{ console.log(`Received ${chunk.length} bytes of data.`); });
運行這個例子,咱們發現終端沒有任何輸出,爲何會這樣呢?緣由咱們能夠從源碼中看出端倪
if (state.flowing !== false) this.resume();
由此咱們能夠把官方表述再完善一些:在可讀流初始化狀態下(state.flowing === null
),添加data
事件句柄會使流進入流動模式。
只能被可讀流的實現調用,且只能在 readable._read() 方法中調用。
push是數據生產的核心,消費方經過調用read(n)
促使流輸出數據,而流經過_read()使底層調用push方法將數據傳給流。
在這個過程當中,push方法有可能將數據存放在緩存池內,也有可能直接經過data
事件輸出。下面咱們一一分析。
若是當前流是流動的(state.flowing === true
),且緩存池內沒有可讀數據,
那麼數據將直接由事件data
輸出
// node 源碼 if (state.flowing && state.length === 0 && !state.sync){ state.awaitDrain = 0; stream.emit('data', chunk); }
咱們舉個例子:
const { Readable } = require('stream'); class ExampleReadable extends Readable{ constructor(opt){ super(opt); this.max = 100; this.time = 0; } _read(){ const seed = setTimeout(()=>{ if(this.time > 100){ this.push(null); }else{ this.push(String(++this.time)); } clearTimeout(seed); },0) } } const exampleReadable = new ExampleReadable({ }); exampleReadable.on('data',(data)=>{ console.log('from data',data); });
readable
事件exampleReadable.on('readable',()=>{ .... });
當咱們註冊一個readable
事件後,node就會作如下處理:
state.flowing = false; state.needReadable = true;
readable
,stream.emit('readable');
self.read(0);
state.flow === false
當前處於暫停模式state.length || state.ended
return !state.ended && (state.length < state.highWaterMark || state.length === 0);