摘要:在學習Node的過程當中,Stream流是經常使用的東東,在瞭解怎麼使用它的同時,咱們應該要深刻了解它的具體實現。今天的主要帶你們來寫一寫可讀流的具體實現,就過來,就過來,上碼啦!javascript
在寫代碼以前咱們首先要整理下思路,咱們要作什麼,以及怎麼來作。本篇文章以文件可讀流爲例,一個可讀流大致分爲四步:java
結束,關閉文件node
var readStream=require('readStream.js'); var rs=new readStream('test.txt',{ flags:'r', //打開文件的模式 autoClose:true, //結束是否自動關閉 encoding:'utf8', //字符編碼 highWaterMark:3, //每次讀取的字節數 start:0, //從下標爲多少的位置開始讀取,默認以0開始 end:3, //結束下標位置 });
rs.on('data',function(data){ console.log(data); })
首先咱們先初始化參數:異步
var fs=require('fs'); var EventEmitter=require('events'); class readStream extends EventEmitter{ constructor(path,options){ super(); this.path=path; this.flags=opitons.flags||'r'; this.autoClose=opitons.autoClose||true; this.encoding=options.encoding||null; this.highWaterMark=options.highWaterMark||64*1024; this.start=options.start||0; this.end=options.end; this.pos=this.start; this.buffer=Buffer.alloc(this.highWaterMark); this.flowing=null; this.open(); } }
以上除了初始化傳遞進來的參數,還加了幾個pos,buffer,open(),flowing,爲何要加這些呢?這些值是來作什麼用的?咱們在此作出解答:函數
flowing:是指當前狀態是不是流動的,有三個值,初始爲null。當開始監聽data事件時,值爲true,則開始讀取文件。當值爲false時,暫停讀取文件。爲何剛剛我說data可能會屢次觸發,由於當flowing被設爲false時,data事件將中止觸發。想要改變flowing的值,node提供了兩個方法暫停pause()和恢復resume()。學習
open(){ fs.open(this.path,this.flags,(err,fd)=>{ if(err){ this.emit('err'); } this.fd=fd; this.emit('open'); }); }
2.1在打開文件的時候,若是文件打開報錯,咱們除了要觸發錯誤事件外,還要注意一個參數。autoClose是指在文件讀取完畢或拋出錯誤後,本身關閉文件。
因而咱們根據這個參數值,在現有的open方法中對拋錯的狀況作出優化。優化
open(){ fs.open(this.path,this.flags,(err,fd)=>{ if(err){ if(autoClose){ if(typeof this.fd === 'number'){ fs.close(this.fd,()=>{ this.emit('close'); }); } this.emit('close'); } this.emit('err'); } this.fd=fd; this.emit('open'); }) }
對此,咱們要在構造函數內檢查若是添加了data的事件監聽ui
class readStream extends EventEmitter{ constructor(path,options){ super(); ... this.on('newListener',(eventName,callback)=>{ if(eventName=='data'){ this.flowing=true; this.read(); } }) } }
完成以上步驟後,咱們要作的就是讀取文件內容啦,下面來自定義一個read方法:this
read(){ let howToLength=this.end ? Math.min((this.end-this.pos),this.highWaterMark) : this.highWaterMark; fs.read(this.fd,this.buffer,0,howToLength,this.pos,(err,bytesBase)=>{ if(bytesBase>0){ this.pos+=bytesBase; this.buf=this.buffer.slice(0,bytesBase); let data=this.encoding ? this.buffer.toString(this.encoding) : this.buffer.toString(); this.emit('data',data); if(this.end>this.pos){ this.emit('end'); if(autoClose){ if(typeof this.fd === 'number'){ fs.close(this.fd,()=>{ this.emit('close'); }); } this.emit('close'); } } if(flowing){ this.read(); } }else{ this.emit('err'); if(typeof this.fd === 'number'){ if(autoClose){ fs.close(this.fd,()=>{ this.emit('close'); }); } this.emit('close'); } } }) }
到此,一個read方法就寫的差很少了,可是有個問題是要注意的,open方法是異步的,有可能出現調用read方法時,this.fd尚未值。爲了不這個錯誤,咱們改寫一下read方法。編碼
read(){ if(typeof this.fd !== 'number'){ this.once('open',()=>this.read()); } ... }
這樣的話,一個基礎的readStream類纔算寫完整。咱們是否是要考慮下,有沒有什麼能夠優化的地方?細心的夥伴是否是發現有重複的代碼?
對,就是文件的關閉,咱們提出一個destory方法,用做關閉文件。
destory(){ if(typeof this.fd==='number'){ if(autoClose){ fs.close(this.fd,()=>{ this.emit('close'); }); return ; } this.emit('close'); } }
方法的調用介紹變量flowing時,咱們有提到'暫停'方法pause(),'重啓'方法resume()來改變flowing的值。咱們加入到代碼中。
rs.on('data',(data)=>{ console.log(data); this.pause(); }); setTimeout(()=>{ this.resume(); },3000)
pause(){ this.flowing=false; } resume(){ this.flowing=true; this.read(); }
OK,大功告成了,下面整理出完整代碼
var fs=require('fs'); var EventEmitter=require('events'); class readStream extends EventEmitter{ constructor(path,options){ super(); this.path=path; this.flages=options.flages||'r'; this.autoClose=options.autoClose||true; this.encoding=options.encoding||null; this.highWaterMark=options.highWaterMark||64*1024; this.end=options.end; this.start=opitons.start||0; this.pos=this.start; this.flowing=false; this.buffer=Buffer.alloc(this.highWaterMark); this.open(); this.on('newListener',(eventName,callback){ if(eventName=='data'){ this.flowing=true; fs.read(); } }); open(){ fs.open(this.path,this.flags,(err,fd){ if(err){ if(this.autoClose){ this.destory(); } this.emit('err',err); return ; } this.fd=fd; this.emit('open'); }); } destory(){ if(typeof this.fd ='number'){ fs.close(this.fd,()=>{ this.emit('close'); }); return ; } this.emit('close'); } read(){ if(typeof this.fd !== 'number'){ return this.once('open',()=>this.read()); } let howToLength=this.end ? Math.min((this.end-this.pos),this.highWaterMark) : this.highWaterMark; fs.read(this.fd,this.buffer,0,howToLenghth,this.pos,(err,bytesBase)=>{ if(bytesBase>0){ this.pos+=bytesBase; let buf=this.buffer.slice(0,bytesBase); let data=this.encoding ? this.buffer.toString(this.encoding) : this.buffer.toString(); this.emit('data',data); if(this.pos>this.end){ this.emit('end'); this.destory(); } if(flowing){ this.read() } }else{ this.emit('err'); this.destory(); } }) } pause(){ this.flowing=false; } resume(){ this.flowing=true; this.read(); } } }