系列一和你們分享了 Node.js 可寫流和可讀流的基本使用。系列二將深刻講解在 Node.js 中可讀流的流動模式(flowing)與暫停模式(paused)。javascript
其實流動模式和暫停模式,對應的是,推模型與拉模型。java
筆者在介紹 Node.js 流的這兩種模式以前,這裏也想展開聊聊咱們已經認識的一些「推拉模型」(或者見過,卻沒有意識到的)。編程
「拉」表明消費者主動去向生產者拉取數據。生產者當被請求時被動的產生數據,而消費者主動的決定什麼時候請求數據。瀏覽器
從實際出發,能夠從這幾個角度理解。咱們在開發中若是存在有生產者、消費者兩種對象。當觀察者主動從生產者中拉取數據的時候,其實就是符合「拉」模型的理念。以下圖:緩存
如下,我將列舉幾種「拉」模型。安全
你們很熟悉的函數,當運行一個函數的時候,就是在獲取一個值。若是你思惟轉換一下,就是「拉」模型!是否是很吃驚!好比以下函數:bash
function createID() {
return Math.random().toString(36).substr(2, 9);;
}
const id = createID(); // adb8d7xjm
複製代碼
此時函數自己至關於生產者,而執行函數的程序就至關於消費者。當程序(即消費者)運行這個函數的時候,即主動去向此函數要一個返回值,函數(生產者)被執行,所以返回了一個 ID 值給它。框架
補充,「拉」通常也意味着「阻塞」。固然,在單線程 JavaScript 場景下,這個「阻塞」的意思也很好理解。dom
在 ES6 中出現的 Generator 生成器也符合「拉」模型。但和 Function 不一樣的是,Function 只是拉取一個值(由於只有一個 return),而 Generator 是拉取多個值(甚至無限的值)異步
function* makeRangeIterator(start = 0, end = Infinity, step = 1) {
for (let i = start; i < end; i += step) {
yield i;
}
}
const it = makeRangeIterator(1, 5, 2);
it.next(); // {value: 1, done: false}
it.next(); // {value: 3, done: false}
it.next(); // {value: undefined, done: true}
複製代碼
在上述示例代碼中,執行生成器函數後,建立了一個遍歷器對象(it 即爲消費者),因爲生成器對象自己是個狀態機,在此場景下,其實爲生產者。此時,每調用一個 next() 去拉值後,生成器對象就會返回一個值。如圖:
固然 Generator 不只僅能夠「拉」值,也能夠「推」值。這也是它之因此如此強大而複雜的地方。爲了向你們簡單示意「推」值,筆者寫了一個略有點智障的生成器函數以下:
function* getCudeSize() {
const width = 5;
const depth = 10;
const height = yield width;
return width * height * depth;
}
const getHeightByWidth = w => w * 1.6;
const it = getCudeSize();
const { value: width } = it.next();
const { value: size } = it.next(getHeightByWidth(width));
複製代碼
示意圖以下:
「推」表明生產者主動去產生數據推送給消費者。生產者主動的按本身的節奏產生數據,消費者被動的對收到的數據作出反應。
經典的推模型莫過於 DOM 事件了。DOM 中存在一大堆事件。好比鼠標事件、鍵盤事件、還有瀏覽器事件等等。就拿咱們最熟悉的 click 事件來看。
document.addEventListener('click', event => {
const { x, y } = event;
console.log('Cursor coordinate: ', x, y);
});
複製代碼
咱們註冊了一個事件偵聽器在 Document 對象的 click 事件上。因而當咱們每點擊一次 Document 的時候,瀏覽器就會調用執行咱們指定的該事件偵聽器,經過 event 參數發給咱們事件描述信息。
一樣做爲補充,「推」通常也意味着等待。訂閱事件但並不會阻塞主線程,而是等待事件發生,觸發自動執行對應的回調函數從而消費數據。
那麼 EventEmiter 也是同樣,相信你們很瞭解了。就是經典的發佈訂閱模式,或也稱爲觀察者模式。筆者認爲大體原理和上述 DOM Events,十分相似。所以略過。
上述的 DOM Events 仍是 EventEmitter,能夠推送多個值。Promise 首先也符合推模型,可是它只能推送一個值。
Promise.resolve('Hi').then(value => {
console.log(value);
});
複製代碼
只有 Promise 自己決定 resolve 一個值的時候,纔會經過 then 推送這個值給到消費者函數。如圖:
社區中的 RxJS 庫裏有大量的相關拉取、推送概念。有興趣的同窗能夠自行去了解哦~
其實在咱們的平常開發中,還有不少不少符合「推」模型的對象。好比還有 SSE(Server-sent Events)、setInterval、XMLHttpRequest、Service Workers、Websocket 等等。這裏咱們只挑幾個進行介紹。其餘的同窗們可自行對照理解。
固然,系列二到此爲止花了不小的篇幅講述「推」與「拉」,看似與 Node.js 流絕不相關,因此可能有些同窗會有點迷惑。然而我相信是頗有幫助的,由於只有理解了這種生產者、消費者機制,才能觸類旁通更好的理解 Node.js 中流的兩種模式,由於筆者認爲,兩者理念也相差不遠,只不過具體表現方式、API 使用方式上存在差別。固然,可能長此以往,你也會發現,這種推拉的思考學習說不定也幫助了理解在 JavaScript 中的異步編程思惟。
在暫停模型下的流,符合「拉」模型的大致框架。
全部的可讀流都開始於暫停模式, 在暫停模式中,必須顯式調用 stream.read() 讀取數據。
根據咱們上面積累的對「拉」模型的理解,暫停模式下,可讀流是生產者,而程序自己是消費者。此時,流當被請求時被動的產生數據,而程序主動的決定什麼時候請求數據。
假設咱們讀取這個文件(what-is-a-stream.txt),它有 150 個字節。
A stream is an abstract interface for working with streaming data in Node.js. The stream module provides an API for implementing the stream interface.
複製代碼
讓咱們在流的暫停模式下,讀取此文件內容。
const stream = fs.createReadStream(files["what-is-a-stream"], {
highWaterMark: 50
});
stream.on("readable", () => {
console.log('stream is readable!');
let data;
while (null !== (data = stream.read())) {
console.log("Received:", data.toString());
}
});
複製代碼
這裏咱們使用了 'readable' 事件,當有數據可從流中讀取時,就會觸發 'readable' 事件。須要注意的是,爲了讓這個示例更加明顯,筆者在 createReadStream 中第二個參數傳入了 highWaterMark 選項爲 50 用以設置可讀緩衝區大小。對於普通的流, highWaterMark 指定了字節的總數。 對於對象模式的流, highWaterMark 指定了對象的總數。所以,在上述示例可讀緩衝區爲 50 個字節,150 字節大小的文件,會讀取三次。
所以,觸發了四次 'readable' 事件。(這裏比預想中多觸發一次,是由於當到達流數據的盡頭時, 'readable' 事件也會觸發,可是在 'end' 事件以前觸發。)
stream is readable!
Received: A stream is an abstract interface for working with
stream is readable!
Received: streaming data in Node.js. The stream module prov
stream is readable!
Received: ides an API for implementing the stream interface.
stream is readable!
複製代碼
將上面的實例代碼用圖解表達出來,就是以下圖這樣:
惟一不一樣的是,咱們的程序訂閱了生產者「流」的 'readable' 事件,這個至關於流主動推送了「我還有數據可讀呀,快來消費我呀」的信號。此時程序就能夠決定任意時機去消費流緩存區中數據。
好比說,你也徹底能夠定時器去讀,同樣也能夠讀取出來!(可是千萬不要這麼作)
setInterval(() => {
let data;
while (null !== (data = stream.read())) {
console.log("Received:", data.toString());
}
}, 30);
複製代碼
咱們在系列一知道,可讀流繼承自 EventEmitter。所以在流動模式中,數據自動從底層系統讀取,並經過 EventEmitter 接口的事件儘量快地被提供給應用程序。
只用對流監聽 'data' 事件,流就會切到流動模式,源源不斷髮送數據塊給程序。
此時的流做爲生產者,擁有了主動推送數據的權力,而咱們的程序,或者說是事件監聽句柄,就是咱們的消費者,它會被動的接收數據。
const stream = fs.createReadStream(files["what-is-a-stream"], {
highWaterMark: 50,
});
stream.on("data", chunk => {
console.log("stream emit data");
console.log("Received:", chunk.toString());
});
複製代碼
所以,一樣由於 highWaterMark 的關係,流觸發了三次 'data' 事件。
stream emit data
Received: A stream is an abstract interface for working with
stream emit data
Received: streaming data in Node.js. The stream module prov
stream emit data
Received: ides an API for implementing the stream interface.
複製代碼
若是用推模型來描述這個過程,即如圖:
固然,還有一種更好更安全的方式,能夠在流動模式下自動管理數據。(爲何說安全是由於能夠自動處理目標可寫流的超負荷、背壓問題。)那就是:readableStream.pipe(writableStream)。
stream.pipe(process.stdout);
複製代碼
這種模式適合處理對流不須要細緻控制的場景,簡簡單單一個 pipe 就能夠達成咱們的目標,十分簡潔。
可讀流的兩種模式是對發生在可讀流中更加複雜的內部狀態管理的一種簡化的抽象。
固然,流的這兩種模式,咱們是能夠經過如下幾種方式進行切換。換而言之,也就是推和拉兩種模型的切換。
從默認的暫停模式切換到流動模式(拉轉推):
從流動模式,切換回到暫停模式(推轉拉):
這些在官方文檔均可以查到。筆者接下來會詳細示例幾種切換模式,以及其注意事項。
readable.resume() 方法將被暫停的可讀流恢復觸發 'data' 事件,並將流切換到流動模式。
咱們知道上述給流監聽 'data' 事件,流會切換到流動模式,同時開始觸發發射全部數據。一樣 resume 方法,也會切換流模式到流動模式,此時若還沒有添加 'data' 事件監聽,則有可能丟失數據。
考慮以下代碼:
// 從默認的暫停模式,切換到流動模式
stream.resume();
// 3ms 後才監聽 'data' 事件,在 3ms 期間可能已經丟失數據塊
setTimeout(() => {
stream.on("data", chunk => {
console.log("stream emit data");
console.log("Received:", chunk.toString());
});
stream.on("end", () => {
console.log("stream emit end");
});
}, 3);
複製代碼
此時數據塊可能會丟失,好比在筆者的電腦上,運行此代碼,此時只打印了兩個分塊。
stream emit data
Received: streaming data in Node.js. The stream module prov
stream emit data
Received: ides an API for implementing the stream interface.
stream emit end
複製代碼
很明顯,丟失了第一個分塊。所以,此場景須要特別留意!
readable.pause() 方法使流動模式的流中止觸發 'data' 事件,並切換出流動模式。 任何可用的數據都會保留在內部緩存中。
好比在 3ms 後調用 pause 暫停這個流,則這裏可能後面幾個數據塊來不及在 3ms 內輸出,就會留在內部緩存中。
// 從默認的暫停模式,切換到流動模式
stream.on("data", chunk => {
console.log("Received:", chunk.toString());
});
// 3ms 後將流切換到暫停模式,流暫停觸發 'emit' 事件
setTimeout(() => {
stream.pause();
}, 3);
複製代碼
好比咱們先在 3ms 後暫停,而後在 2000 ms 後再繼續,咱們在數據分塊以前,打印當前時間秒數。
// 從默認的暫停模式,切換到流動模式
stream.on("data", chunk => {
console.log(`Received at ${(new Date).getSeconds()}s: `, chunk.toString());
});
// 3ms 後將流切換到暫停模式,流暫停觸發 'emit' 事件
setTimeout(() => {
stream.pause();
}, 3);
// 2000ms 後將流切換到流動模式,流繼續觸發 'emit' 事件
setTimeout(() => {
stream.resume();
}, 2000);
複製代碼
控制檯打印以下:
Received at 42s: A stream is an abstract interface for working with
Received at 44s: streaming data in Node.js. The stream module prov
Received at 44s: ides an API for implementing the stream interface.
複製代碼
可是須要留意的是,若是流存在 'readable' 事件監聽器或調用了 stream.read(),則 readable.pause() 方法不起做用。
readable.unpipe() 方法解綁以前使用 stream.pipe() 方法綁定的可寫流。
當存在 **readableStream.pipe(writableStream) **模式,即爲此可讀流有管道目標。
這裏一樣用例子說明,不過抱歉的是,這裏筆者要調整一下 highWaterMark 爲更小的值。(由於下述例子中的 pipe 方法會自動管理數據流,原有的可讀緩存區較大 50 bytes 3 次讀取,讀取速度極快,難以在讀取完成前 unpipe),改爲 20 bytes 後,能讀取差很少 6 次,會稍微慢一點。
const stream = fs.createReadStream(files["what-is-a-stream"], {
highWaterMark: 20
});
// 切換到流動模式
stream.pipe(process.stdout);
setTimeout(() => {
// 切換爲暫停模式
stream.unpipe();
// 暫停模式下,讀取數據
stream.on("readable", () => {
let data;
while (null !== (data = stream.read())) {
console.log("From paused mode:", data.toString());
}
});
}, 3);
複製代碼
運行上述代碼,控制檯打印以下:
A stream is an abstrFrom paused mode: act interface for wo
From paused mode: rking with streaming
From paused mode: data in Node.js. Th
From paused mode: e stream module prov
From paused mode: ides an API for impl
From paused mode: ementing the stream
From paused mode: interface.
複製代碼
兩種模式並無孰好孰壞,歸根結底,它們都有各自的應用場景。可是對於開發者來講,大部分場景下,使用 pipe 就足夠了。
咱們能夠經過 readable.readableFlowing 來獲取當前可讀流的狀態。在任意時刻可讀流會處於如下三種狀態之一:
好比咱們基於上述的代碼打點看 readableFlowing 的狀態值。
console.log('\nReadableFlowing [before pipe]:', stream.readableFlowing);
stream.pipe(process.stdout);
console.log('\nReadableFlowing [after pipe]:', stream.readableFlowing);
setTimeout(() => {
stream.unpipe();
console.log('\nReadableFlowing [after unpipe]:', stream.readableFlowing);
stream.on("readable", () => {
let data;
while (null !== (data = stream.read())) {
console.log("From paused mode:", data.toString());
}
});
console.log('\nReadableFlowing [after readable]:', stream.readableFlowing);
}, 3);
複製代碼
控制檯打印以下:
ReadableFlowing [before pipe]: null
ReadableFlowing [after pipe]: true
A stream is an abstract interface for wo
ReadableFlowing [after unpipe]: false
ReadableFlowing [after readable]: false
From paused mode: rking with streaming
From paused mode: data in Node.js. Th
From paused mode: e stream module prov
From paused mode: ides an API for impl
From paused mode: ementing the stream
From paused mode: interface.
複製代碼
所以咱們得知在沒有提供消費流數據的機制,readable.readableFlowing 值爲 null。以後 true 和 false 分別表明着是否處於流動模式。
今天的系列二到此收尾了,相信同窗們對流的兩種模式也有了基本的瞭解~
前半篇中提到的推拉模型也是一個頗有趣的話題,若是有同窗很感興趣,很推薦去看這個視頻 Netflix JavaScript Talks - Version 7: The Evolution of JavaScript。推拉模型和流的暫停、流動模式,的確有很類似的設計理念,相信兩者之間都確定能夠互相參考借鑑的地方。
因此筆者仍然和系列一的意思是同樣的,對於學習任何技術來講,沒有必要把本身侷限在任何框框條條中,發揮本身的想象力,去實踐、去驗證,不管如何都會是一個有趣的學習過程。
歡呼~鼓掌~揉揉酸酸的手和眼睛 :)
下個系列見~