Node中的流

一.流的概念
stream是數據集合,與數組、字符串差很少。但stream不一次性訪問所有數據,而是一部分一部分發送/接收(chunk式的),因此沒必要佔用那麼大塊內存,尤爲適用於處理大量(外部)數據的場景數組

stream具備管道(pipeline)特性,例如:緩存

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

不少原生模塊都是基於stream的,包括進程的stdin/stdout/stderr:socket

Node中的流

例如常見的場景:ide

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

其中pipe方法把可讀流的輸出(數據源)做爲可寫流的輸入(目標),直接把讀文件的輸出流做爲輸入鏈接到HTTP響應的輸出流,從而避免把整個文件讀入內存函數

P.S.甚至平常使用的console.log()內部實現也是stream工具

二.流的類型
Node中有4種基礎流:ui

Readablethis

可讀流是對源的抽象, 從中能夠消耗數據,如fs.createReadStream命令行

Writable設計

可寫流是對可寫入數據的目標的抽象,如fs.createWriteStream

Duplex(雙工)

雙工流既可讀又可寫,如TCP socket

Transform(轉換)

轉換流本質上是雙工流,用於在寫入和讀取數據時對其進行修改或轉換,如zlib.createGzip用gzip壓縮數據

轉換流看一看作一個輸入可寫流,輸出可讀流的函數

P.S.有一種轉換流叫(Pass)Through Stream(經過流),相似於FP中的identity = x => x

三.管道
src.pipe(res)要求源必須可讀,目標必須可寫,因此,若是是對雙工流進行管道傳輸,就能夠像Linux的管道同樣鏈式調用:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

pipe()方法返回目標流,因此:

// a (readable), b and c (duplex), and d (writable)
a.pipe(b).pipe(c).pipe(d)
// 等價於
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Linux下,等價於
$ a | b | c | d

四.流與事件
事件驅動是Node在設計上的一個重要特色,不少Node原生對象都是基於事件機制(EventEmitter模塊)實現的,包括流(stream模塊):

Most of Node’s objects — like HTTP requests, responses, and streams — implement the EventEmitter module so they can provide a way to emit and listen to events.

全部stream都是EventEmitter實例,經過事件機制來讀寫數據,例如上面提到的pipe()方法至關於:

// readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

P.S.pipe還處理了一些別的事情,好比錯誤處理,EoF以及某個流的速度較快/較慢的狀況

Readable與Writable stream的主要事件和方法以下:

Node中的流

Readable的主要事件有:

data事件:stream把一個chunk傳遞給使用者時觸發

end事件:再沒有要從stream中獲取(consume)的數據時觸發

Writable的主要事件有:

drain事件,斷流了,這是Writable stream能夠接收更多數據的信號

finish事件,當全部數據都已flush到下層系統時觸發

五.Readable stream的兩種模式:Paused與Flowing
一個Readable stream要麼流動(Flowing)要麼暫停(Paused),也被稱爲拉(pull)和推(push)兩種模式

建立出來後默認處於Paused狀態,能夠經過read()方法讀取數據。若是處於Flowing狀態,數據會持續地流出來,此時只須要經過監聽事件來使用這些數據,若是沒有使用者的話,數據會丟失,因此都會監聽Readable stream的data事件,實際上監聽data事件會把Readable stream從Paused狀態切換到Flowing,移除data事件監聽會再切回來。須要手動切換的話,能夠經過resume()和pause()來作

使用pipe()方式時不用關心這些,都會自動處理穩當:

Readable觸發data事件,直到Writable忙不過來了

pipe收到信號後調用Readable.pause(),進入Paused模式

Writable再幹一下子壓力不大了的時候,會觸發drain事件,此時pipe調用Readable.resume()進入Flowing模式,讓Readable接着觸發data事件

highWaterMark與backpressure
其實drain事件就是用來應對Backpressure現象的,簡單的說,Backpressure就是下游的消費速度限制了傳輸,形成下游向上遊的反向壓力

若是消費速度慢於生產速度,會在下游產生堆積,來不及處理的數據會存放到Writable的buffer裏,若是不加(限流)處理,這個buffer會持續增加,可能溢出進而形成錯誤或數據丟失

Backpressure現象發生的標誌是Writable.write()返回了false,說明來自上游的待處理數據量已經觸及highWaterMark(高水位線,默認16kb):

Buffer level when stream.write() starts returning false. Defaults to 16384 (16kb), or 16 for objectMode streams.

這是下游開始有點緊張了(todo項足夠忙一陣子了)的信號。建議在此時對上游限流,即調用Readable.pause()先給停了,給下游多點時間處理堆積的數據,下游以爲輕鬆了會觸發darin事件,表示此時有能力處理更多數據了,因此這時候應該開閘放水(Readable.resume())

注意,Readable的數據會存放在緩存中,直到有個Writable來消耗這些數據。因此Paused狀態只是說不往下流了,已經緩存的數據還在Readable的buffer裏。因此若是不限流,來不及處理的數據就緩存在下游,並持續堆積,限流的話,這部分數據被緩存在上游,由於限流了而再也不持續堆積

另外,Readable也有highWaterMark的概念:

The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams

是對從實際數據源讀取速度的限制(好比從磁盤讀文件),防止生產速度太快引起緩存堆積(好比一頓猛push())。因此Flowing Readable的正常工做方式是被push()–push()–push()…誒,發現buffer裏的量已經攢夠一個chunk了,吐給下游。一樣,Readable觸及highWaterMark的標誌是push()返回false,說明Readable的buffer不那麼十分空了,此時若是還持續push(),沒錯,也會出現BackPressure(Readable消費能力限制了從數據源到Readable的傳輸速度):

快-------------慢
數據源-------->Readable------->Writable
                 快--------------慢

只要上游(生產)快,下游(消費)慢就會出現BackPressure,因此在readable.pipe(writable)的簡單場景,可能會出現上面兩段BackPressure

六.示例
Writable stream
常見的造大文件:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

經過fs.createWriteStream()建立指向文件的Writable stream,經過write()填充數據,寫完後end()

或者更通常的,直接new一個Writable:

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    // nowrap version
    // process.stdout.write(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

一個最簡單的echo實現,把當前進程的標準輸入接到自定義輸出流outStream,像日誌中間件同樣(標準輸入流經outStream,再該幹嗎幹嗎去callback):

cc
oo
nn
ss
oo
ll
ee

Console {
  log: [Function: bound consoleCall],
  ...
}

write()方法的3個參數中,chunk是個Buffer,encoding在某些場景下須要,大多數時候能夠忽略,callback是應該在chunk處理完畢後調用的通知函數,代表寫入成功與否(失敗的話,傳Error對象進去),相似於尾觸發機制中的next()

或者更簡單的echo實現:

process.stdin.pipe(process.stdout);

直接把標準輸入流鏈接到標準輸出流

Readable stream
const { Readable } = require('stream'); 
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

經過push向Readable stream裏填充數據,push(null)表示結束。上例中把全部數據都讀進來,而後才交給標準輸出,實際上有更高效的方式(按需推數據給使用者):

const { Readable } = require('stream'); 
const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

read()方法每次吐一個字符,使用者從Readable stream取數據的時候,read()會持續觸發

Duplex/Transform stream

Duplex stream兼具Readable和Writable的特色:既能夠做爲數據源(生產者),也能夠做爲目標(消費者)。例如:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

上例把前2個例子結合起來了,inoutStream被鏈接到標準輸出流了,A-Z會做爲數據源傳遞給標準輸出(打印出來),同時標準輸入流被接到inoutStream,來自標準輸入的全部數據會被log出來,效果以下:

ABCDEFGHIJKLMNOPQRSTUVWXYZcc
oo
nn
ss
oo
ll
ee

Console {
  log: [Function: bound consoleCall],
  ...
}

P.S.先輸出A-Z是由於pipe()會把Readable stream切換到Flowing模式,因此一開始就把A-Z「流」出來了

注意,Duplex stream的Readable與Writable部分是徹底獨立的,讀寫互不影響,Duplex只是把兩個特性組合成一個對象了,就像兩根筷子同樣綁在一塊兒的單向管道

Transform stream是一種有意思的Duplex stream:其輸出是根據輸入計算得來的。因此不用分別實現read/write()方法,只實現一個transform()方法就夠了:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  // 函數簽名與write一致
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

一樣,Transform stream的Readable與Writable部分也是獨立的(不手動push就不會自動傳遞到Readable部分),只是形式上結合起來了

P.S.另外,stream之間除了能夠傳遞Buffer/String,還能夠傳遞Object(包括Array),具體見Streams Object Mode

Node提供了一些原生Transform stream,例如zlib和crypto stream:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

簡單的命令行工具,gzip壓縮。更多示例見Node’s built-in transform streams

參考資料
Node.js Streams: Everything you need to know

Node.js writable.write return false?

探究 Node.js 中的 drain 事件

深刻理解 Node.js Stream 內部機制

Backpressuring in Streams

相關文章
相關標籤/搜索