[譯]關於Node.js streams你須要知道的一切

Node.js的stream模塊是有名的應用困難,更別說理解了。那如今能夠告訴你,這些都不是問題了。node

多年來,開發人員在那裏建立了大量的軟件包,其惟一目的就是使用stream使用起來更簡單,可是在這篇文章裏,咱們專一於介紹原生的Node.js Steam Api。linux

"Stream 是Node.js中最好的卻最容易被誤解的部分" ----- Dominic Tarr程序員

Streams究竟是什麼

Streams是數據的集合,就跟數組和字符串同樣。不一樣點就在於Streams可能不是馬上就所有可用,而且不會所有載入內存。這使得他很是適合處理大量數據,或者處理每隔一段時間有一個數據片斷傳入的狀況。web

可是,Stream並不只僅適用於處理大數據(大塊的數據。。。)。使用它,一樣也有利於組織咱們大代碼。就像咱們使用管道去和合並強大的Linux命令。在Node.js中,咱們也能夠作一樣的事情。數組

clipboard.png

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

Node.js的不少內置模塊都實現了Stream接口服務器

clipboard.png

上面例子裏面的Node.js對象列表包括了可讀流和可寫流,有一些對象既是可讀流也是可寫流,像TCP sockets, zlib 和 crypto streams。微信

注意這些對象是有很密切的關聯的。當一個客戶端的HTTP 響應對象是一個可讀流,那麼在服務器端這就是一個可寫流。由於在HTTP例子中,咱們一般是從一個對象(http.IncomingMessage)讀取再寫入到另一個對象(http.ServerResponse)中去。異步

還要注意,當涉及到子進程時,stdio流(stdinstdoutstderr)具備逆流類型。這就容許咱們很是方便的使用管道從主進程鏈接子進程的Streamssocket

一些實例的Streams例子

理論都是很好的,但事實究竟是怎麼樣子的呢?讓咱們看一些例子示範代碼Streams在內存使用方面的比較。ide

咱們先建立一個大文件

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

看看我使用什麼建立文件的?一個可寫流嘛

fs模塊能夠經過Stream接口來讀取和寫入文件。在上面的例子中,咱們在循環中經過可寫流向big.file寫入了1百萬行數據。

運行上面的代碼會生成一個大概400M的文件

這是一個簡單的Node web服務器,專門爲big.file提供服務:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
  
    res.end(data);
  });
});

server.listen(8000);

server收到請求,它會使用異步方法fs.readFile處理這個big file。可是這並不表明咱們會打斷事件循環機制。一切都是正確的嗎??

那如今當咱們啓動server,看看內存監視器都發生了什麼。

clipboard.png

如今訪問這個服務器,看看內存的使用狀況。

圖片描述

內存佔用馬上飆升到434.8 MB。

在咱們把文件內容輸出到客戶端以前,咱們就把整個文件讀入了內存。這是很低效的。

HTTP response對象(上文中的res對象)也是一個可寫流,這就意味着若是咱們有一個表明着big file的可讀流,咱們能夠經過管道把他們倆鏈接起來實現一樣的功能,而不須要使用400M內存。

Node的fs模塊給咱們提供了一個能夠操做任何文件的可讀流,經過createReadStream方法建立。咱們能夠把它和response對象鏈接起來。

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

如今再去訪問server的時候,使人驚訝的事情發生了(看內存監視器)

clipboard.png

發生了什麼?

當咱們訪問服務器的時候,咱們經過流每次使用一段數據,這意味着咱們不是把所有的數據都加載到內存中。內存使用量只上升了不到25M。

能夠把上面的例子用到極致,生成5百萬行數據而不是1百萬行。這樣子的話,這個文件的大小會超過2GB,這實際上大於Node中的默認緩衝區限制。

若是你想在server上使用fs.readFile,這在默認狀況下是行不通的,除非你改了Node.js的默認緩衝區限制。可是使用fs.createReadStream,把2 GB的數據返回給客戶端根本不存在問題,甚至內存使用量都沒有任何變化。

準備好學習Steam了嗎?

Streams 101

在Node.js中有4中基本的流類型:Readable, Writable, Duplex, and Transform streams。

  • Readable 可讀流是能夠從中消耗數據的源的抽象,一個例子就是fs.createReadStream方法
  • Writable 可寫流是能夠寫入數據的目標的抽象,一個例子就是fs.createWriteStream方法
  • duplex Steam是一個同時具備讀寫功能的流,一個例子就是TCP socket
  • Transform 是一個雙工流,它能夠在交換數據的時候作轉換。一個例子就是zlib.createGzip使用gzip壓縮數據。你能夠把Transform streams當成是一個傳入可讀流,返回一個可寫流的函數。它還有一個別名through streams

全部的Stream都是EventEmitter的實例對象。當流讀和寫的時候都會觸發相應的事件。可是還有一個更簡單的使用方法,那就是使用pipe

The pipe method

要記住下面這個魔幻方法

readableSrc.pipe(writableDest)

在這一行裏面,咱們經過管道把可讀流(源)輸出到一個可寫流裏面去(目標),源必須是一個可寫流,目標必須是可寫流。固然,他們也均可以是duplex/Transform。事實上,當咱們使用管道鏈接流的時候,咱們能夠像在linux中同樣使用鏈式鏈接。

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

pipe方法返回目標流,這保證了咱們可使用鏈式調用。對於streams a(可讀流),b,c(可讀可寫流),d可寫流,咱們可使用:

a.pipe(b).pipe(c).pipe(d)
# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Which, in Linux, is equivalent to:
$ a | b | c | d

pipe方法是使用流最簡便的方法。一般經過管道和事件的方法使用流,可是要儘可能避免二者混用。一般當你使用pipe方法就不須要使用事件了。可是當你須要更多定製的操做的話,使用事件的方式會更好。

Stream events

除了從可讀流讀取數據傳輸到可寫流,pipe方法還自動處理一些其餘事情。好比處理錯誤,處理文件結束操做,流之間速度快慢問題。

同時,流也能夠直接使用事件操做。如下是和管道相等的經過事件操做流的方法。

# readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});

readable.on('end', () => {
  writable.end();
});

下面是一些重要流的事件和方法。

clipboard.png

這些事件和方法在某種程度上是相關的,由於它們一般被一塊兒使用。

可讀流上的最重要的事件是

  • data事件,當可讀流傳輸了一段數據的時候會觸發
  • end事件,當沒有數據被傳輸時觸發

可寫流上的最重要的事件是

  • drain事件,當可寫流能夠接收事件的時候被觸發
  • finish事件,當全部數據被接收時被觸發

事件和方法能夠結合起來,以便定製和優化流的使用。讀取可讀流,咱們可使用pipe/unpipe方法,或者read/unshift/resume方法。使用可寫流,咱們能夠可寫流做爲pipe/unpipe方法的參數,或者使用write方法寫入,使用end方法關閉。

可讀流的暫停和流動

可讀流有兩個很重要的模式影響了咱們使用的方式。

  • 暫停模式
  • 流動模式

這些模式有時候被稱爲拉和推模式

全部的可讀流開始的時候都是默認暫停模式,可是它們能夠輕易的被切換成流動模式,當咱們須要的時候又能夠切換成暫停模式。有時候這個切換是自動的。

當一個可讀流是暫停模式的時候,咱們可使用read方法從流中讀取。可是當一個流是流動模式的時候,數據是持續的流動,咱們須要使用事件去監聽數據的變化。

在流動模式中,若是可讀流沒有監聽者,可讀流的數據會丟失。這就是爲何當可讀流逝流動模式的時候,咱們必須使用data事件去監聽數據的變化。事實上,只需添加一個數據事件處理程序便可將暫停的流轉換爲流模式,刪除數據事件處理程序將流切換回暫停模式。 其中一些是爲了與舊的Node Stream接口進行向後兼容。

可使用resume()pause()方法在這兩種模式之間切換。

clipboard.png

當咱們使用pipe方法操做可讀流的時候是不須要擔憂上面的這些操做的,由於pipe方法會自動幫咱們處理這些問題。

流的建立

當咱們討論Node.js中的流時,有兩項重要的任務:

  • 流的建立
  • 流的使用

咱們到如今爲止討論的都是如何使用流,那下面來看看如何建立吧!

Streams的建立一般使用stream模塊。

建立一個可寫流

爲了建立一個可寫流,咱們須要使用stream模塊裏面的Writable類。

const { Writable } = require('stream');

咱們能夠有不少種方式實現一個可寫流。例如,咱們能夠繼承Writable類。

class myWritableStream extends Writable {

}

可是我更喜歡使用構造函數的方式建立。經過給Writable傳遞一些參數來建立一個對象。惟一必需要傳的選項時write方法,它須要暴漏須要寫入的數據塊。

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

write方法接收3個參數

  • chunk一般是一個buffer對象,咱們能夠經過配置修改
  • encoding在這種狀況下就須要了,不過一般狀況是能夠忽略的
  • callback是當咱們處理完這個數據塊的時候須要調用的函數。這是一個寫入是否成功的信號。若是失敗了,給這個回調傳遞一個Error對象

outStream中,咱們簡單的把chunk打印出來,由於並無發生錯誤,咱們直接調用了callback方法。這是這是簡單並不實用的打印流。它會打印接收到的全部值。

爲了使用這個流,咱們能夠簡單的process.stdin這個可讀流。經過pipe方法鏈接起來。

當咱們運行上面的例子,任何咱們在控制檯輸入的內容都會被console.log打印出來。

這不是一個很是實用的流的實現,可是它已經被Node.js內置實現了。outStream功能和process.stdout基本相似。咱們也能夠經過pipe方法把stdinstdout鏈接起來並實現一樣的功能。

process.stdin.pipe(process.stdout);

建立一個可讀流

建立可讀流,咱們須要Readable

const { Readable } = require('stream');
const inStream = new Readable({});

建立一個可讀流很是簡單。可使用push方法推入數據給其餘流使用

const { Readable } = require('stream'); 
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

當咱們push一個null對象進去的時候,這就標誌着咱們要終止傳輸了。

咱們能夠簡單的把這個流經過pipe方法鏈接到一個可寫流process.stdout

運行上面的代碼,會獲取全部的inStream的數據並打印出來。很是簡單但有效。

咱們在經過pipe鏈接以前,就會把全部的數據推送到流裏面。更好的方法是在消費者要求時按需推送數據。能夠經過修改可讀流配置裏面的read()方法實現。

const inStream = new Readable({
  read(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});

當讀取方法在可讀流上被調用時,該實現能夠將部分數據推送到隊列。 例如,咱們能夠一次推一個字母,從字符代碼65(表示A)開始,並在每次推送時遞增:

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

當有流在讀取可讀流的數據的時候,read方法會持續執行,這樣就會一直推出更多的字符。咱們須要在某個時刻終止它,這就是爲何咱們設置了一個終止條件推入了null

咱們應該始終按需推送數據。

Duplex/Transform 流的實現

使用Duplex流,咱們經過同一個對象實現可讀流和可寫流。這相似同時實現了兩個接口。

下面這個例子就結合了上面兩個可讀流和可寫流的綜合例子。

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

經過合併這些方法,咱們可使用這個duplex流讀取從A-Z的字母也一樣可使用它的打印功能。咱們把stdin流鏈接到這個duplex上去使用它的打印功能,再把這個duplex流自己鏈接到stdout上去就在控制檯看到了A-Z。

雙工流的可讀寫的兩側徹底獨立運行。就像一個對象上兩種獨立的功能。

transform流是一種更有趣的duplex流。由於它的輸出來源於她的輸入。

對於一個transform流,咱們不須要實現readwrite方法,咱們僅僅須要實現transform方法,這個方法合併了它們兩個。它具備寫入方法的功能,也能夠用它推送數據。

這是一個簡單的transform例子,把任何輸入轉換成大寫。

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

在這個transformstream裏面,像上個例子中雙工流同樣。可是咱們只實現了transform()方法。咱們把chunk轉換成大寫,再把大寫字母做爲可讀流的輸入。

Streams Object Mode

默認,流會接收 Buffer/String 類型的數據。還有個字段 objectMode 設置,可讓stream 接收任意類型的對象。

下面是一個這種類型的例子。如下變換流的組合使得將逗號分隔值的字符串映射爲JavaScript對象的功能。 因此「a,b,c,d」成爲{a:b,c:d}。

const { Transform } = require('stream');

const commaSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    const obj = {};
    for(let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});

const objectToString = new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

咱們經過commasplitter傳遞輸入字符串(例如,「a,b,c,d」),它將數組做爲其可讀數據([「a」,「b」,「c」,「d」]))。 在該流上添加可讀的ObjectMode標誌是必要的,由於咱們正在將對象推送到其上,而不是字符串。

而後咱們把數組導入到arrayToObject數據流中,咱們須要把writableObjectMode設置爲 true,以表示arrayToObject會接收一個對象。另外它還會推送一個對象出去,因此還要把他的readableObjectModetrue。最後一個objectToString接收一個對象可是輸出字符串,因此就只須要設置一個writableObjectMode

clipboard.png

Node.js內置transform streams對象

Node有一些很是有用的內置transform streams對象。這包括zlibcrypto

下面這個例子使用了zlib.createGzip()結合了額fs readable/writable streams實現了文件壓縮。

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

你可使用上面的腳本壓縮任何你傳入的參數文件。咱們把文件的可讀流傳入了zlib的內置轉換流。再寫入到新的.gz文件中。

使用管道還有一個很酷的事情,就是能夠和事件結合起來。好比我想用戶看到進度,並在結束的時候發個消息。由於pipe方法會返回目標流,咱們也能夠經過鏈式註冊事件。

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

因此使用管道方法,咱們能夠輕鬆地操做流,可是咱們還可使用須要的事件進一步定製與這些流的交互。

管道方法的好處是,咱們能夠用它來以一種可讀的方式逐一構成咱們的程序。 例如,咱們能夠簡單地建立一個變換流來報告進度,而不用監聽上面的數據事件,並用另外一個.pipe()調用替換 .on() 調用:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

reportProgress流是一個簡單的pass-through流,可是也跟標準事件同樣報告進度。注意callback()函數的第二個參數,這至關於把數據推送出去。

結合流的應用是無止境的。例如,若是咱們須要在咱們gzip以前或以後加密文件,咱們須要作的就是按照咱們須要的確切順序來管理另外一個轉換流。使用Node的crypto模塊處理這個事情。

const crypto = require('crypto');
// ...

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

上面的腳本壓縮而後加密傳遞的文件,只有具備密碼的人才可使用文件。 咱們沒法使用正常的解壓縮實用程序解壓縮此文件,由於它已被加密。

爲了可以解壓縮文件,咱們須要使用徹底相反的操做,這也很簡單。

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));

假設傳遞的文件是壓縮版本,上面的代碼將建立一個讀取流,將其傳輸到crypto createDecipher()流中(使用相同的祕密),將其輸出管道輸入到zlib createGunzip()流中, 而後將文件寫回到沒有擴展名的文件中。

以上就是所有了,謝謝閱讀!!

翻譯自Node.js Streams: Everything you need to know

建立了一個程序員交流微信羣,你們進羣交流IT技術

圖片描述

若是已過時,能夠添加博主微信號15706211347,拉你進羣

相關文章
相關標籤/搜索