Node.js Stream(流)

流的概念

  • 流是一組有序的,有起點和終點的字節數據傳輸手段
  • 它不關心文件的總體內容,只關注是否從文件中讀到了數據,以及讀到數據以後的處理
  • 流(stream)在Node.js中是一個抽象接口,被Node中的不少對象所實現。好比HTTP 服務器request和response對象都是流
  • 流能夠是可讀的、可寫的,或是可讀寫的。全部的流都是 EventEmitter 的實例。

可讀流

1、兩種模式

可讀流會工做在下面兩種模式之一node

  • flowing模式:可讀流 自動(不斷的) 從底層讀取數據(直到讀取完畢),並經過EventEmitter 接口的事件儘快將數據提供給應用
  • paused模式:必須顯示調用stream.read() 方法來從流中讀取數據片斷

全部初始工做模式爲 paused 的可讀流,能夠經過下面三種途徑切換到 flowing 模式:api

  • 監聽 'data' 事件。
  • 調用 stream.resume() 方法。
  • 調用 stream.pipe() 方法將數據發送到 Writable。

可讀流能夠經過下面途徑切換到 paused 模式:緩存

  • 若是不存在管道目標(pipe destination),能夠經過調用 stream.pause() 方法實現。
  • 若是存在管道目標,能夠經過取消 'data' 事件監聽,並調用 stream.unpipe() 方法移除全部管道目標來實現。

下面演示如何從流中讀取數據
注:文件1.txt中的內容是1234567890bash

let fs = require("fs");
let rs = fs.createReadStream('1.txt',{   //這些參數是可選的,不須要精細控制能夠不設置
    flags:'r',      //文件的操做是讀取操做
    encoding:'utf8',//默認是null,null表明buffer,會按照encoding輸出內容
    highWaterMark:3,//單位是字節,表示一次讀取多少字節,默認是64k
    autoClose:true,//讀完是否自動關閉
    start:0,       //讀取的起始位置
    end:9          //讀取的結束位置,包括9這個位置的內容
})
//rs.setEncoding('utf8');  //能夠設置編碼方式

rs.on('open',function(){
    console.log('open');
})

rs.on('data',function(data){
     console.log('data');
})

rs.on('error',function(){
    console.log('error');
})
rs.on('end',function(){
    console.log('end');
})
rs.on('close',function(){
    console.log('close');
})
複製代碼

執行結果
服務器

open
123
456
789
0
end
close
複製代碼

一、fs.createReadStream建立可讀流實例時,默認打開文件,觸發open事件(並非每一個流都會觸發open事件),但此時並不會將文件中的內容輸出(由於處於‘暫停模式’,沒有事件消費),而是將數據存儲到內部的緩衝器buffer,buffer的大小取決於highWaterMark參數,讀取大小達到highWaterMark指定的閾值時,流會暫停從底層資源讀取數據,直到當前緩衝器的數據被消費
異步

二、這裏的rs能夠理解爲流的消費者,當消費者監聽了'data'事件時,就開始消費數據,可讀流會從paused切換到flowing「流動模式」,不斷的向消費者提供數據,直到沒有數據
函數

三、從打印結果能夠看出,可讀流每次讀取highWaterMark個數據,交給消費者,因此先打印123,再打印456 ... ...
ui

四、當讀完文件,也就是數據被徹底消費後,觸發end事件
編碼

五、最後流或者底層資源文件關閉後,這裏就是1.txt這個文件關閉後,觸發close事件
spa

六、error事件一般會在底層系統內部出錯從而不能產生數據,或當流的實現試圖傳遞錯誤數據時發生。

七、fs.createReadStream第二個參數是可選的,可不填,或只設置部分,好比編碼,不須要精細控制能夠不設置

模式切換

rs.on('data',function(data){ // 暫停模式 -> 流動模式
    console.log(data);
    rs.pause(); // 暫停方法 表示暫停讀取,暫停data事件觸發
});
setTimeout(function () {
    rs.resume(); //恢復data事件觸發,變爲流動模式
},1000)
//結果 open  123  456
複製代碼

一、上例當監聽data事件時,可讀流處於flowing模式,調用了pause()方法,會暫停data事件的觸發,切換到paused模式
二、resume()能夠恢復data事件觸發,再切換到flowing模式
三、上例中,setTimeout中切換流到flowing模式後,data事件觸發,但又遇到pause(),因此暫停了輸出,只打印到6

注意: 若是可讀流切換到 flowing 模式,且沒有消費者處理流中的數據,這些數據將會丟失。 好比, 調用了可讀流的resume() 方法卻沒有監聽 'data' 事件,或是取消了 'data' 事件監聽,就有可能出現這種狀況。

2、readable事件

let fs = require('fs');
let rs = fs.createReadStream('1.txt',{
    highWaterMark:2
});

rs.on('readable',function(){
    console.log('begin');
    let result = rs.read(2);
    console.log('result '+result);
});
複製代碼

'readable' 事件將在流中有數據可供讀取時觸發
上面已經說過,當咱們建立可讀流時,就會先把緩存區填滿(highWaterMark爲指定的單次緩存區大小),等待消費
若是緩存區被清空(消費)後,會觸發readable事件
到達流數據尾部時,readable事件也會觸發,觸發順序在end事件以前

rs.read(size)

  • 該方法從內部緩衝區中收取並返回一些數據,若是沒有可讀數據,返回null
  • size是可選的,指定要讀取size個字節,若是沒有指定,內部緩衝區所包含的全部數據將返回
  • 若是size字節不可讀,返回null,若是此時流沒有結束(除非流已經結束),會將全部保留在內部緩衝區的數據將被返回。好比:文件中有1個可讀字節,可是指定size爲2,這時調用read(2)會返回null,若是流沒有結束,那麼會再次觸發readable事件,將已經讀到內部緩衝區中的那一個字節也返回
  • rs.read()方法只應該在暫停模式下的可讀流上運行,在流動模式下,read會自動調用,直到內部緩衝區數據徹底耗盡

因此,上例中,若是文件1.txt中內容是 a,輸出結果

begin
result null
begin
result a
複製代碼

說明:highWaterMark是2,但文件只有a,因此只有1個字節在緩存區,而size指定了2,2個字節被認爲是不可讀的,返回null;再次觸發readable,將緩存區內容所有返回

若是內容是ab

begin
result ab
begin
result null
複製代碼

說明:highWaterMark是2,因此一開始緩存中有2個字節,size指定了2,因此將ab所有讀取,緩存清空——>繼續緩存,發現到文件末尾,因而觸發readable返回null

若是內容是abc,輸出

begin
result ab
begin
result null
begin
result c
複製代碼

說明:一開始緩存了2個,被消費掉,繼續緩存c,並觸發readable,再次read(2),此時沒有2個字節的數據,被認爲是不可讀的,返回null,而且再次觸發readable將緩存中剩餘數據讀取返回

若是內容是abcd,輸出

begin
result ab
begin
result cd
begin
result null
複製代碼

說明:先讀完2個字節,即ab輸出,緩存區被清空,因此會再次觸發readable事件,再read(2)讀出cd,繼續自動緩存,發現到了文件末尾,又會觸發readable,返回null

在某些狀況下,爲 'readable' 事件添加回調將會致使一些數據被讀取到內部緩存中

這句話個人理解是,當消費數據大小 < 緩存區大小,可讀流會自動添加highWaterMark個數據到緩存,那麼新添加的數據和以前緩存區中未被消費的數據加一塊兒,有可能超過了highWaterMark大小,即緩存區大小增長了

下面將highWaterMark改成3,read(1)再來看看怎麼執行的

let rs = fs.createReadStream('1.txt',{
    highWaterMark:3
});
rs.on('readable',function(){
    console.log('begin');
    let result = rs.read(1);
    console.log('result '+result);
});
複製代碼

當1.txt內容是 a,輸出

begin
result a
begin
result null
複製代碼

說明:緩存中只有a,也只讀了一個(read(1)),消費後,緩存區清空,再去讀取時,已經到了文件末尾,返回null

當1.txt內容是 ab,輸出

begin
result a
begin
result b
複製代碼

說明:緩存中有ab,當讀完a後,繼續緩存,發現到了文件末尾,觸發readable,而此時緩存中還有b,所以將b返回

當讀取個數size > 緩存區個數,會去更改緩存區的大小highWaterMark(規則爲找知足>=size的最小的2的幾回方)

let rs = fs.createReadStream('1.txt',{
    highWaterMark:3
});

rs.on('readable',function(){
    console.log('begin');
    let result = rs.read(4);
    console.log('result '+result);
});
複製代碼

當1.txt中內容是abcdefgh,輸出

begin
result null
begin
result abcd
複製代碼

說明:讀取的size(4)>緩存,認爲是不可讀的,size返回null;這時會從新計算highWaterMark大小,離4最近的是2的2次方,爲4,因此highWaterMark此時等於4,返回了abcd;繼續緩存efgh

但若是1.txt內容是abcdefg,輸出

begin
result null
begin
result abcd
begin
result efg
複製代碼

同上,但當返回abcd繼續自動緩存4個時,發現讀到文件末尾,將緩存數據返回,因此efg也輸出

可寫流

可寫流是對數據寫入'目的地'的一種抽象。

可寫流基本用法

let fs = require('fs');
let ws = fs.createWriteStream('./1.txt',{
    flags:'w',
    mode:0o666,
    autoClose:true,
    highWaterMark:3, // 默認是16k ,而createReadStream是64k
    encoding:'utf8',//默認是utf8
    start:0
});
for(let i = 0;i<4;i++){
    let flag =  ws.write(i+'');
    console.log(flag)
}
ws.end("ok");// 標記文件末尾

ws.on('open',function(){
    console.log('open')
});

ws.on('error',function(err){
    console.log(err);
});

ws.on('finish',function(err){
    console.log('finish');
});

ws.on('close',function(){
    console.log('close')
});
複製代碼

打印結果

true
true
false
false
open
finish
close
複製代碼

寫入文件1.txt的結果 0123ok

一、fs.createWriteStream建立可寫流,一樣默認會打開文件

二、可寫流經過反覆調用 ws.write(chunk) 方法將數據放到內部緩衝器
寫入的數據chunk必須是字符串或者buffer
write雖然是個異步方法,但有返回值,這個返回值flag的含義,不是文件是否寫入,而是表示可否繼續寫入
即緩衝器總大小 < highWaterMark時,能夠繼續寫入,flag爲true;
一旦內部緩衝器大小達到或超過highWaterMark,flag返回false;
注意,即便flag爲flase,寫入的內容也不會丟失

三、上例中指定的highWaterMark是3,調用write時一次寫入了一個字節,當調用第三次write方法時,緩衝器中的數據大小達到3這個閾值,開始返回flase,因此先打印了兩次true,後打印了兩次false

四、ws.end("ok"); end方法用來標記文件末尾,表示接下來沒有數據要寫入可寫流;
能夠傳入可選的 chunk 和 encoding 參數,在關閉流以前再寫入一段數據;
若是傳入了可選的 callback 函數,它將做爲 'finish' 事件的回調函數。因此'ok'會被寫入文件末尾。
注意,ws.write()方法必須在ws.end()方法以前調用

五、在調用了 ws.end() 方法,且緩衝區數據都已經傳給底層系統(這裏是文件1.txt)以後, 'finish' 事件將被觸發。

六、'close' 事件將在流或其底層資源(好比一個文件)關閉後觸發。'close'事件觸發後,該流將不會再觸發任何事件。不是全部 可寫流/可讀流 都會觸發 'close' 事件。

drain事件

若是調用 stream.write(chunk) 方法返回 false,'drain' 事件會在適合恢復寫入數據到流的時候觸發。

drain觸發條件

  • 緩衝器滿了,即write返回false
  • 緩衝器的數據都寫入到流,即數據都被消費掉後,纔會觸發

將上例中for循環改成以下

let i = 8;
function write(){
    let flag = true;
    while(i>0&&flag){
        flag = ws.write(--i+'','utf8',()=>{});
        console.log(flag)
    }

    if(i <= 0){
        ws.end("ok");
    }
 }
 write();
 // drain只有當緩存區充滿後 ,而且被消費後觸發
 ws.on('drain',function(){
   console.log('drain');
   write();
 });
複製代碼

打印

true
true
false
open
drain
true
true
false
drain
true
true
finish
close
複製代碼

文件1.txt寫入 76543210ok

上例當write返回爲false,即緩衝器滿了時,中止while循環,等待;當緩衝器數據都寫入1.txt以後,會觸發drain事件,這時繼續write,直到寫到0,中止寫入,調用end,在文件末尾寫入ok,關閉文件

管道流 & pipe事件

管道提供了一個輸出流到輸入流的機制。一般咱們用於從一個流中獲取數據並將數據傳遞到另一個流中

以下,將1.txt的內容,按照讀一點,寫一點的方式 寫入2.txt

let fs = require('fs');
let rs = fs.createReadStream('1.txt',{
    highWaterMark:4
});
let ws = fs.createWriteStream('2.txt',{
    highWaterMark:3
});
rs.pipe(ws);   //可讀流上調用pipe()方法,pipe方法就是讀一點寫一點
複製代碼

這段代碼工做原理相似於下面這段代碼

rs.on('data',function(chunk){ // chunk 讀到的內容
    let flag = ws.write(chunk);
    if(!flag){  //若是緩衝器滿了,寫不下了,就中止讀
        rs.pause();
    }
});
ws.on('drain',function(){ //當緩存都寫到文件了,恢復讀
    console.log('寫一點');
    rs.resume();
});
複製代碼

參考資料 一、nodejs.cn/api/stream.…
二、www.runoob.com/nodejs/node…

相關文章
相關標籤/搜索