若是你對NodeJs系列感興趣,歡迎關注微信公衆號:前端神盾局或 github NodeJs系列文章html
流從早先的unix初出茅廬,在過去的幾十年的時間裏,它被證實是一種可依賴的編程方式,它能夠將一個大型的系統拆成一些很小的部分,而且讓這些部分之間完美地進行合做。前端
在node中,流的身影幾乎無處不在,不管是操做文件、建立本地服務器仍是簡單的console
,都極有可能涉及到流。node
Node.js 中有四種基本的流類型:git
假設咱們須要使用node來實現一個簡單的靜態文件服務器:github
const http = require('http');
const fs = require('fs');
http.createServer((req,res)=>{
fs.readFile('./test.html',function(err,data){
if(err){
res.statusCode = 500;
res.end();
}else{
res.end(data);
}
})
}).listen(3000)
複製代碼
上述代碼簡單實現了靜態文件的讀取和發送,邏輯上是徹底可行的。可是因爲readFile
是一次性將讀取的文件存放在內存中的,假設test.html
文件很是大或者訪問量增多的狀況下,服務器內存頗有可能耗盡。這時咱們就須要使用流的方式進行改進:編程
const http = require('http');
const fs = require('fs');
http.createServer((req,res)=>{
fs.createReadStream('./test.html').pipe(res);
}).listen(3000);
複製代碼
fs.createReadStream
建立一個可讀流,逐次讀取文件內容供給下游消費,這種逐步讀取和消費的方式,有效減緩了內存的消耗。api
咱們能夠把 Readable Stream拆分紅兩個階段:push階段和pull階段,在push階段,經過實現_read
方法將數據從底層數據資源池中推送到緩存池中,這是數據的生產階段,而pull階段,則是將緩存池的數據拉出,供下游使用,這是數據的消費階段。緩存
在開始進一步講解以前,咱們先來介紹幾個字段,這些字段來源於node源碼:bash
state.buffer
: Array
緩存池,每一個元素對應push(data)中的datastate.length
: Number
緩存池中的數據量,在objectMode
模式下,state.length === state.buffer.length
,不然,其值是state.buffer
中數據字節數的總和state.ended
: Boolean
表示底層數據池沒有可讀數據了(this.pull(null)
)state.flowing
: Null|Boolean
表示當前流的模式,其值有三種狀況:null
(初始狀態)、true
(流動模式)、false
(暫停模式)state.needReadable
: Boolean
是否須要觸發readable
事件state.reading
: Boolean
是否正在讀取底層數據state.sync
: Boolean
是否當即觸發data
/readable
事件,false
爲當即觸發、true
下一個tick再觸發(process.nextTick
)可讀流存在兩種模式:流動模式(flowing)和暫停模式(paused),在源碼中使用state.flowing
來標識。服務器
兩種模式其基本流程都遵循上圖中的push和pull階段,區別在於pull階段的自主性。對於流動模式而言,只要緩存池還有未消耗的數據,那麼數據便會不斷的被提取,咱們能夠把它想象成一個自動的水泵,只要通電了,不抽乾水池的水它是不會停下來的。而對於暫停模式,它更像是打水桶,須要的時候再從水池裏面打點水出來。
全部可讀流都開始於暫停模式,能夠經過如下方式切換到流動模式:
data
事件句柄(前提是state.flowing === null
)stream.resume()
stream.pipe()
可讀流也能夠經過如下方式切換回暫停模式:
readable
事件句柄stream.pause()
。stream.unpipe()
能夠移除多個管道目標。read
開始對於可讀流而言,消費驅動生產,只有經過調用pull階段的read
函數,才能喚醒push階段的數據產生,從而帶動整個流的運動。因此對於可讀流而言read
是一切的起點。
這是根據源碼整理的一個簡單的流程圖,後面將對一些環節加以說明。
howMuchToRead
調用read(n)
過程當中,node會根據實際狀況調整讀取的數量,實際值由howMuchRead
決定
function howMuchToRead(n,state){
// 若是size <= 0或者不存在可讀數據
if (n <= 0 || (state.length === 0 && state.ended))
return 0;
// objectMode模式下 每次制度一個單位長度的數據
if (state.objectMode)
return 1;
// 若是size沒有指定
if (Number.isNaN(n)) {
// 執行read()時,因爲流動模式下數據會不斷輸出,
// 因此每次只輸出緩存中第一個元素輸出,而非流動模式則會將緩存讀空
if (state.flowing && state.length)
return state.buffer.head.data.length;
else
return state.length;
}
if (n > state.highWaterMark)
// 更新highWaterMark
state.highWaterMark = computeNewHighWaterMark(n);
// 若是緩存中的數據量夠用
if (n <= state.length)
return n;
// 若是緩存中的數據不夠用,
// 且資源池還有可讀取的數據,那麼這一次先不讀取緩存數據
// 留着下一次數據量足夠的時候再讀取
// 不然讀空緩存池
if (!state.ended) {
state.needReadable = true;
return 0;
}
return state.length;
}
複製代碼
end
事件在read
函數調用過程當中,node會擇機斷定是否觸發end
事件,斷定標準主要是如下兩個條件:
if (state.length === 0 && state.ended) endReadable(this);
複製代碼
state.ended
爲true
,經過調用
pull(null)
表示底層數據當前已經沒有可讀數據了
state.length === 0
本事件在調用read([size])
時觸發(知足上述條件時)
doRead
doRead
用於判斷是否讀取底層數據
// 若是當前是暫停模式`state.needReadable`
var doRead = state.needReadable;
// 若是當前緩存池是空的或者沒有足夠的緩存
if (state.length === 0 || state.length - n < state.highWaterMark){
doRead = true;
}
if (state.ended || state.reading) {
doRead = false;
} else if (doRead) {
// ...
this._read(state.highWaterMark);
// ...
}
複製代碼
state.reading
標誌上次從底層取數據的操做是否已完成,一旦push
方法被調用,就會設置爲false
,表示這次_read()
結束
data
事件在官方文檔中提到:添加data
事件句柄,可使Readable Stream的模式切換到流動模式,但官方沒有提到的是這一結果成立的條件-state.flowing
的值不爲null
,即只有在初始狀態下,監聽data事件,會使流進入流動模式。舉個例子:
const { Readable } = require('stream');
class ExampleReadable extends Readable{
constructor(opt){
super(opt);
this._time = 0;
}
_read(){
this.push(String(++this._time));
}
}
const exampleReadable = new ExampleReadable();
// 暫停 state.flowing === false
exampleReadable.pause();
exampleReadable.on('data',(chunk)=>{
console.log(`Received ${chunk.length} bytes of data.`);
});
複製代碼
運行這個例子,咱們發現終端沒有任何輸出,爲何會這樣呢?緣由咱們能夠從源碼中看出端倪
if (state.flowing !== false)
this.resume();
複製代碼
由此咱們能夠把官方表述再完善一些:在可讀流初始化狀態下(state.flowing === null
),添加data
事件句柄會使流進入流動模式。
只能被可讀流的實現調用,且只能在 readable._read() 方法中調用。
push是數據生產的核心,消費方經過調用read(n)
促使流輸出數據,而流經過_read()使底層調用push方法將數據傳給流。
在這個過程當中,push方法有可能將數據存放在緩存池內,也有可能直接經過data
事件輸出。下面咱們一一分析。
若是當前流是流動的(state.flowing === true
),且緩存池內沒有可讀數據, 那麼數據將直接由事件data
輸出
// node 源碼
if (state.flowing && state.length === 0 && !state.sync){
state.awaitDrain = 0;
stream.emit('data', chunk);
}
複製代碼
咱們舉個例子:
const { Readable } = require('stream');
class ExampleReadable extends Readable{
constructor(opt){
super(opt);
this.max = 100;
this.time = 0;
}
_read(){
const seed = setTimeout(()=>{
if(this.time > 100){
this.push(null);
}else{
this.push(String(++this.time));
}
clearTimeout(seed);
},0)
}
}
const exampleReadable = new ExampleReadable({ });
exampleReadable.on('data',(data)=>{
console.log('from data',data);
});
複製代碼
readable
事件exampleReadable.on('readable',()=>{
....
});
複製代碼
當咱們註冊一個readable
事件後,node就會作如下處理:
state.flowing = false;
state.needReadable = true;
複製代碼
readable
,stream.emit('readable');
複製代碼
self.read(0);
state.flow === false
當前處於暫停模式state.length || state.ended
return !state.ended &&
(state.length < state.highWaterMark || state.length === 0);
複製代碼