這裏就要提到一個網上的例子了。首先,咱們須要先生成一個大文件:node
const fs = require('fs');
const file = fs.createWriteStream('./test.txt');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
複製代碼
這就生成了一個大概 450MB 的文件。第二步,咱們須要啓動一個簡單的 Node 服務器。緩存
const fs = require('fs');
const http = require('http');
const server = http.createServer();
server.on('request', (req, res) => {
fs.readFile('./test.txt', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(3000);
複製代碼
把代碼寫好以後,我在本地全局安裝了一個 pm2,固然了,你也可使用系統自帶任務管理器、活動監視器等等來監控內存消耗的變化,pm2 比較直觀好用,我就直接使用了 pm2。服務器
這是初始狀態:curl
咱們打開命令行,使用 curl 來訪問服務器:curl http://localhost:3000
。ide
這時候,咱們能夠經過 pm2 monit index.js
來查看。函數
能夠看到如今的內存消耗爲:學習
能夠看到,用這種方法是將整個文件都放在了內存當中,若是文件小還好,可是當數據量很是大的時候,就會形成內存不夠用的問題,而且當你將內存佔個七七八八也會影響到用戶訪問的速度,針對這種狀況,咱們使用流來解決這個問題。ui
首先,HTTP 響應對象也是一種可寫流,咱們能夠建立一個文件的可讀流,經過 pipe 鏈接這兩個流,咱們能夠看下這樣的內存消耗。this
const fs = require('fs');
const http = require('http');
const server = http.createServer();
server.on('request', (req, res) => {
fs.createReadStream('./test.txt').pipe(res);
});
server.listen(3000);
複製代碼
能夠看到在使用流的狀況下,咱們內存的消耗是十分少的。感覺到流的魅力了麼?一塊兒來學習吧。url
你們應該都記得,小學的時候常常碰到這樣的題:有一個 x 立方米的水池,一個進水管每秒進水 a 立方米,一個出水管每秒出水 b 立方米,問啥時候水池積滿?不知道你們小時候有沒有罵過這道題,一邊進一邊出這不是有病麼?哈哈,不扯遠了,繼續回到咱們的流,這個題的模型其實和咱們的流很類似,固然了,上面這個題還涉及到了流控的問題,下面再說。
在 Node.js 中,有四種基本的流類型:
說可讀流以前,先劃重點:
重點畫出來了咱們開始一點一點扒可讀流,首先,咱們要說的就是 read
和 _read
這兩個函數,正常狀況下,咱們在使用可讀流的時候,須要提供一個 _read
方法,負責向緩衝區補充數據,它要從數據源拉取數據,而後在這個方法中調用 push
方法把數據推到緩衝池中,當流結束時,咱們須要 push(null)
,而 read(size) 函數是咱們要消耗緩衝區的數據的時候使用的方法,不須要咱們本身實現。
下面要說的是可讀流分兩種模式:flowing 和 pause,這兩種模式決定了數據流動的方式。
在 flowing 模式下,數據會源源不斷的產生,每次都會觸發 data
事件 ,經過監聽這個事件來獲取數據。
那麼在什麼狀況下會進入 flowing 模式呢?OK,扒源碼,經過看可讀流的源碼,知道在流的屬性裏有一個 flowing
的屬性,這個屬性初始化的時候爲 null
。這時候是處於 pause
模式,咱們在當前文件全局搜一下 flowing =
發現當咱們調用 resume()
的時候會將這個標誌位設爲 true
,這時候就處於 flowing
模式了,那麼還有沒有其餘的方法呢?答案是確定的,是有的,看源碼:
Readable.prototype.on = function(ev, fn) {
const res = Stream.prototype.on.call(this, ev, fn);
const state = this._readableState;
if(ev === 'data') {
state.readableListening = this.listenerCount('readable') > 0;
if (state.flowing !== false)
this.resume();
} else if ('readable'){
//...
}
return res;
}
複製代碼
咱們能夠關注到,當咱們監聽 data
事件的時候,由於當前初始化標誌位爲 null
,因此會去調用 resume()
,這時候就會進入 flowing 模式,同時,當可讀流調用 pipe
的時候會去監聽 data
事件,也會進入 flowing
模式。
那麼當你監聽 data
事件進入 flowing
模式時,整個代碼流程是什麼呢?
從這張圖,咱們能看出 flowing
模式的一個大概流程,從初始化開始,flowing = null
,而後當咱們監聽 data
事件,會去調用 this.resume()
,這時候就將 flowing
變爲 true
,而後調用了 resume
,在這個函數裏,調用了 read(0)
去觸發 _read()
向緩衝區補充數據,這裏要提一點的是當咱們調用 read(0)
的時候,不會破壞緩衝區的數據和狀態,並觸發 _read
去讀取數據到緩衝區。接下來就是不斷的循環往復,直到 push(null)
則流結束。
如今知道了 flowing
模式,那麼 pause
模式又是怎樣的呢?首先咱們來看如何進入 pause
模式:
pause
模式。pause
方法unpipe
方法通常來講咱們不多會去使用到 pause
模式,在 pause
模式下,咱們須要手動的調用 read()
函數去獲取數據。
這兩個都是關於數據的事件,至於 end
事件,很簡單,就很少說了。
那麼 readable
事件表明了什麼呢?readable
只負責通知消費者流裏有新數據或者流讀完了,至於如何使用則是消費者本身的事情了,這時候 read()
就會返回新數據或者是 null。
至於 data
事件,咱們看一下上面那張圖,這個事件是在流把數據傳遞給消費者的時候觸發的。
那麼咱們同時監聽 data
和 readable
事件會怎麼樣呢?從上面的圖咱們能夠得知,當監聽 data
事件的時候,流直接將數據傳遞給了消費者,並無進入緩衝區,只會觸發 data
事件,而只有當數據消耗完成時 push(null)
會觸發 readable
事件。
可寫流是做爲下流來消耗上游的數據,那麼開始劃重點:
和可讀流同樣,咱們須要在初始化流的時候提供一個 _write()
方法,用來向底層寫數據,而 write()
方法是用來向可寫流提供數據的,注意在 _write
方法中的第三個參數在源碼中是一個叫 onwrite
的方法,這是爲了代表當前寫入數據已經完成了,能夠開始寫入下面的數據了。可寫流的終止信號是調用 end()
方法。
那麼可寫流是如何監聽流結束事件呢?答案是有兩個事件能夠監聽,一個是 prefinish
,另外一個是 finish
。
這兩個事件的區別是,finish
是在流的全部數據都寫入底層而且回調函數都執行了纔會觸發,而 prefinish
的觸發條件是全部的數據都寫入底層,這二者之間仍是有必定差別的。
Duplex 的代碼量很是少,由於它同時繼承了可讀流和可寫流,它同時包含了這兩種流原型上的方法,同時包含了兩種流的屬性。因此咱們既能夠實現 _read
將它當成可讀流也能夠實現 _write
將其當成可寫流來使用。
而 Transform 繼承了 Duplex,而且關聯了兩個緩存池,咱們向流中寫入數據,就可以進行轉換,而後再讀取,那爲何能夠這樣操做呢?
咱們去看看源碼,Transform 本身實現了 _write
和 _read
方法,注意的是這裏使用的是同一個緩存,咱們來看這麼一段代碼。
const { Transform } = require('stream')
var transform = Transform({
transform: function (buf, _, next) {
next(null, buf.toString().replace('a', 'b'))
}
})
// 'b'
transform.pipe(process.stdout)
transform.write('a')
transform.end()
複製代碼
上面的代碼主要流程是這樣的,Transform 調用了繼承自可寫流的 write
方法,而後這個方法調用本身實現的 _write
將寫入的數據存到了 Transform 的緩存中,而後將其轉換成 buffer,在其後 _read
函數被調用,在這個函數中調用了在初始化的時候傳入轉換函數 _transform
對數據進行轉換,在轉換事後就是 readable.pipe(writable)
的模式了。
還有一點是,Transform 還有一個 _flush
函數,在 prefinish
觸發時就會調用它,說明寫流結束了。
在咱們進行可寫流和可讀流的對接的時候咱們要處理各類事件,以及流控的問題,就像咱們在上面提到的那道題,若是讀流速度太快,而寫流速度慢,就會致使速度不匹配的問題,而 pipe
實現了一套背壓平衡機制來控制兩邊的速度。
那關於 pipe 的源碼解析等等能夠去看看這篇文章。
在 Node 裏,流是很是重要的一個模塊,它可以很好的處理大文件,以及對數據的處理能力。此次對流的學習也是收穫了很多東西,與君共勉!