可寫流是對數據流向設備的抽象,用來消費上游流過來的數據,經過可寫流程序能夠把數據寫入設備,常見的是本地磁盤文件或者 TCP、HTTP 等網絡響應。javascript
看一個以前用過的例子java
process.stdin.pipe(process.stdout);
*process.stdout* 是一個可寫流,程序把可讀流 process.stdin 傳過來的數據寫入的標準輸出設備。在瞭解了可讀流的基礎上理解可寫流很是簡單,流就是有方向的數據,其中可讀流是數據源,可寫流是目的地,中間的管道環節是雙向流。網絡
調用可寫流實例的 write() 方法就能夠把數據寫入可寫流dom
const fs = require('fs'); const rs = fs.createReadStream('./w.js'); const ws = fs.createWriteStream('./copy.js'); rs.setEncoding('utf-8'); rs.on('data', chunk => { ws.write(chunk); });
前面提到過監聽了可讀流的 data 事件就會使可讀流進入流動模式,咱們在回調事件裏調用了可寫流的 write() 方法,這樣數據就被寫入了可寫流抽象的設備中,也就是當前目錄下的 copy.js 文件。函數
write() 方法有三個參數ui
和自定義可讀流相似,簡單的自定義可寫流只須要兩步編碼
咱們來實現一個簡單的可寫流,把傳入可寫流的數據轉成大寫以後輸出到標準輸出設備(比較好的例子多是寫入本地磁盤文件,但涉及過多的 fs 操做,比較麻煩,偷個懶。寫入標準輸出設備也是一種寫入行爲)code
const Writable = require('stream').Writable class OutputStream extends Writable { _write(chunk, enc, done) { // 轉大寫以後寫入標準輸出設備 process.stdout.write(chunk.toString().toUpperCase()); // 此處不嚴謹,應該是監聽寫完以後才調用 done process.nextTick(done); } } module.exports = OutputStream;
和最終可寫流暴露出來的 write() 方法同樣, _write() 方法有三個參數,做用相似orm
固然其實還有一個 _writev() 方法能夠實現,這個方法僅被滯留的寫入隊列調用,能夠不實現。對象
有了可寫流的類以後咱們能夠實例化使用了,實例化可寫流的時候有幾個 option 可選,瞭解一下能夠幫助咱們理解後面要用的知識
這樣咱們就更清楚的知道 _write() 方法傳入的參數的含義了,並且對後面介紹 back pressure 機制的理解頗有幫助。
和可讀流同樣,可寫流也有幾個經常使用的事件,有了可讀流的基礎,理解起來比較簡單
- pipe 當可讀流調用 pipe() 方法向可寫流傳輸數據的時候會觸發可寫流的 pipe 事件
- unpipe 當可讀流調用 unpipe() 方法移除數據傳遞的時候會觸發可寫流的 unpipe 事件
這兩個事件用於通知可寫流數據將要到來和將要被切斷,在一般狀況下使用的不多。
writeable.write() 方法是有一個 bool 的返回值的,前面提到了 highWaterMark,當要求寫入的數據大於可寫流的 highWaterMark 的時候,數據不會被一次寫入,有一部分數據被滯留,這時候 writeable.write() 就會返回 false,若是能夠處理完就會返回 true
drain 當以前存在滯留數據,也就是 writeable.write() 返回過 false,通過一段時間的消化,處理完了積壓數據,能夠繼續寫入新數據的時候觸發(drain 的本意即爲排水、枯竭,挺形象的)
除了 write() 方法可寫流還有一個經常使用的方法 end(),參數和 write() 方法相同,但也能夠不傳入參數,表示沒有其它數據須要寫入,可寫流能夠關閉了。
finish 當調用 writable.end() 方法,而且全部數據都被寫入底層後會觸發 finish 事件
一樣出現錯誤後會觸發 error 事件
瞭解了這些事件,結合上以前提到的可讀流的一些知識,咱們就能探討一些有意思的話題了。在最開始咱們提到過用流相對於直接操做文件的好處之一是不會把內存壓爆,那麼流是怎麼作到的呢?
最開始咱們可能會想到由於流不是一次性把全部數據載入內存處理,而是一邊讀一邊寫。但咱們知道通常讀取的速度會遠遠快於寫入的速度,那麼 pipe() 方法是怎麼作到供需平衡的呢?
回憶一些基礎知識,咱們本身來實現一下 pipe() 方法的核心原理
咱們能夠利用這三點來作到數據讀取和寫入的同步,仍是使用以前的例子,但爲了使消費速度降下來,咱們各一秒再通知完成
class OutputStream extends Writable { _write(chunk, enc, done) { // 轉大寫以後寫入標準輸出設備 process.stdout.write(chunk.toString().toUpperCase()); // 故意延緩通知繼續傳遞數據的時間,形成寫入速度慢的現象 setTimeout(done, 1000); } }
咱們使用一下自定義的兩個類
const RandomNumberStream = require('./RandomNumberStream'); const OutputStream = require('./OutputStream'); const rns = new RandomNumberStream(100); const os = new OutputStream({ highWaterMark: 8 // 把水位下降,默認16k仍是挺大的 }); rns.on('data', chunk => { // 當待處理隊列大於 highWaterMark 時返回 false if (os.write(chunk) === false) { console.log('pause'); rns.pause(); // 暫停數據讀取 } }); // 當待處理隊列小於 highWaterMark 時觸發 drain 事件 os.on('drain', () => { console.log('drain') rns.resume(); // 恢復數據讀取 });
結合前面的三點和註釋很容易看懂上面代碼,這就是 pipe() 方法起做用的核心原理。數據的來源的去向咱們有了大概瞭解,後面能夠開始介紹數據的加工