淺析node中流應用(一) 可讀流(fs.createReadStream)

爲何要須要流?

  • 當咱們學習新知識的時候,首先咱們知道爲何要學習,那咱們爲何要學習流?由於在在node中讀取文件的方式有來兩種,一個是利用fs模塊,一個是利用流來讀取。若是讀取小文件,咱們可使用fs讀取,fs讀取文件的時候,是將文件一次性讀取到本地內存。而若是讀取一個大文件,一次性讀取會佔用大量內存,效率很低,這個時候須要用流來讀取。流是將數據分割段,一段一段的讀取,能夠控制速率,效率很高,不會佔用太大的內存。gulp的task任務,文件壓縮,和http中的請求和響應等功能的實現都是基於流來實現的。所以,系統學習下流仍是頗有必要的

可讀流用法(先把用法學會)

  • node中讀是將內容讀取到內存中,而內存就是Buffer對象
  • 流都是基於原生的fs操做文件的方法來實現的,經過fs建立流。全部的 Stream 對象都是 EventEmitter 的實例。經常使用的事件有:
  • open -打開文件
  • data -當有數據可讀時觸發。
  • error -在讀收和寫入過程當中發生錯誤時觸發。
  • close -關閉文件
  • end - 沒有更多的數據可讀時觸發

建立可讀流

  • 統一下 1.txt中的內容 1234567890
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3, //文件一次讀多少字節,默認 64*1024
    flags:'r', //默認 'r'
    autoClose:true, //默認讀取完畢後自動關閉
    start:0, //讀取文件開始位置
    end:3, //流是閉合區間 包含start也含end
    encoding:'utf8' //默認null
});
複製代碼
  • 注意: 默認建立一個流 是非流動模式,默認不會讀取數據
  • 具體參數說明,咱們能夠參考下node官網詳細介紹
    http://nodejs.cn/api/fs.html#fs_fs_createreadstream_path_options

監聽open事件

rs.on("open",()=>{
   console.log("文件打開")
});
複製代碼

監聽data事件

  • 可讀流這種模式它默認狀況下是非流動模式(暫停模式),它什麼也不作,就在這等着html

  • 監聽了data事件的話,就能夠將非流動模式轉換爲流動模式node

  • 流動模式會瘋狂的觸發data事件,直到讀取完畢git

  • 直接上代碼github

//1.txt中內容爲1234567890
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3, //文件一次讀多少字節,默認 64*1024
    flags:'r', //默認 'r'
    autoClose:true, //默認讀取完畢後自動關閉
    start:0, //讀取文件開始位置
    end:3, //流是閉合區間 包含start也含end
    encoding:'utf8' //默認null
});
rs.on("open",()=>{
   console.log("文件打開")
});
//瘋狂觸發data事件 直到讀取完畢
rs.on('data',(data)=>{
    console.log(data); //共讀4個字節,可是highWaterMark爲3,因此觸發2次data事件,分別打印123  4
});
複製代碼

監聽err/end/close事件

rs.on("err",()=>{
    console.log("發生錯誤")
});
rs.on('end',()=>{ //文件讀取完畢後觸發
    console.log("讀取完畢");
});
rs.on("close",()=>{ //最後文件關閉觸發
    console.log("關閉")
});
複製代碼

不要急,最後把方法介紹完統一寫個例子,你們一看便一目了之算法

最後介紹兩個方法就大功告成啦

  • rs.pause() 暫停讀取,會暫停data事件的觸發,將流動模式轉變非流動模式
  • rs.resume()恢復data事件,繼續讀取,變爲流動模式

終於把可讀流的全部API講完了,火燒眉毛的寫個完整的案例來體驗下,說幹就幹gulp

手寫可讀流

1、準備工做,構建可讀流構造函數

  • 記住Stream 對象都是 EventEmitter 的實例,內部是經過發佈訂閱模式實現的。直接貼代碼
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter { //建立可讀流類,繼承 EventEmitter
    constructor(path, options = {}) { //options默認空對象
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true;
        this.start = options.start || 0;
        this.pos = this.start; //pos會隨着讀取的位置改變
        this.end = options.end || null;
        this.encoding = options.encoding || null;
        this.flags = options.flags || 'r';
        this.flowing = null; //非流動模式
        //聲明一個buffer表示都出來的數據
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.open(); //打開文件 fd
    }
複製代碼
  • 其實只是賦值了不少默認值,沒有什麼難點,接下來就要寫this.open()方法,即打開文件

2、在ReadStream原型中寫open方法

  • 廢話很少說,直接上代碼,代碼中有詳細的代碼解釋
//打開文件用
    open() {
        fs.open(this.path, this.flags, (err, fd) => { //fd標識的就是當前this.path這個文件,從3開始(number類型)
            if (err) { 
                if (this.autoClose) { //若是須要自動關閉我再去銷燬fd
                    this.destroy(); //關閉文件(觸發關閉事件)
                }
                this.emit('error', err); //打開文件發生錯誤,發佈error事件
            }
            this.fd = fd; //保存文件描述符
            this.emit('open', this.fd) //觸發文件open方法
        })
    }
複製代碼
  • 想下,打開文件咱們作了兩件事,
  • 一、若是發生錯誤,關閉文件,同時發射 "error"事件
  • 二、若是沒有錯誤,保存fd,而後發射 "open"事件
  • 先來實現下this.destroy()關閉文件的方法

3、實現destroy()方法

destroy() {
        if (typeof this.fd != 'number') { //文件未打開,也要關閉文件且觸發close事件
            return this.emit('close');
        }
        fs.close(this.fd, () => {  //若是文件打開過了 那就關閉文件而且觸發close事件
            this.emit("close");
        })
    }
複製代碼
  • 這樣一來,rs.on('open')已經實現了,咱們來測試下吧

4、實現主要的read方法真的讀文件,於rs.on('data')方法對應

  • 一、確保真的拿到fd(文件描述符,默認3,number類型)
  • 二、確保拿到fd後,對fs.read中howMuchToRead有一個繞的算法,多舉幾個例子理解更好,若是對fs.read不瞭解,戳這裏,fs.read()方法介紹
  • 三、異步遞歸去讀文件,讀完爲止。
  • 四、說了這麼多,直接幹。
read() {
        //此時文件還沒打開
        if (typeof this.fd != 'number') {
            //當文件真正打開的時候 會觸發open事件,觸發事件後再執行read,此時fd 就有了
            return this.once('open', () => this.read())
        }
        //此時有fd了 開始讀取文件了
        //this.pos是變量,開始時this.pos = this.start,在上面定義過了
        //算法有點繞,源碼中是這樣實現的。舉個例子 end=3,pos=0,highWaterMark=3, howMuchToRead = 3, 1.txt內容1234 就會讀123  4 
        let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, byteRead) => {
            // byteRead真實讀到的個數
            this.pos += byteRead;
            // this.buffer默認三個 
            let b = this.buffer.slice(0, byteRead);
            //對讀到的b進行編碼
            b = this.encoding ? b.toString(this.encoding) : b;
            //把讀取到的buffer發射出去
            this.emit('data', b);
            if ((byteRead === this.highWaterMark) && this.flowing) {
                return this.read();
            }
            //這裏沒有更多邏輯了
            if (byteRead < this.highWaterMark) {
                //沒有更多了
                this.emit('end'); //讀取完畢
                this.destroy();   //銷燬完畢
            }
        })
    }
複製代碼

你們會發現,此時咱們尚未監聽 rs.on('data')事件,來觸發read方法,此時咱們須要修改下 第一步建立構造函數的代碼api

constructor(path, options = {}) {
        //省略.... 代碼和第一步同樣,下面是新添加
    

       // 看是否監聽了data事件,若是監聽了就要變成流動模式
        this.on('newListener', (eventName, callback) => {
            if (eventName === 'data') {
                //至關於用戶監聽了data事件
                this.flowing = true;
                // 監聽了就去讀
                this.read(); //去讀內容
            }
        })
    }

複製代碼

若是能看到這裏,就基本大功告成,就只剩下pause和resume 暫停和恢復暫停方法。那就一寫到底bash

5、添加pause暫停 和resume恢復暫停方法

  • 兩個方法很是簡單,就直接貼代碼
pause() {
        this.flowing = false;
    }
    resume() {
        this.flowing = true;
        //恢復暫停,在去無限讀
        this.read();
    }
複製代碼

終於大功告成,寫的對不對呢,趕忙測試下吧,期待的搓手手異步

end

  • 咱們已經實現了可讀流實現,後續還會有可寫流實現。api雖然枯燥,但願你們仍是多寫寫源碼
  • 對源碼感興趣,我把源碼放在github上 ,供你們參考
相關文章
相關標籤/搜索