流(stream)在 Node.js 中是處理流數據的抽象接口(abstract interface)。 stream 模塊提供了基礎的 API 。使用這些 API 能夠很容易地來構建實現流接口的對象。api
Node.js 提供了多種流對象。 例如, HTTP 請求 和 process.stdout 就都是流的實例。緩存
流能夠是可讀的、可寫的,或是可讀寫的。全部的流都是 EventEmitter 的實例。異步
這裏咱們舉一個簡單的例子:函數
咱們打算讀取一個文件,使用 fs.readFileSync 同步讀取一個文件,程序會被阻塞,全部的數據都會被讀取到內存中。ui
換用 fs.readFile 讀取文件,程序不會被阻塞,可是全部的數據依舊會被一次性所有被讀取到內存中。this
當處理大文件壓縮、歸檔、媒體文件和巨大的日誌文件的時候,內存使用就成了問題,如今你們通常家用機內存大多數都是8G、16G,軟件包還在日益增大,在這種狀況下,流的優點就體現出來了。spa
流被設計爲異步的方式,在內存中只開啓一個固定的空間,將文件化整爲零,以流動的方式進行傳輸操做,解決了以上問題。設計
Node.js 中有四種基本的流類型:日誌
Readable - 可讀的流 (例如 fs.createReadStream()).code
Writable - 可寫的流 (例如 fs.createWriteStream()).
Duplex - 可讀寫的流 (例如 net.Socket).
Transform - 在讀寫過程當中能夠修改和變換數據的 Duplex 流 (例如 zlib.createDeflate()).
可讀流有兩種模式:
一、流動模式(flowing):可讀流自動讀取數據,經過EventEmitter接口的事件儘快將數據提供給應用。
二、暫停模式(paused):必須顯式調用stream.read()方法來從流中讀取數據片斷。
能夠經過三種途徑切換到流動模式:
流動模式切換到暫停模式的api有:
可讀流事件:'data','readable','error','close','end'
咱們能夠想象下家用熱水器的模型,熱水器的水箱(buffer緩存區)裏面存着熱水(數據),在咱們用熱水的時候,開啓水龍頭,自來水會不斷的進入水箱,再從水箱由水龍頭流出來供咱們使用。這就是進入了「flowing」模式。當咱們關閉水龍頭時候,水箱則會暫停進水,水龍頭也會暫停出水,這是就進入了「paused」模式。
const fs = require('fs') const path = require('path') const rs = fs.createReadStream(path.join(__dirname, './1.txt')) rs.setEncoding('utf8') rs.on('data', (data) => { console.log(data) })
const fs = require('fs') const path = require('path') const rs = fs.createReadStream(path.join(__dirname, './1.txt')) rs.setEncoding('utf8') rs.on('readable', () => { let d = rs.read(1) console.log(d) })
咱們來實現一個簡單的流動模式下的可讀流介紹其原理,由NODEJS官方文檔可知,流繼承自EventEmitter模塊,而後咱們定義一些默認參數、緩存區、模式:
let EventEmitter = require('events'); let fs = require('fs'); class ReadStream extends EventEmitter { constructor(path,options) { super(); this.path = path; this.flags = options.flags || 'r'; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark|| 64*1024; this.start = options.start||0; this.end = options.end; this.encoding = options.encoding || null this.buffer = Buffer.alloc(this.highWaterMark);//定義緩存區大小 this.pos = this.start; // pos 讀取的位置 可變 start不變的 this.flowing = null; // null就是暫停模式 } } module.exports = ReadStream;
接着在咱們須要定義一個打開文件的方法用於打開文件。還有一個一個destroy方法,用於在文件操做出錯或者讀完以後關閉文件。
open(){ fs.open(this.path,this.flags,(err,fd)=>{ if(err){ this.emit('error',err); if(this.autoClose){ // 是否自動關閉 this.destroy(); } return; } this.fd = fd; // 保存文件描述符 this.emit('open'); // 文件打開了 }); } destroy(){ // 先判斷有沒有fd 有關閉文件 觸發close事件 if(typeof this.fd ==='number'){ fs.close(this.fd,()=>{ this.emit('close'); }); return; } this.emit('close'); // 銷燬 }
接着要在構造函數中調用open方法,當用戶綁定data監聽時,修改可讀流的模式:
constructor(path,options){ super(); this.path = path; this.flags = options.flags || 'r'; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark|| 64*1024; this.start = options.start||0; this.end = options.end; this.encoding = options.encoding || null this.flowing = null; this.buffer = Buffer.alloc(this.highWaterMark); this.pos = this.start; this.open();//打開文件 fd this.on('newListener',(eventName,callback)=>{ if(eventName === 'data'){ // 至關於用戶監聽了data事件 this.flowing = true; // 監聽了 就去讀 this.read(); // 去讀內容了 } }) }
接下來咱們實現最總要的read方法,首先要保證文件已經打開,接着鍍組文件進入緩存,觸發data事件傳入數據,若是處於流動模式,繼續讀取直到讀完文件。
read(){ // 此時文件還沒打開呢 if(typeof this.fd !== 'number'){ // 當文件真正打開的時候 會觸發open事件,觸發事件後再執行read,此時fd確定有了 return this.once('open',()=>this.read()) } // 此時有fd了 // 應該填highWaterMark? // 想讀4個 寫的是3 每次讀3個 // 123 4 let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark; fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{ // 讀到了多少個 累加 if(bytesRead>0){ this.pos+= bytesRead; let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead); this.emit('data',data); // 當讀取的位置 大於了末尾 就是讀取完畢了 if(this.pos > this.end){ this.emit('end'); this.destroy(); } if(this.flowing) { // 流動模式繼續觸發 this.read(); } }else{ this.emit('end'); this.destroy(); } }); }
剩下的pause和resume方法,很簡單
resume() { this.flowing = true; this.read(); } pause() { this.flowing = false; }
簡單的流實現完成了,看一下完整代碼
let EventEmitter = require('events'); let fs = require('fs'); class ReadStream extends EventEmitter { constructor(path, options) { super(); this.path = path; this.flags = options.flags || 'r'; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark|| 64*1024; this.start = options.start||0; this.end = options.end; this.encoding = options.encoding || null this.open(); this.flowing = null; // null就是暫停模式 this.buffer = Buffer.alloc(this.highWaterMark); this.pos = this.start; this.on('newListener', (eventName,callback) => { if (eventName === 'data') { this.flowing = true; this.read(); } }) } read(){ if (typeof this.fd !== 'number') { return this.once('open', () => this.read()) } let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos+1) : this.highWaterMark; fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err,bytesRead) => { if (bytesRead > 0) { this.pos += bytesRead; let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead); this.emit('data', data); if(this.pos > this.end){ this.emit('end'); this.destroy(); } if(this.flowing) { this.read(); } }else{ this.emit('end'); this.destroy(); } }); } resume() { this.flowing = true; this.read(); } pause() { this.flowing = false; } destroy() { if(typeof this.fd === 'number') { fs.close(this.fd, () => { this.emit('close'); }); return; } this.emit('close'); }; open() { fs.open(this.path, this.flags, (err,fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy(); } return; } this.fd = fd; this.emit('open'); }); } } module.exports = ReadStream;
以上是流動模式的可讀流實現原理,暫停模式的可讀流原理與流動模式的主要區別在於監聽readable事件的綁定與read方法,先實現監聽綁定readable事件回調函數時,調用read方法讀取數據到緩存區,定義一個讀取方法_read
constructor(path, options) { super(); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; this.start = 0; this.end = options.end; this.flags = options.flags || 'r'; this.buffers = []; // 緩存區 this.pos = this.start; this.length = 0; // 緩存區大小 this.emittedReadable = false; this.reading = false; // 不是正在讀取的 this.open(); this.on('newListener', (eventName) => { if (eventName === 'readable') { this.read(); } }) } read(n) { if (this.length == 0) { this.emittedReadable = true; } if (this.length < this.highWaterMark) { if(!this.reading) { this.reading = true; this._read(); } } } _read() { if (typeof this.fd !== 'number') { return this.once('open', () => this._read()); } let buffer = Buffer.alloc(this.highWaterMark); fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => { if (bytesRead > 0) { this.buffers.push(buffer.slice(0, bytesRead)); this.pos += bytesRead; this.length += bytesRead; this.reading = false; if (this.emittedReadable) { this.emittedReadable = false; this.emit('readable'); } } else { this.emit('end'); this.destroy(); } }) }
由api可知,暫停模式下的可讀流手動調用read方法參數能夠大於highWaterMark,爲了處理這種狀況,咱們先寫一個函數computeNewHighWaterMark,取到大於等於n的最小2的n次方的整數
function computeNewHighWaterMark(n) { n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; return n; }
而後寫read方法,要考慮全n的各類狀況,上代碼
read(n) { if(n>this.length){ // 更改緩存區大小 讀取五個就找 2的幾回放最近的 this.highWaterMark = computeNewHighWaterMark(n) this.emittedReadable = true; this._read(); } // 若是n>0 去緩存區中取吧 let buffer=null; let index = 0; // 維護buffer的索引的 let flag = true; if (n > 0 && n <= this.length) { // 讀的內容 緩存區中有這麼多 // 在緩存區中取 [[2,3],[4,5,6]] buffer = Buffer.alloc(n); // 這是要返回的buffer let buf; while (flag&&(buf = this.buffers.shift())) { for (let i = 0; i < buf.length; i++) { buffer[index++] = buf[i]; if(index === n){ // 拷貝夠了 不須要拷貝了 flag = false; this.length -= n; let bufferArr = buf.slice(i+1); // 取出留下的部分 // 若是有剩下的內容 在放入到緩存中 if(bufferArr.length > 0){ this.buffers.unshift(bufferArr); } break; } } } } // 當前緩存區 小於highWaterMark時在去讀取 if (this.length == 0) { this.emittedReadable = true; } if (this.length < this.highWaterMark) { if(!this.reading){ this.reading = true; this._read(); // 異步的 } } return buffer }
附上可讀流暫停模式的完整實現原理代碼
let fs = require('fs'); let EventEmitter = require('events'); function computeNewHighWaterMark(n) { n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; return n; } class ReadStream extends EventEmitter { constructor(path, options) { super(); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; this.start = 0; this.end = options.end; this.flags = options.flags || 'r'; this.buffers = []; // 緩存區 this.pos = this.start; this.length = 0; // 緩存區大小 this.emittedReadable = false; this.reading = false; // 不是正在讀取的 this.open(); this.on('newListener', (eventName) => { if (eventName === 'readable') { this.read(); } }) } read(n) { if(n>this.length){ // 更改緩存區大小 讀取五個就找 2的幾回放最近的 this.highWaterMark = computeNewHighWaterMark(n) this.emittedReadable = true; this._read(); } // 若是n>0 去緩存區中取吧 let buffer=null; let index = 0; // 維護buffer的索引的 let flag = true; if (n > 0 && n <= this.length) { // 讀的內容 緩存區中有這麼多 // 在緩存區中取 [[2,3],[4,5,6]] buffer = Buffer.alloc(n); // 這是要返回的buffer let buf; while (flag&&(buf = this.buffers.shift())) { for (let i = 0; i < buf.length; i++) { buffer[index++] = buf[i]; if(index === n){ // 拷貝夠了 不須要拷貝了 flag = false; this.length -= n; let bufferArr = buf.slice(i+1); // 取出留下的部分 // 若是有剩下的內容 在放入到緩存中 if(bufferArr.length > 0){ this.buffers.unshift(bufferArr); } break; } } } } // 當前緩存區 小於highWaterMark時在去讀取 if (this.length == 0) { this.emittedReadable = true; } if (this.length < this.highWaterMark) { if(!this.reading){ this.reading = true; this._read(); // 異步的 } } return buffer } // 封裝的讀取的方法 _read() { // 當文件打開後在去讀取 if (typeof this.fd !== 'number') { return this.once('open', () => this._read()); } // 上來我要喝水 先倒三升水 [] let buffer = Buffer.alloc(this.highWaterMark); fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => { if (bytesRead > 0) { // 默認讀取的內容放到緩存區中 this.buffers.push(buffer.slice(0, bytesRead)); this.pos += bytesRead; // 維護讀取的索引 this.length += bytesRead;// 維護緩存區的大小 this.reading = false; // 是否須要觸發readable事件 if (this.emittedReadable) { this.emittedReadable = false; // 下次默認不觸發 this.emit('readable'); } } else { this.emit('end'); this.destroy(); } }) } destroy() { if (typeof this.fd !== 'number') { return this.emit('close') } fs.close(this.fd, () => { this.emit('close') }) } open() { fs.open(this.path, this.flags, (err, fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy(); } return } this.fd = fd; this.emit('open'); }); } } module.exports = ReadStream;