在前端工程化中產生了不少工具,例如grunt,gulp,webpack,babel...等等,這些工具都是經過node中的stream實現。 在node中stream也是很是很是很是重要的模塊,好比咱們經常使用的console就是基於stream的實例,還有net,http等核心模塊都是基於stream來實現的,可見stream是多麼的重要。javascript
是一種數據傳輸手段,從一個地方傳輸到另外一個地方。
在寫node的時候會存在讀取文件,好比如今咱們有一個很是大的文件,50G吧前端
const fs = require('fs');
// test文件50個G
fs.readFileSync('./test.text');
複製代碼
這個時候須要消耗大量的時候去讀取這個文件,然而咱們可能關心的並非文件全部內容,還會存在直接讀取失敗。stream就是爲了解決這些問題而產生,咱們讀一些數據處理一些數據,當讀到所關心數據的時候,則能夠再也不繼續讀取。java
stream翻譯成中文‘流’,就像水同樣,從水龍頭流向水杯。node
stream繼承於EventEmitter,擁有事件觸發和事件監聽功能。主要分爲4種基本流類型:
webpack
在流中存在一個重要的概念,緩存區,就像拿水杯去接水,水杯就是緩存區,當水杯滿,則會關閉水龍頭,等把水杯裏面的水消耗完畢,再打開水龍頭去接水。
stream默認緩存區大小爲16384(16kb),能夠經過highWaterMark參數設置緩存區大小,但設置encoding後,以設置的字符編碼爲單位衡量。web
首先建立一個可讀流,可接收5個參數:gulp
可讀流中分爲2種模式流動模式和暫停模式。
監聽data事件,觸發流動模式,會源源不斷生產數據觸發data事件:前端工程化
const { Readable } = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
// 這裏傳入的read方法,會被寫入_read()
read: (size) => {
// size 爲highWaterMark大小
// 在這個方法裏面實現獲取數據,讀取到數據調用rs.push([data]),若是沒有數據了,push(null)結束流
if (i < 10) {
rs.push(`當前讀取數據: ${i++}`);
} else {
rs.push(null);
}
},
// 源代碼,可覆蓋
destroy(err, cb) {
rs.push(null);
cb(err);
}
});
rs.on('data', (data) => {
console.log(data);
// 每次push數據則觸發data事件
// 當前讀取數據: 0
// 當前讀取數據: 1
// 當前讀取數據: 2
// 當前讀取數據: 3
// 當前讀取數據: 4
// 當前讀取數據: 5
// 當前讀取數據: 6
// 當前讀取數據: 7
// 當前讀取數據: 8
// 當前讀取數據: 9
})
複製代碼
監聽readable事件,觸發暫停模式,當流有了新數據或到了流結束以前觸發readable事件,須要顯示調用read([size])讀取數據:數組
const { Readable } = require('stream');
let i = 0;
const rs = Readable({
encoding: 'utf8',
highWaterMark: 9,
// 這裏傳入的read方法,會被寫入_read()
read: (size) => {
// size 爲highWaterMark大小
// 在這個方法裏面實現獲取數據,讀取到數據調用rs.push([data]),若是沒有數據了,push(null)結束流
if (i < 10) {
// push實際上是把數據放入緩存區
rs.push(`當前讀取數據: ${i++}`);
} else {
rs.push(null);
}
}
});
rs.on('readable', () => {
const data = rs.read(9);
console.log(data);
//
})
複製代碼
read([size]) size參數:緩存
這裏的緩存區數據不是指highWaterMark,獲取緩存區數據大小rs._readableState.length。
流的模式能夠自由切換: 經過rs._readableState.flowing的值獲取當前狀態
rs.pause()切換到暫停模式 rs.resume()切換到流動模式
在可讀流裏面還能夠監聽其餘事件:
rs.on('close', () => {
// 流關閉時或文件關閉時觸發
})
rs.on('end', () => {
// 在流中沒有數據可供消費時觸發
})
rs.on('error', (err) => {
// 發生錯誤時候
})
複製代碼
可寫流可接受參數:
在實現流除了用上面直接傳入參數的方式,還能夠用繼承類
class WS extends stream.Writable {
constructor() {
super({
highWaterMark: 1
});
}
_write(chunk, encoding, cb) {
console.log(this._writableState.length);
// chunk 爲須要寫入的數據
// encoding 字符編碼
// cb 回調函數, 若是寫入成功須要調用cb去執行下一次寫入,若是發生錯誤,能夠cb(new Error([錯誤信息]))
if (chunk.length < 4) {
fs.writeFileSync('./2.text', chunk, {
flag: 'a'
});
cb();
} else{
cb(new Error('超出4個字節'));
}
}
}
const ws = new WS();
let i = 0;
function next() {
let flag = true;
// write() 會返回boolean false -> 緩存區沒滿 true —> 已滿,須要暫停寫入數據
while(i < 10 && flag) {
flag = ws.write(`${i++}`);
console.log('flag', flag);
}
}
next();
// 當全部緩存區數據已經成功寫入底層數據,緩存區沒有數據了,觸發drain事件
ws.on('drain', () => {
console.log('drain');
// 繼續寫入緩存區數據
next();
})
複製代碼
可寫流的end事件,一旦觸發end事件,後續不能再寫入數據.
ws.write('start');
ws.end('end');
ws.wrtie('test'); // 報錯 write after end
複製代碼
finish事件:
ws.write('start');
ws.end('end');
ws.on('finish', () => {
console.log('調用end方法後,而且全部數據已經寫入底層')
})
複製代碼
cork()與uncork(),強制全部數據先寫入緩存區,直到調用uncork()或end(),這時一併寫入底層:
const ws = stream.Writable({
writev(chunks, encoding, cb) {
// 這時chunks爲一個數組,包含全部的chunk
// 如今length爲10
console.log(chunk.length);
}
});
// 寫入數據以前,強制寫入數據放入緩存區
ws.cork();
// 寫入數據
for (let i = 0; i < 10; i++) {
ws.write(i.toString());
}
// 寫入完畢,能夠觸發寫入底層
ws.uncork();
複製代碼
讀寫流,該方法繼承了可寫流和可讀流,但相互之間沒有關係,各自獨立緩存區,擁有Writable和Readable全部方法和事件,同時實現_read()和_write()方法。
const fs = require('fs');
const stream = require('stream');
const duplex = stream.Duplex({
write(chunk, encoding, cb) {
console.log(chunk.toString('utf8')); // 寫入
},
read() {
this.push('讀取');
this.push(null);
}
});
console.log(duplex.read(6).toString('utf8')); // 讀取
duplex.write('寫入');
複製代碼
轉換流,這個流在前端工程化中用到最多,從一個地方讀取數據,轉換數據後輸出到一個地方,該流繼承於Duplex。
const fs = require('fs');
const stream = require('stream');
const transform = stream.Transform({
transform(chunk, encoding, cb){
// 把數據轉換成大寫字母,而後push到緩存區
this.push(chunk.toString().toUpperCase());
cb();
}
});
transform.write('a');
console.log(transform.read(1).toString()); // A
複製代碼
可讀流和可寫流都須要咱們去實現父類的方法,那麼fs這個模塊幫咱們作了這件事情,fs裏面實現了高效而且可靠的可讀/可寫流,提供快速建立流,再也不去實現父類_write()或_read()。下面咱們來看看如何使用:
const fs = require('fs');
/** * 建立可讀流 * * 第一個參數文件路徑 * * 第二個參數爲options * flags?: string; encoding?: string; 字符編碼 fd?: number; 文件打開後的標識符 mode?: number; 文件的權限 autoClose?: boolean; 讀取完畢後,是否自動關閉文件 start?: number; 從哪一個位置開始讀取 end?: number; 讀到何時結束 highWaterMark?: number; 最高水位線 */
const rs = fs.createReadStream('1.text');
rs.on('data', data => {
console.log(data);
})
/** * 建立可寫流 * * 第一個參數文件路徑 * * 第二個參數爲options * flags?: string; encoding?: string; 字符編碼 fd?: number; 文件打開後的標識符 mode?: number; 文件的權限 autoClose?: boolean; 寫入完畢後,是否自動關閉文件 start?: number; 從什麼位置開始寫入 */
const ws = fs.createWriteStream('2.text');
ws.write('123');
複製代碼
在流中搭建一條管道,從可讀流中到可寫流。
可讀流中有pipe()方法,在可寫流中能夠監聽pipe事件,下面實現了從可讀流中經過管道到可寫流:
const fs = require('fs');
const stream = require('stream');
const rs = stream.Readable({
read() {
this.push(fs.readFileSync('./1.text')); // 文件內容 test
this.push(null);
}
});
const ws = stream.Writable({
write(chunk, encoding, cb) {
// chunk爲test buffer
fs.writeFileSync('./2.text', chunk.toString());
cb();
}
});
ws.on('pipe', data => {
// 觸發pipe事件
console.log(data);
});
rs.pipe(ws);
複製代碼
流分爲四種基本類型,兩種模式。流中的數據不是直接寫入或讀取,有緩存區的概念。