手寫node可讀流之流動模式

node的可讀流基於事件node

可讀流之流動模式,這種流動模式會有一個"開關",每次當"開關"開啓的時候,流動模式起做用,若是將這個"開關"設置成暫停的話,那麼,這個可讀流將不會去讀取文件,直到將這個"開關"從新置爲流動。緩存

讀取文件流程

讀取文件內容的流程,主要爲:bash

  1. 打開文件,打開文件成功,將觸發open事件,若是打開失敗,觸發error事件和close事件,將文件關閉。
  2. 開始讀取文件中的內容,監聽data事件,數據處於流動狀態,可經過修改開關的狀態來暫停讀取。
  3. 每次讀取到的內容放入緩存中,並經過data事件將數據發佈出去。
  4. 當文件中的內容讀取完畢以後,將文件關閉。

這一系列動做都是基於事件來進行操做的,而node中的事件咱們都知道是一種發佈訂閱模式來實現的。服務器

下面咱們來看一看,node是如何使用可讀流來讀取文件中的內容?異步

node 可讀流參數

首先咱們經過fs模塊來建立一個可讀流,可讀流接受兩個參數:函數

  • 第一個參數是要讀取的文件地址,在這裏指明你要讀取哪一個文件。
  • 第二個參數是可選項,這個參數是一個對象,用來指定可讀流的一些具體的參數。

以下幾個參數咱們來一一說明:ui

  • highWaterMark:設置高水位線,這個參數主要用於在讀取文件時,可讀流會將文件中的內容讀取到緩存當中,而這裏咱們須要建立一個buffer來緩存這些數據,因此這個參數是用來設置buffer的大小,若是不對這個參數進行設置的話,可讀流默認的配置64k。this

  • flags:這個參數主要用於設置文件的執行模式,好比說咱們具體的操做適用於讀取文件仍是寫入文件等這些操做。若是是寫入文件的話那咱們,使用的是w。若是是讀取文件的話那這個操做符就應該是r。編碼

下面這張表格就說明了不一樣的符號表明不一樣含義:spa

符號 含義
r 讀文件,文件不存在報錯
r+ 讀取並寫入,文件不存在報錯
rs 同步讀取文件並忽略緩存
w 寫入文件,不存在則建立,存在則清空
wx 排它寫入文件
w+ 讀取並寫入文件,不存在則建立,存在則清空
wx+ 和w+相似,排他方式打開
a 追加寫入
ax 與a相似,排他方式寫入
a+ 讀取並追加寫入,不存在則建立
ax+ 做用與a+相似,可是以排他方式打開文件
  • autoClose:這個參數主要用於,對文件的關閉的一些控制。若是文件再打開的過程或者其餘操做的過程當中出現了錯誤的狀況下,須要將文件進行關閉。那這個參數是設置文件是否自動關閉的功能。

  • encoding:node中用buffer來讀取文件操做的東西二進制數據。這些數據展示出來的話咱們是一堆亂碼,因此須要,要咱們對這個數據指定一個具體的編碼格式。而後將會對這些數據進行編碼轉化,這樣轉化出來的數據就是咱們能看懂的數據。

  • starts:這個參數主要用於指定從什麼位置開始讀取文件中的內容,默認的話是從零開始。

  • ends:這個參數主要用於指定定具體要讀取文件多長的數據,這裏須要說明一下,這個參數是包括自己的位置,也就是所謂的包前和包後。

下面咱們來看看可讀流具體例子:

let fs = require("fs");
let rs = fs.createReadStream("./a.js", {
    highWaterMark: 3,
    encoding: "utf8",
    autoClose: true,
    start: 0,
    end: 9
});
rs.on("open", () => {console.log("open");});
rs.on("close", () => {console.log("close");});
rs.on("data", data => {
    console.log(data);
    rs.pause();//暫停讀取 此時流動模式爲暫停模式
});
setInterval(() => {
    rs.resume();//從新設置爲流動模式,開始讀取數據
}, 1000);
rs.on("end", () => { console.log("end"); });
rs.on("error", err => { console.log(err); });
複製代碼

手寫可讀流第一步

上面咱們說過,node可讀流是基於node的核心模塊事件來完成的,因此在實現咱們本身的可讀流時須要繼承events模塊,代碼以下:

let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {

}
複製代碼

繼承了EventEmitter類,咱們就可使用EventEmitter類中的各個方法,而且一樣是採用發佈訂閱的模式了處理事件。

第二步:處理可讀流配置的參數

上面咱們提到,node中建立可讀流時能夠對這個流配置具體的參數,好比

let rs = fs.createReadStream("./a.js", {
    highWaterMark: 3,
    encoding: "utf8",
    autoClose: true,
    start: 0,
    end: 9
});
複製代碼

那麼對於這些參數,咱們本身實現的可讀流類也須要對這些參數進行處理,那麼這些參數該如何進行處理呢?

constructor(path, options = {}) {
    super();
    this.path = path; //指定要讀取的文件地址
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose || true; //是否自動關閉文件
    this.start = options.start || 0; //從文件哪一個位置開始讀取
    this.pos = this.start; // pos會隨着讀取的位置改變
    this.end = options.end || null; // null表示沒傳遞
    this.encoding = options.encoding || null;// buffer編碼
    this.flags = options.flags || 'r';

    this.flowing = null; // 模式開關
    this.buffer = Buffer.alloc(this.highWaterMark);// 根據設置建立一個buffer存儲讀出來的數
    this.open();
}
複製代碼

一般配置的原則是以用戶配置的參數爲準,若是用戶沒有對這個參數進行設置的話,就採用默認的配置。

實現可讀流第三步:打開文件

這裏原理是使用node模塊fs中的open方法。首先咱們來回顧下fs.open()方法的使用。

fs.open(filename,flags,[mode],callback);
//實例
fs.open('./1,txt','r',function(err,fd){});
複製代碼

這裏須要說明下,回調函數callback中有2個參數:

  • 第一個是error,node中異步回調都會返回的一個參數,用來講明具體的錯誤信息
  • 第二個參數是fd,是文件描述符,用來標識文件,等價於open函數的第一個參數

好了,如今咱們來看看咱們本身的可讀流的open方法該如何實現吧:

open() {
    fs.open(this.path, this.flags, (err, fd) => { 
        //fd標識的就是當前this.path這個文件,從3開始(number類型)
        if (err) {
            if (this.autoClose) { // 若是須要自動關閉則去關閉文件
                this.destroy(); // 銷燬(關閉文件,觸發關閉事件)
            }
            this.emit('error', err); // 若是有錯誤觸發error事件
            return;
        }
        this.fd = fd; // 保存文件描述符
        this.emit('open', this.fd); // 觸發文件的打開的方法
    });
}
複製代碼

從代碼上咱們能夠看出:

  • fs.open函數是異步函數,也就是說callback是異步執行的,在成功打開文件的狀況下,fd這個屬性也是異步獲取到的,這點須要注意。

  • 另外重要的一點是,若是在打開文件發生錯誤時,則代表打開文件失敗,那麼此時就須要將文件關閉。

實現可讀流第四步:讀取文件內容

上面咱們詳細說過,可讀流自身定義了一個"開關",當咱們要讀取文件中的內容的時候,咱們須要將這個"開關"打開,那麼node可讀流自己是如何來打開這個"開關"的呢?

監聽data事件

node可讀流經過監聽data事件來實現這個"開關"的開啓:

rs.on("data", data => {
    console.log(data);
});
複製代碼

當用戶監聽data事件的時候,"開關"開啓,不停的從文件中讀取內容。那麼node是怎麼監聽data事件的呢? 答案就是 事件模塊的newListener

這是由於node可讀流是基於事件的,而事件中,服務器就能夠經過newListener事件監聽到從用戶這邊過來的全部事件,每一個事件都有對應的類型,當用戶監聽的是data事件的時候,咱們就能夠獲取到,而後就能夠去讀取文件中的內容了,那咱們本身的可讀流該如何實現呢?

// 監聽newListener事件,看是否監聽了data事件,若是監聽了data事件的話,就開始啓動流動模式,讀取文件中的內容
this.on("newListener", type => {
    if (type === "data") {
        //  開啓流動模式,開始讀取文件中的內容
        this.flowing = true;
        this.read();
    }
});
複製代碼

好了,知道了這個"開關"是如何打開的,那麼這個時候就到了真正讀取文件中內容的關鍵時候了,先上代碼先:

read() {
    // 第一次讀取文件的話,有可能文件是尚未打開的,此時this.fd可能尚未值
    if (typeof this.fd !== "number") {
        // 若是此時文件仍是沒有打開的話,就觸發一次open事件,這樣文件就真的打開了,而後再讀取
        return this.once("open", () => this.read());
    }
    // 具體每次讀取多少個字符,須要進行計算,由於最後一次讀取倒的可能比highWaterMark小
    let howMuchRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
    fs.read(this.fd, this.buffer, 0, howMuchRead, this.pos, (err, byteRead) => {
        // this.pos 是每次讀取文件讀取的位置,是一個偏移量,每次讀取會發生變化
        this.pos += byteRead;
        // 將讀取到的內容轉換成字符串串,而後經過data事件,將內容發佈出去
        let srr = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);
        // 將內容經過data事件發佈出去
        this.emit("data", srr);
        // 當讀取到到內容長度和設置的highWaterMark一致的話,而且仍是流動模式的話,就繼續讀取
        if ((byteRead === this.highWaterMark) && this.flowing) {
            return this.read();
        }
        // 沒有更多的內容了,此時表示文件中的內容已經讀取完畢
        if (byteRead < this.highWaterMark) {
            // 讀取完成,發佈end方法,並關閉文件
            this.emit("end");
            this.destory();
        }
    });
}
複製代碼

這裏咱們特別要注意的是:

  • 文件是否已經打開,是否獲取到fd,若是沒有打開的話,則再次觸發open方法
  • 分批次讀取文件內容,每次讀取的內容是變化的,因此位置和偏移量是要動態計算的
  • 控制讀取中止的條件。

實現可讀流第五步:關閉文件

好了,到如今,基礎的讀取工做已經完成,那麼就須要將文件關閉了,上面的open和read方法裏面都調用了一個方法:destory,沒錯,這個就是關閉文件的方法,好了,那麼咱們來看看這個方法該如何實現吧

destory() {
    if (typeof this.fd !== "number") {
        // 發佈close事件
        return this.emit("close");
    }
    // 將文件關閉,發佈close事件
    fs.close(this.fd, () => {
        this.emit("close");
    });
}
複製代碼

固然這塊的原理就是調用fs模塊的close方法啦。

實現可讀流第六步:暫停和恢復

既然都說了,node可讀流有一個神奇的"開關",就像大壩的閥門同樣,能夠控制水的流動,一樣也能夠控制水的暫停啦。固然在node可讀流中的暫停是中止對文件的讀取,恢復就是將開關打開,繼續讀取文件內容,那麼這兩個分別對應的方法就是pause()和resume()方法。

那麼咱們本身的可讀流類裏面該如何實現這兩個方法的功能呢?很是簡單: 咱們在定義類的私有屬性的時候,定義了這樣一個屬性flowing,當它的值爲true時表示開關打開,反之關閉。

pause() {
    this.flowing = false;// 將流動模式設置成暫停模式,不會讀取文件
}
resume() {
    this.flowing = true;//將模式設置成流動模式,能夠讀取文件
    this.read();// 從新開始讀取文件
}
複製代碼

好了,關於node可讀流的實現咱們就寫到這裏,快快敲起代碼,動手實現一個你本身的可讀流吧!

相關文章
相關標籤/搜索