Node Stream

做者:qianniuer@毛豆前端前端

背景

從早先的unix開始,stream便開始進入了人們的視野,在過去的幾十年的時間裏,它被證實是一種可依賴的編程方式,它能夠將一個大型的系統拆成一些很小的部分,而且讓這些部分之間完美地進行合做。在unix中,咱們可使用|符號來實現流。在node中,node內置的stream模塊已經被多個核心模塊使用,同時也能夠被用戶自定義的模塊使用。和unix相似,node中的流模塊的基本操做符叫作.pipe(),同時你也可使用一個後壓機制來應對那些對數據消耗較慢的對象。node

爲何應該使用流?

在node中,I/O都是異步的,因此在和硬盤以及網絡的交互過程當中會涉及到傳遞迴調函數的過程。你以前可能會寫出這樣的代碼git

let http = require('http');
let fs = require('fs');

let server = http.createServer(function (req, res) {
	fs.readFile(__dirname + 'data.txt', function (err, data) {
	    res.end(data);
	});
});
server.listen(8000);
複製代碼

上面的這段代碼並無什麼問題,可是在每次請求時,咱們都會把整個data.txt文件讀入到內存中,而後再把結果返回給客戶端。想一想看,若是data.txt文件很是大,在響應大量用戶的併發請求時,程序可能會消耗大量的內存,這樣極可能會形成用戶鏈接緩慢的問題。github

其次,上面的代碼可能會形成很很差的用戶體驗,由於用戶在接收到任何的內容以前首先須要等待程序將文件內容徹底讀入到內存中。編程

所幸的是,(req,res)參數都是流對象,這意味着咱們可使用一種更好的方法來實現上面的需求:後端

let http = require('http');
let fs = require('fs');

let server = http.createServer(function (req, res) {
	let stream = fs.createReadStream(__dirname + '/data.txt');
	stream.pipe(res);
});
server.listen(8000);
複製代碼

在這裏,.pipe()方法會自動幫助咱們監聽data和end事件。上面的這段代碼不只簡潔,並且data.txt文件中每一小段數據都將源源不斷的發送到客戶端。api

除此以外,使用.pipe()方法還有別的好處,好比說它能夠自動控制後端壓力,以便在客戶端鏈接緩慢的時候node能夠將盡量少的緩存放到內存中。瀏覽器

想要將數據進行壓縮?咱們可使用相應的流模塊完成這項工做!緩存

let http = require('http');
let fs = require('fs');
let oppressor = require('oppressor');

let server = http.createServer(function (req, res) {
	let stream = fs.createReadStream(__dirname + '/data.txt');
	stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);
複製代碼

經過上面的代碼,咱們成功的將發送到瀏覽器端的數據進行了gzip壓縮。咱們只是使用了一個oppressor模塊來處理這件事情。bash

一旦你學會使用流api,你能夠將這些流模塊像搭樂高積木或者像鏈接水管同樣拼湊起來,今後之後你可能不再會去使用那些沒有流API的模塊獲取和推送數據了。

01 流的概念

  • 流是一組有序的,有起點和終點的字節數據傳輸手段
  • 可以更好的控制讀取和寫入的速度

02 開始使用

  1. 可讀流createReadStream
  • 流的兩種模式: 暫停模式
// 可讀流
let fs = require('fs');
// test.js   123456789
let rs = fs.createReadStream('./test.js', {
 	flags: 'r', // 讀取方式
 	encoding: null, // 編碼 默認 buffer
 	autoClose: true,
 	mode: 0666, // 權限 默認 0666
 	start: 0, // 開始讀取位置
 	end: 3,  // 結束位置 包後
 	highWaterMark: 2,  // 最高水位線
})
// 默認什麼都不幹 結果默認是不會讀取的
複製代碼
  • 流的兩種模式: 流動模式
rs.on('data', function (data) {
   console.log(data) // <Buffer 31 32> <Buffer 33 34> 分兩次輸出
})
rs.on('end', function() {
   console.log('讀取完畢')
})
複製代碼
  • 暫停和恢復 可控制讀取數據的速度
let arr = []
rs.on('data', function (data) {
   rs.pause() // 暫停 暫停觸發data事件
   arr.push(data);
})
rs.on('end', function() {
   console.log(Buffer.concat(arr).toString()); // 1234
})
setTimeout(() => {
   rs.resume(); // 恢復流
}, 1000)
複製代碼
// 錯誤監聽
rs.on('error', function (err) {
   console.log(err)
})
複製代碼
  1. 可寫流
  • 寫 (第一次會真的往文件裏寫)後面會寫到緩存中
// 可寫流
let fs = require('fs');
let ws = fs.createWriteStream('2.txt', {
        flags: 'w',
 	encoding: 'utf8',
 	autoClose: true,
 	start: 5,
 	highWaterMark: 3
});
複製代碼
  • 上面運行結果與highWaterMark有關, flag 表明 是否小於最高水位線
let flag = ws.write('1')
console.log(flag); // true
flag = ws.write('1')
console.log(flag); // true
flag = ws.write('1')
console.log(flag); // false
複製代碼

highWaterMark

highWaterMark只是一個標識,通常配合着讀取來用,例如: 預計用上述highWaterMark內存容量,假若有個文件 1g大小,每次讀取64k,讀取後超出最高水位線應該暫停一下 rs.pause(),若不暫停,會致使緩存過大

// 抽乾, 即緩存中文件已被所有寫入文件中(前提是,當前寫入內容 >= highWaterMark)
	ws.on('drain', function(){
 		console.log('已抽乾')
	})
	ws.end('結束') // 會將緩存區的內容清空後再關閉文件,且也會寫入此方法中的內容
	ws.write('ok') // ❌ 不能在 end 後繼續寫入
複製代碼

03 流的原理/手把手教你寫流的簡版源碼

  1. 可讀流 流的讀取是經過多個事件監聽來實現的,可讀流的經常使用事件:
let fs = require('fs');
let rs = fs.createReadStream('./2.txt', {
 	highWaterMark: 3,
 	autoClose: true,
 	flags: 'r',
 	start: 0,
 	end: 5,
 	// encoding: 'utf8', // 默認 buffer
})
rs.on('open', function() {
    console.log('open')
});
rs.on('error', function (err) {
    console.log(err);
})
rs.on('data', function(data) {
    console.log(data);
    rs.pause()
})
rs.on('end', function() {
    console.log('end')
})
rs.on('close', function(){
    console.log('close')
})
setInterval(() =>{
 	rs.resume();
}, 1000)
複製代碼
  1. 可讀流的實現
let EventEmitter = require('events');
let fs = require('fs');
class ReadStream extends EventEmitter {
    constructor(path, options) {
		super();
		this.path = path;
		this.autoClose = options.autoClose || true;
		this.flags = options.flags || 'r';
		this.encoding = options.encoding || null;
		this.start = options.start || 0;
		this.end = options.end || null;
		this.highWaterMark = options.highWaterMark || 64 * 1024;
		// 應該有一個讀取文件的位置 可變的(可變的位置)
		this.pos = this.start;
		// 控制當前是不是流動模式
		this.flowing = null;
		// 構建讀取到的內容的buffer
		this.buffer = Buffer.alloc(this.highWaterMark);
		// 但建立可讀流 要將文件打開
		this.open();
		this.on('newListener', (type) => {
			if (type === 'data') { 
			// 若是用戶監聽了data事件,就開始讀取
			this.flowing = true;
			this.read(); // 開始讀取文件
			}
		})
    }
    read() {
		// 這時候文件尚未打開,等文件打開後再去讀取
		if (typeof this.fd !== 'number') {
			// 等待文件打開,再次調用read方法
			return this.once('open', () => this.read())
		}
			// 開始讀取
		let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos + 1) : this.highWaterMark;
		// 文件描述符 讀到哪一個buffer裏  讀取到buffer的哪一個位置 往buffer裏讀取幾個 讀取的位置
		// 想讀三個 可是文件只有2個
		// todo 外部調用resume方法的時候 需判斷若是end了 就不要讀了
		fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => {
			if (err) {
				this.emit('error')
				return
			}
			if (bytesRead > 0) { // 讀到內容了
				this.pos += bytesRead;
				// 保留有用的
				let r = this.buffer.slice(0, bytesRead);
				r = this.encoding ? r.toString(this.encoding) : r;
				this.emit('data', r);
				if (this.flowing) {
					this.read();
				}
			} else {
				this.emit('end');
				this.destroy();
			}
		});
    }
    destroy() {
		if (typeof this.fd === 'number') {
			fs.close(this.fd, () => {
				this.emit('close');
			});
			return
		}
		this.emit('close');
    }
    open() { // 打開文件的邏輯
		fs.open(this.path, this.flags, (err, fd) => {
			if (err) {
				this.emit('error', err);
				if (this.autoClose) {
					this.destroy() // 銷燬  關閉文件 (觸發close事件)
				}
				return
			}
			this.fd = fd;
			this.emit('open'); // 觸發文件開啓事件
		})
    }
    pause() {
        this.flowing = false;
    }
    resume() {
        this.flowing = true;
        this.read();
    }
}
module.exports = ReadStream;
複製代碼
  1. 可寫流
let fs = require('fs');
//  我想寫入時只佔用三個字節的內存
let ws = fs.createWriteStream('./1.txt', {
    flags: 'w',
    mode: 0o666,
    highWaterMark: 3,
    start: 0,
    autoClose: true,
    encoding: 'utf8',
})
let i = 0;
function write() {
    let flag = true;
    while(i < 9 && flag) {
        flag = ws.write(i++ + '')
    }
}
ws.on('drain', function () {
    console.log('寫入成功')
    write()
})
複製代碼
  1. 可寫流的實現
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.start = options.start || 0;
        this.autoClose = options.autoClose || true;
        this.encoding = options.encoding || 'utf8';
        // 是否須要觸發drain事件
        this.needDrain = false;
        // 是否正在寫入
        this.writing = false;
        // 緩存 正在寫入就放到緩存中
        this.buffer = [];
        // 算一個當前緩存的個數
        this.len = 0;
        // 寫入 的時候也有位置關係
        this.pos = this.start;
        this.open();
    }
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                this.emit('error');
                this.destroy();
                return
            }
            this.fd = fd;
            this.emit('open');
        })
    }
    destroy() {
        if (typeof this.fd === 'number') {
            fs.close(this.fd, () => {
                this.emit('close');
            });
            return
        }
        this.emit('close');
    }
    write(chunk, encoding = this.encoding, callback) {
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
        this.len += chunk.length;
        this.needDrain = this.highWaterMark <= this.len;
        if (this.writing) {
            this.buffer.push({chunk, encoding, callback})
        } else {
            // 當文件寫入溝 清空緩存區的內容
            this.writing = true; // 走緩存
            this._write(chunk, encoding, () => this.clearBuffer())
        }
        return !this.needDrain;
    }
    _write(chunk, encoding, callback) {
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this.write(chunk, encoding, callback));
        }
        // fd是文件描述符 chunk是數據  0 是寫入的位置  寫入的長度, this.pos 偏移量
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
            this.pos += bytesWritten;
            this.len -= bytesWritten; // 寫入的長度會減小
            callback();
        })
    }
    clearBuffer() {
        let buf = this.buffer.shift();
        if(buf) {
            this._write(buf.chunk, buf.encoding, () => this.clearBuffer());
        } else {
            this.writing = false;
            this.needDrain = false;
            this.emit('drain')
        }
    }
}
module.exports = WriteStream;
複製代碼
相關文章
相關標籤/搜索