Node.js 可讀流和可寫流

Node.js操做按需數據使用sream API接口,stream 是一個數據集,數據可能不能立刻所有獲取到,他們在緩衝區,不須要在內存中。適合處理大數據集或者來自外部的數據源的數據塊 Node中不少內建模塊實現了流式接口:javascript

上面的列表中的原生Node.js對象就是可讀流和可寫流的對象。有些對象是可讀流也是可寫流,如TCP sockets,zlib 和 crypto streamsjava

這些對象是密切相關。當一個HTTP響應在客戶端上是一個可讀流,相應的在服務端是一個可寫流。這是由於在HTTP的狀況下,咱們基於從一對象(http.IncomingMessage)讀而從另一個對象(http.ServerResponse)寫node

stdio流(stdin,stdout,stderr)在子進程中會有反向流類型。這樣的話就能用很是簡單的方式管道傳送給其餘流或者主進程的stdio流。bash

Node.js中有4個基本的流類型:socket

  1. 可讀流(Readable)
  2. 可寫流(Writable)
  3. 雙工流(Duplex)
  4. 轉換流(Transform streams)
  • 可讀流是能夠被消耗的數據源的抽象,典型例子就是fs.createReadStream方法。
  • 可寫流是能夠寫入數據的目的地的抽象,典型例子就是 fs.createWriteStream 方法。
  • 雙工流既是可讀的也是可寫的,典型例子是TCP套接字。
  • 轉換流是基於雙工流的,它能夠用來修改或轉換數據,由於它是寫入和讀取的。 zlib.createGzip 就是一個用gzip來壓縮數據的轉換流例子。你能夠認爲轉換流就是一個函數,這個函數的輸入是一個可寫流,輸出是一個可讀流,你可能也據說過把轉換流叫作" 經過流 "。

全部的流都是 EventEmitter 的實例。他們在數據可讀或者可寫的時候發出事件。然而,咱們也能夠簡單的經過 pipe 方法來使用流數據。函數

pipe方法:

**readable**.pipe(**writableDest**)
複製代碼

這簡單的一行,鏈接了可讀流的輸出——源數據和可寫流的輸入——目標。源必須是可讀流,目標必須是可寫流。固然也能夠是雙工流或者轉換流,事實上,若是鏈接的是一個雙工流,能夠鏈式調用pipe:大數據

readable
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

複製代碼

pipe方法返回目標流,這使咱們可以執行上面的鏈式調用。對於流a(可讀)、b和c(雙工)和d(可寫)優化

a.pipe(b).pipe(c).pipe(d)

複製代碼
上面等價於
a.pipe(b)
b.pipe(c)
c.pipe(d)
複製代碼

pipe 方法是最簡單的方式去使用流,通常建議使用 pipe 方法或使用事件來處理流,可是要避免兩個混合使用。一般,當你使用 pipe 方法時,你不須要使用事件,可是若是你須要用更多定製的方式來處理流,那你能夠只用事件。ui

流事件

除了從可讀流裏讀取數據和向可寫流目標寫數據外,pipe方法將自動管理沿途的一些事情。例如,它處理錯誤、文件結束以及當一個流比另外一個流慢或更快時的狀況。this

然而,咱們也能夠直接使用事件來操做流。下面是pipe方法主要用於讀取和寫入數據的事件的簡化等效代碼:

# readable.pipe(writable) 等於下面
複製代碼
readable.on('data', (chunk) => {
  writable.write(chunk);
});

複製代碼
readable.on('end', () => {
  writable.end();
});
複製代碼

如下是可讀流可寫流的重要事件以及可用方法:

這些事件和函數在某種程度上是相關的,由於它們一般一塊兒使用。

可讀流中最重要的事件是:

  • data事件,每當流將數據塊傳遞給消費者時,它就會觸發。
  • end事件,當沒有更多的數據從流中被消耗時觸發。

可寫流中最重要的事件是:

  • drain事件,這是可寫流能夠接收更多數據的信號。
  • finish事件,當全部數據都給到底層系統時觸發。

能夠結合事件和函數來定製和優化流的使用。使用一個可讀的流,咱們能夠用pipe / unpipe方法,或read / Unshift / resume方法。使用一個可寫流,咱們能夠把它pipe / unpipe目的地,或是寫它的write方法調用end方法當咱們完成。

可讀流的暫停和流(flowing)模式

可讀流有兩種主要模式,這影響咱們可使用它們的方式:

  • 它們能夠是暫停(paused)模式
  • 或是流(flowing)模式

這些模式有時被稱爲拉和推模式。

全部可讀的流默認狀況下都是在暫停模式下啓動的,但在須要時能夠輕鬆切換到流模式或者返回到暫停狀態。有時,轉換是自動發生。

當一個可讀流處於暫停模式,咱們可使用 read() 方法按需的從流中讀取數據,然而,在流模式下的可讀流,數據是不斷流動的,咱們要監聽事件來使用這些數據。

在流模式下,若是沒有用戶處理數據,那麼實際上數據會丟失。這就是爲何當咱們在流模式中有可讀的流時,咱們須要一個 data 事件。事實上,只要添加一個 data 事件,就能夠將暫停模式轉換爲流模式,刪除 data 事件,流將切換回暫停模式。其中一些這樣作事爲了與舊的節點流接口向後兼容。

這兩個流模式之間手動開關,可使用 resume()pause() 方法。

當使用 pipe 方法讀取可讀流時,咱們沒必要擔憂這些模式,由於pipe自動管理它們。

實現流

當咱們談論Node.js中的流,主要有兩種不一樣的任務:

  • 實現流。
  • 使用流。

流的實現一般會 引入 (require)stream模塊。

實現可寫流

爲了實現可寫流,咱們須要使用流模塊中的 Writable 構造函數。

const { Writable } = require('stream');
複製代碼

咱們有不少方式來實現一個可寫流。例如,若是咱們想要的話,咱們能夠繼承Writable構造函數。

class myWritableStream extends Writable {
}
複製代碼

這裏用簡單的構造函數的方法。咱們只需給 Writable 構造函數傳遞一些選項並建立一個對象。惟一須要的選項是 Writable 函數,該函數揭露數據塊要往哪裏寫。

const { Writable } = require('stream');
複製代碼
const outStream = new Writable({
  **write**(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);
複製代碼

這個write函數有3個參數:

  • chunk 一般是一個buffer,除非咱們配置不一樣的流。
  • encoding 是在特定狀況下須要的參數,一般咱們能夠忽略它。
  • callback 是在完成處理數據塊後須要調用的函數。這是寫數據成功與否的標誌。若要發出故障信號,請用錯誤對象調用回調函數。

outstream,咱們只是用 console.log 把數據塊做爲一個字符串打印到控制檯,而後不用錯誤對象調用 callback 表示成功。這是一個很是簡單的可能也不那麼有用的 echo 流,它把收到的全部數據打印到控制檯。

爲了使用這個流,咱們能夠直接用 process.stdin 這個可讀流,就能夠把 process.stdin pipe給 outStream.

執行上面的代碼,任何咱們輸入給 process.stdin 的內容都會被 outStreamconsole.log 輸出到控制檯。

實現這個流不怎麼有用,由於它實際上被實現了並且node內置了,它等同於 process.stdout。如下一行代碼,就是把 stdin pipe給 stdout ,就能實現以前的效果:

process.stdin.pipe(process.stdout);
複製代碼

實現可讀流

爲了實現可讀流,引用Readable接口並用它構造新對象:

const { Readable } = require('stream');
複製代碼
const inStream = new Readable({});
複製代碼

有一個簡單的方法來實現可讀流。咱們能夠直接把供使用的數據 push 出去。

const { Readable } = require('stream');

const inStream = new Readable();

inStream.push('ABCDEFGHIJKLM');

inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // No more data

inStream.pipe(process.stdout);
複製代碼

push 一個 null 對象就意味着咱們想發出信號——這個流沒有更多數據了。

使用這個可寫流,能夠直接把它pipe給 process.stdout 這個可寫流。

執行以上代碼,會讀取 inStream 中全部的數據,並輸出在標準輸出流。很簡單,也不是頗有用。

咱們基本上在pipe給 process.stdout 以前把全部的數據都推到流裏了。更好的方法是按需推送。咱們能夠經過在一個可讀流的配置實現 read() 方法來作這件事情:

const inStream = new Readable({
  **read**(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});
複製代碼

當在可讀的流上調用讀方法時,實現能夠將部分數據推到隊列中。例如,咱們能夠一次推送一個字母,從字符代碼65(表明A),而且每推一次增長1:

const inStream = new Readable({
  read(size) {
    **this.push**(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      **this.push**(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
複製代碼

當從可讀流裏讀數據, read 方法將被持續調用,咱們就會推送更多的字母。咱們須要中止這個循環的條件,這就是爲何會一個if語句當currentcharcode大於90(表明Z)是推送null。

這段代碼至關於咱們開始使用的更簡單的代碼,可是當用戶要求時,咱們正在按需推送數據。你應該常常這樣作。

相關文章
相關標籤/搜索