確認過眼神,你是喜歡Stream的人

摘要:在學習Node的過程當中,Stream流是經常使用的東東,在瞭解怎麼使用它的同時,咱們應該要深刻了解它的具體實現。今天的主要帶你們來寫一寫可讀流的具體實現,就過來,就過來,上碼啦!javascript

碼前準備

在寫代碼以前咱們首先要整理下思路,咱們要作什麼,以及怎麼來作。本篇文章以文件可讀流爲例,一個可讀流大致分爲四步:java

  1. 初始化參數
  2. 打開文件
  3. 讀取文件
  4. 結束,關閉文件node

    1、先來一波調用

  • 1.先引入一個readStream模塊
  • 2.實例化並傳入參數
var readStream=require('readStream.js');
var rs=new readStream('test.txt',{
    flags:'r',                  //打開文件的模式
    autoClose:true,             //結束是否自動關閉
    encoding:'utf8',            //字符編碼
    highWaterMark:3,            //每次讀取的字節數
    start:0,                    //從下標爲多少的位置開始讀取,默認以0開始
    end:3,                      //結束下標位置
});
  • 3.監聽data事件,接收讀到的值
    對於有讀取數據時,會觸發data事件,咱們在此先監聽data事件。關於事件的監聽和觸發,在node中用的是‘events’模塊,若是不太瞭解的盆友,能夠關注我哈,後續的文章會介紹到哦!本篇的重點是流,咱們就先直接用了。
rs.on('data',function(data){
    console.log(data);
})

2、接下來定義readStream這個模塊

1.由於咱們以文件的可讀流來作的,在此咱們要引入一個文件模塊。還有一個事件模塊,而且要繼承它,每個可讀流都是‘events’的一個實例。

首先咱們先初始化參數:異步

var fs=require('fs');
    var EventEmitter=require('events');
    class readStream extends EventEmitter{
        constructor(path,options){
            super();
            this.path=path;
            this.flags=opitons.flags||'r';
            this.autoClose=opitons.autoClose||true;
            this.encoding=options.encoding||null;
            this.highWaterMark=options.highWaterMark||64*1024;
            this.start=options.start||0;
            this.end=options.end;

            this.pos=this.start;
            this.buffer=Buffer.alloc(this.highWaterMark);
            this.flowing=null;
            this.open();
        }
    }

以上除了初始化傳遞進來的參數,還加了幾個pos,buffer,open(),flowing,爲何要加這些呢?這些值是來作什麼用的?咱們在此作出解答:函數

  • pos:是用在存儲每一次讀取文件時,讀取的位置。好比當highWater<end時,data有可能會觸發屢次,每次的位置應該是上次讀取位置的下一個,pos就是用來記錄這個位置的下標,因此初始值爲start。
  • buffer:分配一個長度爲this.highWaterMark的Buffer。
  • flowing:是指當前狀態是不是流動的,有三個值,初始爲null。當開始監聽data事件時,值爲true,則開始讀取文件。當值爲false時,暫停讀取文件。爲何剛剛我說data可能會屢次觸發,由於當flowing被設爲false時,data事件將中止觸發。想要改變flowing的值,node提供了兩個方法暫停pause()和恢復resume()。學習

    2.讀取一個文件應該先打開文件,咱們來定義該方法:

open(){
        fs.open(this.path,this.flags,(err,fd)=>{
            if(err){
                this.emit('err');
            }
            this.fd=fd;
            this.emit('open');
        });
    }

2.1在打開文件的時候,若是文件打開報錯,咱們除了要觸發錯誤事件外,還要注意一個參數。autoClose是指在文件讀取完畢或拋出錯誤後,本身關閉文件。
因而咱們根據這個參數值,在現有的open方法中對拋錯的狀況作出優化。優化

open(){
        fs.open(this.path,this.flags,(err,fd)=>{
            if(err){
                if(autoClose){
                    if(typeof this.fd === 'number'){
                        fs.close(this.fd,()=>{
                            this.emit('close');
                        });
                    }
                    this.emit('close');
                }
                this.emit('err');
            }
            this.fd=fd;
            this.emit('open');
        })
    }

3.打開文件後,並非立馬讀取,而是要檢查是否有data事件綁定監聽!

對此,咱們要在構造函數內檢查若是添加了data的事件監聽ui

class readStream extends EventEmitter{
        constructor(path,options){
            super();
            ...
            this.on('newListener',(eventName,callback)=>{
                if(eventName=='data'){
                    this.flowing=true;
                    this.read();
                }
            })
        }
    }

完成以上步驟後,咱們要作的就是讀取文件內容啦,下面來自定義一個read方法:this

  • 先判斷是否有讀取到內容,如有讀取到內容
  • --改變下次讀取的起點位置
  • --獲取到相同長度的Buffer空間
  • --若設了字符編碼,要將data做相應的轉換
  • --判斷此時this.pos的位置是否已超出告終束位置
  • --若是folwing爲true,則再次調用read方法
  • 讀取不到內容則拋出一個錯誤,並關閉文件
    代碼以下
read(){
        let howToLength=this.end ? Math.min((this.end-this.pos),this.highWaterMark) : this.highWaterMark;
        fs.read(this.fd,this.buffer,0,howToLength,this.pos,(err,bytesBase)=>{
            if(bytesBase>0){
                this.pos+=bytesBase;        
                this.buf=this.buffer.slice(0,bytesBase);   
                let data=this.encoding ? this.buffer.toString(this.encoding) : this.buffer.toString(); 
                this.emit('data',data);
            
                if(this.end>this.pos){
                    this.emit('end');
                    if(autoClose){
                        if(typeof this.fd === 'number'){
                            fs.close(this.fd,()=>{
                                this.emit('close');
                            });
                        }
                        this.emit('close');
                    }
                }
                if(flowing){
                    this.read();
                }
            }else{
                this.emit('err');
                if(typeof this.fd === 'number'){
                    if(autoClose){
                        fs.close(this.fd,()=>{
                            this.emit('close');
                        });
                    }
                    this.emit('close');
                }
            }
        })
    }

到此,一個read方法就寫的差很少了,可是有個問題是要注意的,open方法是異步的,有可能出現調用read方法時,this.fd尚未值。爲了不這個錯誤,咱們改寫一下read方法。編碼

read(){
        if(typeof this.fd !== 'number'){
            this.once('open',()=>this.read());
        }
        ...
    }

這樣的話,一個基礎的readStream類纔算寫完整。咱們是否是要考慮下,有沒有什麼能夠優化的地方?細心的夥伴是否是發現有重複的代碼?
對,就是文件的關閉,咱們提出一個destory方法,用做關閉文件。

destory(){
        if(typeof this.fd==='number'){
            if(autoClose){
                fs.close(this.fd,()=>{
                    this.emit('close');
                });
                return ;
            }
            this.emit('close');
        }
    }

3、擴展

方法的調用介紹變量flowing時,咱們有提到'暫停'方法pause(),'重啓'方法resume()來改變flowing的值。咱們加入到代碼中。

  1. 首先加入調用,咱們在第一次讀取數據後暫停讀取,在3秒後繼續讀取。
rs.on('data',(data)=>{
        console.log(data);
        this.pause();
    });
    setTimeout(()=>{
        this.resume();
    },3000)
  1. 這兩個方法的調用也是同樣簡單:
pause(){
        this.flowing=false;
    }
    resume(){
        this.flowing=true;
        this.read();
    }

OK,大功告成了,下面整理出完整代碼

var fs=require('fs');
var EventEmitter=require('events');
class readStream extends EventEmitter{
    constructor(path,options){
        super();
        this.path=path;
        this.flages=options.flages||'r';
        this.autoClose=options.autoClose||true;
        this.encoding=options.encoding||null;
        this.highWaterMark=options.highWaterMark||64*1024;
        this.end=options.end;
        this.start=opitons.start||0;

        this.pos=this.start;
        this.flowing=false;
        this.buffer=Buffer.alloc(this.highWaterMark);
        this.open();

        this.on('newListener',(eventName,callback){
            if(eventName=='data'){
                this.flowing=true;
                fs.read();
            }
        });
        open(){
            fs.open(this.path,this.flags,(err,fd){
                if(err){
                    if(this.autoClose){
                        this.destory();
                    }
                    this.emit('err',err);
                    return ;
                }
                this.fd=fd;
                this.emit('open');

        });
        }
        destory(){
            if(typeof this.fd ='number'){
                fs.close(this.fd,()=>{
                    this.emit('close');
                });
                return ;
            }
            this.emit('close');
        }
        read(){
            if(typeof this.fd !== 'number'){
                return this.once('open',()=>this.read());
            }
            let howToLength=this.end ? Math.min((this.end-this.pos),this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd,this.buffer,0,howToLenghth,this.pos,(err,bytesBase)=>{
                if(bytesBase>0){
                    this.pos+=bytesBase;
                    let buf=this.buffer.slice(0,bytesBase);
                    let data=this.encoding ? this.buffer.toString(this.encoding) : this.buffer.toString();
                    this.emit('data',data);
                
                    if(this.pos>this.end){
                        this.emit('end');
                        this.destory();
                    }
                    if(flowing){
                        this.read()
                    }
                }else{
                    this.emit('err');
                    this.destory();
                }
            })   
        }
        pause(){
            this.flowing=false;
        }
        resume(){
            this.flowing=true;
            this.read();
        }
    }
}
相關文章
相關標籤/搜索