node中的精髓Stream(流)

在前端工程化中產生了不少工具,例如grunt,gulp,webpack,babel...等等,這些工具都是經過node中的stream實現。 在node中stream也是很是很是很是重要的模塊,好比咱們經常使用的console就是基於stream的實例,還有net,http等核心模塊都是基於stream來實現的,可見stream是多麼的重要。javascript

1.什麼是stream?

是一種數據傳輸手段,從一個地方傳輸到另外一個地方。
在寫node的時候會存在讀取文件,好比如今咱們有一個很是大的文件,50G吧前端

const fs = require('fs');
	// test文件50個G
	fs.readFileSync('./test.text');
複製代碼

這個時候須要消耗大量的時候去讀取這個文件,然而咱們可能關心的並非文件全部內容,還會存在直接讀取失敗。stream就是爲了解決這些問題而產生,咱們讀一些數據處理一些數據,當讀到所關心數據的時候,則能夠再也不繼續讀取。java

stream翻譯成中文‘流’,就像水同樣,從水龍頭流向水杯。node

2. Stream模塊

stream繼承於EventEmitter,擁有事件觸發和事件監聽功能。主要分爲4種基本流類型:
webpack

  1. Readable (可讀流)
  2. Writable (可寫流)
  3. Duplex (讀寫流)
  4. Transform (轉換流)
    在流中默承認操做的類型string和Buffer,若是須要處理其餘類型的js值須要傳入參數objectMode: true(默認爲false)

在流中存在一個重要的概念,緩存區,就像拿水杯去接水,水杯就是緩存區,當水杯滿,則會關閉水龍頭,等把水杯裏面的水消耗完畢,再打開水龍頭去接水。
stream默認緩存區大小爲16384(16kb),能夠經過highWaterMark參數設置緩存區大小,但設置encoding後,以設置的字符編碼爲單位衡量。web

3. Readable

首先建立一個可讀流,可接收5個參數:gulp

  • highWaterMark 緩存區字節大小,默認16384
  • encoding 字符編碼,默認爲null,就是buffer
  • objectMode 是否操做js其餘類型 默認false
  • read 對內部的_read()方式實現 子類實現,父類調用
  • destroy 對內部的_ destroy()方法實現 子類實現,父類調用

可讀流中分爲2種模式流動模式暫停模式
監聽data事件,觸發流動模式,會源源不斷生產數據觸發data事件:前端工程化

const { Readable } = require('stream');
	
	let i = 0;
		
	const rs = Readable({
	    encoding: 'utf8',
	    // 這裏傳入的read方法,會被寫入_read()
	    read: (size) => {
	        // size 爲highWaterMark大小
	        // 在這個方法裏面實現獲取數據,讀取到數據調用rs.push([data]),若是沒有數據了,push(null)結束流
	        if (i < 10) {
	            rs.push(`當前讀取數據: ${i++}`);
	        } else {
	            rs.push(null);
	        }
	    },
	    // 源代碼,可覆蓋
	    destroy(err, cb) {
	    	rs.push(null);
	    	cb(err);
	    }
	});
		
	rs.on('data', (data) => {
	    console.log(data);
	    // 每次push數據則觸發data事件
	    // 當前讀取數據: 0
	    // 當前讀取數據: 1
	    // 當前讀取數據: 2
	    // 當前讀取數據: 3
	    // 當前讀取數據: 4
	    // 當前讀取數據: 5
	    // 當前讀取數據: 6
	    // 當前讀取數據: 7
	    // 當前讀取數據: 8
	    // 當前讀取數據: 9
	})
複製代碼

監聽readable事件,觸發暫停模式,當流有了新數據或到了流結束以前觸發readable事件,須要顯示調用read([size])讀取數據:數組

const { Readable } = require('stream');
		
	let i = 0;
		
	const rs = Readable({
	    encoding: 'utf8',
	    highWaterMark: 9,
	    // 這裏傳入的read方法,會被寫入_read()
	    read: (size) => {
	        // size 爲highWaterMark大小
	        // 在這個方法裏面實現獲取數據,讀取到數據調用rs.push([data]),若是沒有數據了,push(null)結束流
	        if (i < 10) {
	          // push實際上是把數據放入緩存區
	          rs.push(`當前讀取數據: ${i++}`);
	        } else {
	            rs.push(null);
	        }
	    }
	});
	
	rs.on('readable', () => {
	    const data = rs.read(9);
	    console.log(data);
	    // 
	})
複製代碼

read([size]) size參數:緩存

  • 不傳表明讀取緩存區全部數據。
  • 傳入0 填充緩存區, 但返回null
  • size < 當前緩存區數據 返回所需數據
  • size > 當前緩存區數據 返回null 並改變highWaterMark值

這裏的緩存區數據不是指highWaterMark,獲取緩存區數據大小rs._readableState.length。

流的模式能夠自由切換: 經過rs._readableState.flowing的值獲取當前狀態

  • null 初始狀態
  • false 暫停模式
  • true 流動模式

rs.pause()切換到暫停模式 rs.resume()切換到流動模式

在可讀流裏面還能夠監聽其餘事件:

rs.on('close', () => {
		// 流關閉時或文件關閉時觸發
	})
	
	rs.on('end', () => {
		// 在流中沒有數據可供消費時觸發
	})
	
	rs.on('error', (err) => {
		// 發生錯誤時候
	})
複製代碼

4. Writable

可寫流可接受參數:

  • highWaterMark 緩存區字節大小,默認16384
  • decodeStrings 是否將字符編碼傳入緩衝區
  • objectMode 是否操做js其餘類型 默認false
  • write 子類實現,供父類調用 實現寫入底層數據
  • writev 子類實現,供父類調用 一次處理多個chunk寫入底層數據
  • destroy 能夠覆蓋父類方法,不能直接調用,銷燬流時,父類調用
  • final 完成寫入全部數據時父類觸發

在實現流除了用上面直接傳入參數的方式,還能夠用繼承類

class WS extends stream.Writable {
    constructor() {
        super({
            highWaterMark: 1
        });
    }

    _write(chunk, encoding, cb) {
        console.log(this._writableState.length);
        // chunk 爲須要寫入的數據
        // encoding 字符編碼
        // cb 回調函數, 若是寫入成功須要調用cb去執行下一次寫入,若是發生錯誤,能夠cb(new Error([錯誤信息]))
        if (chunk.length < 4) {
            fs.writeFileSync('./2.text', chunk, {
                flag: 'a'
            });
            cb();
        } else{
            cb(new Error('超出4個字節'));
        }
    }
}

const ws = new WS();

let i = 0;
function next() {
    let flag = true;

    // write() 會返回boolean false -> 緩存區沒滿 true —> 已滿,須要暫停寫入數據
    while(i < 10 && flag) {
        flag = ws.write(`${i++}`);
        console.log('flag', flag);
    }
}

next();

// 當全部緩存區數據已經成功寫入底層數據,緩存區沒有數據了,觸發drain事件
ws.on('drain', () => {
    console.log('drain');
    // 繼續寫入緩存區數據
    next();
})
複製代碼

可寫流的end事件,一旦觸發end事件,後續不能再寫入數據.

ws.write('start');
	ws.end('end');
	ws.wrtie('test'); // 報錯 write after end
複製代碼

finish事件:

ws.write('start');
	ws.end('end');
	ws.on('finish', () => {
		console.log('調用end方法後,而且全部數據已經寫入底層')
	})
複製代碼

cork()與uncork(),強制全部數據先寫入緩存區,直到調用uncork()或end(),這時一併寫入底層:

const ws = stream.Writable({
		writev(chunks, encoding, cb) {
		    // 這時chunks爲一個數組,包含全部的chunk
		
		    // 如今length爲10
		    console.log(chunk.length);
		}
	});
	
	// 寫入數據以前,強制寫入數據放入緩存區
	ws.cork();
	
	// 寫入數據
	for (let i = 0; i < 10; i++) {
	    ws.write(i.toString());
	}
	
	// 寫入完畢,能夠觸發寫入底層
	ws.uncork();
複製代碼

5. Duplex

讀寫流,該方法繼承了可寫流和可讀流,但相互之間沒有關係,各自獨立緩存區,擁有Writable和Readable全部方法和事件,同時實現_read()和_write()方法。

const fs = require('fs');
	const stream = require('stream');
	
	const duplex = stream.Duplex({
	    write(chunk, encoding, cb) {
	        console.log(chunk.toString('utf8')); // 寫入
	    },
	    read() {
	        this.push('讀取');
	        this.push(null);
	    }
	});
	
	console.log(duplex.read(6).toString('utf8')); // 讀取
	
	duplex.write('寫入');
複製代碼

6. Transform

轉換流,這個流在前端工程化中用到最多,從一個地方讀取數據,轉換數據後輸出到一個地方,該流繼承於Duplex。

const fs = require('fs');
	const stream = require('stream');
	
	const transform = stream.Transform({
	    transform(chunk, encoding, cb){
	        // 把數據轉換成大寫字母,而後push到緩存區
	        this.push(chunk.toString().toUpperCase());
	        cb();
	    }
	});
	
	transform.write('a');
	
	console.log(transform.read(1).toString()); // A
複製代碼

7. fs快速建立可讀/可寫流

可讀流和可寫流都須要咱們去實現父類的方法,那麼fs這個模塊幫咱們作了這件事情,fs裏面實現了高效而且可靠的可讀/可寫流,提供快速建立流,再也不去實現父類_write()或_read()。下面咱們來看看如何使用:

const fs = require('fs');
	
	/** * 建立可讀流 * * 第一個參數文件路徑 * * 第二個參數爲options * flags?: string; encoding?: string; 字符編碼 fd?: number; 文件打開後的標識符 mode?: number; 文件的權限 autoClose?: boolean; 讀取完畢後,是否自動關閉文件 start?: number; 從哪一個位置開始讀取 end?: number; 讀到何時結束 highWaterMark?: number; 最高水位線 */
	const rs = fs.createReadStream('1.text');
	
	rs.on('data', data => {
	    console.log(data);
	})
	
	/** * 建立可寫流 * * 第一個參數文件路徑 * * 第二個參數爲options * flags?: string; encoding?: string; 字符編碼 fd?: number; 文件打開後的標識符 mode?: number; 文件的權限 autoClose?: boolean; 寫入完畢後,是否自動關閉文件 start?: number; 從什麼位置開始寫入 */
	const ws = fs.createWriteStream('2.text');
	
	ws.write('123');
複製代碼

8. pipe

在流中搭建一條管道,從可讀流中到可寫流。

可讀流中有pipe()方法,在可寫流中能夠監聽pipe事件,下面實現了從可讀流中經過管道到可寫流:

const fs = require('fs');
	const stream = require('stream');
	
	const rs = stream.Readable({
	    read() {
	        this.push(fs.readFileSync('./1.text')); // 文件內容 test
	        this.push(null);
	    }
	});
	
	const ws = stream.Writable({
	    write(chunk, encoding, cb) {
	        // chunk爲test buffer
	        fs.writeFileSync('./2.text', chunk.toString());
	        cb();
	    }
	});
	
	ws.on('pipe', data => {
	    // 觸發pipe事件
	    console.log(data);
	});
	
	rs.pipe(ws);
複製代碼

9. 總結

流分爲四種基本類型,兩種模式。流中的數據不是直接寫入或讀取,有緩存區的概念。

相關文章
相關標籤/搜索