參考文章:html
圖片來源 視覺中國前端
流(stream)是一種在 Node.js 中處理流式數據的抽象接口 ——官方文檔node
流是數據的集合,你能夠將它理解成數據鏈表或者字符串的形式,區別在於流中的數據並不能當即可用,這裏又能夠將其理解成水流。你無需將全部的數據一次性所有放入內存,相反,你可使用流這一特有的性質,完成對大量數據的操做以及逐段處理的操做gulp
在node異步處理數據的基礎上,流將要傳輸的數據處理成小份數據(chunk)連續傳輸,這樣經過更少的內存消耗,從而帶來更多的性能提高api
Node.js中有四種基本類型的流:緩存
Readable
-- 可讀流 能夠讀取數據的源的抽象。 eg. fs.createReadStream()
Writable
-- 可寫流 能夠寫入數據目標的抽象。 eg. fs.createWriteStream()
Duplex
-- 雙向流(雙工流) 既是可讀的,又是可寫的。 eg. not.Socket
Transform
-- 變換流(可變流) 讀寫過程當中能夠修改或者轉化數據的雙向流
。 eg. zlib.createDeflate()
全部的流都是 EventEmitter
的實例,他們發出能夠被讀和寫的事件,在這個基礎上,咱們可以很方便的利用 pipe
方法對這些流進行操做服務器
readableSrc.pipe(writableDest)
複製代碼
上面這個簡單的例子中,咱們利用 readable stream
的輸出做爲 writable stream
的輸入。 那麼再來想,若是咱們的輸入輸出都是 Duplex
那就能夠一直 pipe
下去,實現如 Linux 命令般連續的操做。 若是你有用過 gulp
進行前端資源的壓縮整合,對於此必定會印象深入網絡
下表中所有數據Node.js中原生的對象,這些對象也是能夠讀寫的流,一部分是雙工流與可變流 注意:一個 HTTP 相應在客戶端是可讀流,但在服務端就是可寫流。 stdio
流(stdin
, stdout
, stdout
)在子進程中有着與父進程中相反的類型,也正是這樣,父子通訊才變的簡單curl
Readable Stream | Writable Stream |
---|---|
HTTP response (客戶端) | HTTP request (客戶端) |
HTTP request (服務端) | HTTP response (服務端) |
fs read streams | fs write streams |
zlib streams | zlib streams |
crypto streams | crypto streams |
TCP sockets | TCP sockets |
child process stdout, stderr | child process stdin |
process.stdin | process.stdout, process.stderr |
big.file
寫入100萬行數據,文件大約 400Mconst fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
複製代碼
big.file
的 node 服務。使用 curl
鏈接啓動的 node 服務const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(8000);
複製代碼
當啓動 node 服務並未鏈接時候,內存的佔用爲 8.7M,屬於正常狀況(下圖)異步
當使用 curl localhost:8000
鏈接服務器,能夠清晰看到一次性讀取會消耗多少內存(下圖)
createReadStram
方法,咱們能夠利用此方法將讀取到的流 pipe
到響應,減輕服務器負擔。代碼以及效果以下const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
複製代碼
能夠看到,node服務對於內存的壓力獲得了力度極大的釋放,而且在輸出速度上依然很快。這裏即很直觀的體現了此文一開始提到的,node利用流經過極少內存的佔用,高效完成了對大文件的操做。最後用一個形象的例子比喻一下上面的操做: 貨運工人有一車的貨物須要搬運,他能夠選擇將車上的貨物所有卸下,而後一塊兒搬到目的地;他還能夠選擇經過使用履帶,將貨物一件一件運輸到目的地。試想一下,這兩種方式操做的效率。
上面舉了一個「不恰當」可是可意會的「履帶」例子,其實node stream在計算機中真實的運做並無這麼簡單
生產者消費者問題 將會有助於更好理解流的原理
可讀流分爲兩種模式
data
事件或執行resume
方法改變)流動模式,能夠在比喻的基礎上理解爲三個節點,水泵(水源,數據源)、水桶(緩存容器,內存)、目的地(水流向的位置)
資源的數據流不會直接流向消費者,而會先通過 highWaterMark 的判斷,push 到緩存池(內存)中。若是超過 highWaterMark, push操做返回 false。最後的 resume()
、pause()
是通向消費者的一個閥門
原理與讀流相似,寫入速度足夠快會直接寫入資源,當寫入速度比較慢或者暫停寫入時候,數據會在緩存池中緩存起來,當生產者寫入過快,緩存池被放滿時候,這時候應當通知生產者暫停生產(好比下文write
方法返回false
),當緩存池被釋放空,Writable Stream 會給生產者發送 drain
消息,通知生產者再次開始寫入。ps:這裏的內容下文介紹 writable stream 時會有代碼示例
上面總體介紹了流的概念
、流的類型
、使用流的優勢
,接下來經過具體的代碼,整理一些在fs模塊中流的使用方式。
fs.createReadStream(path, )
const fs = require('fs);
const rs = fs.createReadStream('text.txt'); // options
/**
fs.createReadStream(path, {
flags: 'r', // 讀文件,文件不存在報錯,默認'r'
encoding: 'utf-8', // 以什麼編碼格式讀取文件(能夠被Buffer接收的任何格式),默認讀取buffer
autoClose: true, // 讀取後是否自動關閉文件,默認true
highWarterMark: 100, // 每次讀取的字節數,默認64k(65536)
start: 0, // 開始讀取的位置,默認0
end: 200 // 讀取文件的終點索引,默認 Infinity
})
**/
複製代碼
注意:
end
若是設置爲100,則須要讀取的字節數爲101,即0~100,包括100
由於默認flags
爲'r'
,若是path
指向的文件不存在,即會報錯
open
、data
、end
、close
、error
事件上文提到:全部的流都是 EventEmitrer
的實例
const fs = require('fs);
const rs = fs.createReadStream('text.txt');
rs.on('open', () => {
console.log('open');
});
rs.on('data', (datas) => {
console.log('file is read', datas);
})
rs.on('close', () => {
console.log('file is closed');
});
rs.on('error', (err) => {
console.log(err);
});
/**
依次輸出
open
文件的內容(buffer)
file is closed
**/
複製代碼
注意:
data
事件可能被屢次觸發,若是將highWarterMark
設置爲3,讀取寫有0123456789
的text.txt
文件時,會觸發四次,依次輸出0十二、34五、67八、9對應的buffer
pause
、resume
,暫停、恢復/** * text.txt文件內容 0123456789 */
const fs = require('fs');
const rs = fs.createReadStream('text.txt', {
encoding: 'utf-8',
highWaterMark: 3,
});
rs.on('data', (datas) => {
console.log(datas);
rs.pause();
console.log('stream is paused now');
});
rs.on('end', () => {
console.log('stream is end');
clearInterval(interval); // 清除定時器,不然會一直打印stream is resumed now
});
const interval = setInterval(() => {
rs.resume();
console.log('stream is resumed now');
}, 1000);
/** 輸出: 012 stream is paused now stream is resumed now 345 stream is paused now stream is resumed now 678 stream is paused now stream is resumed now 9 stream is paused now stream is end **/
複製代碼
注意: 沒什麼注意的
fs.createWriteStream(path, )
const fs = require('fs');
fs.createWriteStream(path, options);
const ws = fs.createWriteStream('2.txt', {
flags: 'w', // 默認'w'寫入文件,不存在則建立
encoding: 'utf-8'
fd: null, // 文件描述符
mode: 0o666, // 文件操做權限,同438
autoClose: true,
start: 0 // 開始寫入位置
highWarterMark: 16384 // !!! 文檔沒有給出這一設置,默認 16k,文末將驗證
});
複製代碼
注意:
options 參數與createReadStream
不一樣
也能夠設置highWaterMark
選項,官方文檔沒有給出,默認的寫入大小爲 16k,在可寫流對象執行write
方法的時候若是超出highWaterMark
,返回值將變成false
write
、 end
、 drain
、 finish
true
、 false
, 分別表明,表明當前內存中被寫入的數據是否超出 highWaterMark
(上面剛剛提到)write
以後數據並不會當即被寫入文件,而會在內存中緩存,而後依次寫入/** * write 方法 * chunk 寫入數據的buffer/string * encoding 編碼格式,可選。且chunk爲字符串時有用 * callback 寫入成功回調函數 **/
ws.write(chunk,[encoding],[callback]);
/** * end 方法,代表接下來沒有數據要被寫入 * chunk 寫入數據的buffer/string * encoding 編碼格式,可選。且chunk爲字符串時有用 * callback 回調函數,若是傳入,將做爲 finish 事件的回調函數 **/
ws.end(chunk,[encoding],[callback]);
/** * finish 方法,在調用了 stream.end() 方法,且緩衝區數據都已經傳給底層系統以後, 'finish' 事件將被觸發。 **/
const writer = fs.createWriteStream('2.txt');
for (let i = 0; i < 100; i++) {
writer.write(`hello, ${i}!\n`);
}
writer.end('結束\n');
writer.on('finish', () => {
console.error('全部的寫入已經完成!');
});
複製代碼
drain
方法
const fs = require('fs');
const ws = fs.createWriteStream('2.txt', {
encoding: 'utf-8',
highWaterMark: 3
});
let i = 10;
function write() {
let flag = true;
while(i && flag) {
flag = ws.write('1');
i --;
console.log(flag);
}
}
write();
ws.on('drain', () => {
console.log('drain');
write();
});
複製代碼
注意:
- 當一個流處在
drain
狀態,對write
的調用會被緩存(下面解釋),而且返回false
。一旦全部緩存的數據都被排空(被操做系統用來進行輸出),那麼drain
事件將被觸發,意思爲內存中緩存的數據已經被所有寫入到文件中,接下來能夠繼續執行write
向內存中寫入數據了- 若是你在手動控制讀寫以及緩存,建議這麼作,一旦
write
方法返回false,在drain
事件觸發前,最好不要寫入任何數據,固然這樣須要配合createWriteStream
的highWaterMark
參數,(這一參數文檔沒有給出)
pipe
、unpipe
、cork
、uncork
方法pipe
方法上面題目的幾種方法中,pipe
無疑使用最多,在流通常的使用場景下,pipe
能解決大部分的須要,下面一句很簡單的語義代碼就是 pipe
的使用方式,readable
經過 pipe
將數據傳輸給 writable
,正如其名,管道
readable.pipe(writable)
複製代碼
其基本原理爲:
pipe
方法,通知寫入一個簡單的例子:
const from = fs.createReadStream('./1.txt');
const to = fs.createWriteStream('./2.txt');
from.pipe(to);
複製代碼
以上的例子都是可讀流做爲輸入源,可寫流做爲返回結果,固然,若是咱們操做的是 duplex
/transform
,這時候就能夠很容易寫做鏈式調用
// 僞代碼
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
複製代碼
unpipe
方法/** * dest 當前readable pipe 管道的目標可寫流 **/
readable.unpipe(dest)
複製代碼
dest
未被指定,則 readable 綁定的全部流都將被分離cork
、uncork
方法stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());
複製代碼
Readable Stream 可讀流的事件與方法
Event | Functions |
---|---|
data | pipe()、unpipe() |
end | read()、unshift() |
error | pause()、resume() |
close | isPaused() |
readable | setEncoding() |
Writable Stream 可寫流的事件與方法
Event | Functions |
---|---|
drain | write() |
finish | end() |
error | cork() |
close | uncork() |
pipe/unpipe | setDefaultEncoding() |
highWaterMark
fs.createWriteStream()
option 中 highWaterMark
做用,我在此文屢次提到,但願能夠加深印象方式一:
const fs = require('fs');
let count = 0;
const ws = fs.createWriteStream('testInput.txt');
for (let i = 0; i < 10000; i ++) {
count ++;
let flag = ws.write(i.toString());
if (!flag) { // 返回false即到達了highWaterMark
console.log('寫入' + count + '次');
break;
}
}
ws.end(function() {
console.log('文件寫入結束,輸出的總字節爲', ws.bytesWritten);
});
// 輸出:
寫入4374次
文件寫入結束,輸出的總字節爲 16386
16386 / 1024
// 結果:
16.001953125
複製代碼
方式二:
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 10000;
write();
function write() {
let ok = true;
while (i-- > 0 && ok) {
// 寫入結束時回調
= writer.write(data, encoding, i === 0 ? callback : null);
}
if (i > 0) {
// 這裏提早停下了,'drain' 事件觸發後才能夠繼續寫入
console.log('drain', i);
writer.once('drain', write);
}
}
}
const Writable = require('stream').Writable;
const writer = new Writable({
write(chunk, encoding, callback) {
// 比 process.nextTick() 稍慢
setTimeout(() => {
callback && callback();
});
}
});
writeOneMillionTimes(writer, '123456', 'utf8', () => {
console.log('end');
});
// 輸出
drain 7268
drain 4536
drain 1804
end
// 計算:
(10000-7268) * 6 / 1024
// 結果:16.0078125
複製代碼
本文主要從文件操做的角度探究流的原理以及使用方法,node應用中你可使用流作不少事情,網絡請求、文件上傳、命令行工具等等。 在Node.js應用中,流隨處可見,文件操做,網絡請求,進程、socket中流無處不在。正是這樣,流的特性能讓你的node應用真正體現出「小而美」的特性,
文章目的爲我的筆記,本人也是Node.js初學者,文中若有不恰當描述以及說明,歡迎指正交流。 文章借鑑了學習了不少大佬的文章(文首傳送門),很是感謝 後續有時間會繼續更新,祝本身node之路順利吧🤡