NodeJS —— 流的原理分析與簡易實現

在這裏插入圖片描述


閱讀原文


前言

在以前的博客中已經瞭解了流的基本用法,這篇的重點在於根據可讀流的用法對可讀流的原理進行分析,實現簡易版的 ReadStream,流的基本用法請看這裏 NodeJS —— Stream 的基本使用編程


可讀流的實現(流動模式)

一、ReadStream 類建立

在使用 fscreateReadStream 建立可讀流時,返回了 ReadStream 對象,上面存在着一些事件和方法,其實咱們在建立這個可讀流的時候建立了某一個類的實例,這個實例能夠調用類原型上的方法,咱們這裏將這個類命名爲 ReadStream數組

在類原型上的方法內部可能會建立一些事件,在 NodeJS 中,事件是依賴 events 模塊的,即 EventEmitter 類,同時類的方法可能會操做文件,會用到 fs 模塊,因此也提早引入 fs緩存

// 建立 ReadStream 類
// 引入依賴模塊
const EventEmitter = require("events");
const fs = require("fs");

// 建立 ReadStream 類
class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        // 建立可讀流參數傳入的屬性
        this.path = path; // 讀取文件的路徑
        this.flags = options.flags || "r"; // 文件標識位
        this.encoding = options.encoding || null; // 字符編碼
        this.fd = options.fd || null; // 文件描述符
        this.mode = options.mode || 0o666; // 權限位
        this.autoClose = options.autoClose || true; // 是否自動關閉
        this.start = options.start || 0; // 讀取文件的起始位置
        this.end = options.end || null; // 讀取文件的結束位置(包含)
        this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次讀取文件的字節數

        this.flowing = false; // 控制當前是不是流動狀態,默認爲暫停狀態
        this.buffer = Buffer.alloc(this.highWaterMark); // 存儲讀取內容的 Buffer
        this.pos = this.start; // 下次讀取文件的位置(變化的)

        // 建立可讀流要打開文件
        this.open();

        // 若是監聽了 data 事件,切換爲流動狀態
        this.on("newListener", type => {
            if (type === "data") {
                this.flowing = true;

                // 開始讀取文件
                this.read();
            }
        });
    }
}

// 導出模塊
module.exports = ReadStream;

使用 fs.createReadStream 時傳入了兩個參數,讀取文件的路徑和一個 options 選項,options 上有八個參數,咱們在建立 ReadStream 類的時候將這些參數初始化到了 this 上。異步

建立可讀流的時候有兩種狀態,流動狀態和暫停狀態,默認建立可讀流是暫停狀態,只有在觸發 data 事件時纔會變爲流動狀態,因此在 this 上掛載了 flowing 存儲當前的狀態是否爲流動狀態,值默認爲 false函數

注意:這裏說的暫停狀態不是暫停模式,暫停模式是 readable, 是可讀流的另外一種模式,咱們這節討論的可讀流爲流動模式。性能

在讀取文件時實際上是操做 Buffer 進行讀取的,須要有一個 Buffer 實例用來存儲每次讀取的數據,因此在 this 上掛載了一個新建立的 Buffer,長度等於 highWaterMark測試

當從 start 值的位置開始讀取文件,下一次讀取文件的位置會發生變化,因此在 this 上掛載了 pos 屬性,用於存儲下次讀取文件的位置。ui

在建立 ReadStream 的實例(可讀流)時,應該打開文件並進行其餘操做,因此在 this 上掛載了 open 方法並執行。this

建立實例的同時監聽了 newListener 事件,回調在每次使用 on 監聽事件時觸發,回調內部邏輯是爲了將默認的暫停狀態切換爲流動狀態,由於在使用時,流動狀態是經過監聽 data 事件觸發的,在 newListener 的回調中判斷事件類型爲 data 的時候將 flowing 標識的值更改成 true,並調用讀取文件的 read 方法。編碼

在使用 ES6 的類編程時,原型上的方法都是寫在 class 內部,咱們下面爲了把原型上的方法拆分出來成爲單獨的代碼塊,都使用 ReadStream.prototype.open = function... 直接給原型添加屬性的方式,但這樣的方式和直接寫在 class 內有一點區別,就是 class 內部的書寫的原型方法都是不可遍歷的,添加屬性的方式建立的方法都是可遍歷的,可是這點區別對咱們代碼的執行沒有任何影響。

二、打開文件方法 open 的實現

在使用可讀流時,打開時默認是暫停狀態,會觸發 open 事件,若是打開文件出錯會觸發 error 事件。

// open 方法
// 打開文件
ReadStream.prototype.open = function() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err) {
            this.emit("error", err);

            // 若是文件打開了出錯,並配置自動關閉,則關掉文件
            if (this.autoClose) {
                // 關閉文件(觸發 close 事件)
                this.destroy();

                // 再也不繼續執行
                return;
            }
        }
        // 存儲文件描述符
        this.fd = fd;

        // 成功打開文件後觸發 open 事件
        this.emit("open");
    });
};

open 方法的邏輯就是在打開文件的時候,將文件描述符存儲在實例上方便後面使用,並使用 EventEmitter 的原型方法 emit 觸發 open 事件,若是出錯就使用 emit 觸發 error 事件,若是配置 autoClose 參數爲 true,就關閉文件並觸發 close

咱們將關閉文件的邏輯抽取出來封裝在了 ReadStream 類的 destroy 方法中,下面來實現 destroy

三、關閉文件方法 destroy 的實現

文件出錯分爲兩種,第一種文件打開出錯,第二種是文件不存在出錯(沒打開),第二種系統是沒有分配文件描述符的。

// detroy 方法
// 關閉文件
ReadStream.prototype.detroy = function() {
    // 判斷是否存在文件描述符
    if (typeof this.fd === "number") {
        // 存在則關閉文件並觸發 close 事件
        fs.close(fd, () => {
            this.emit("close");
        });
        return;
    }

    // 不存在文件描述符直接觸發 close 事件
    this.emit("close");
};

若是是打開文件後出錯須要關閉文件,並觸發 close 事件,若是是沒打開文件,則直接觸發 close 事件,因此上面經過文件描述符來判斷該如何處理。

四、讀取文件方法 read 的實現

還記得在 ReadStream 類中,監聽的 newListener 事件的回調中若是監聽了 data 事件則會執行 read 讀取文件,接下來就實現讀取文件的核心邏輯。

// read 方法
// 讀取文件
ReadStream.prototype.read = function() {
    // 因爲 open 異步執行,read 是在建立實例時同步執行
    // read 執行可能早於 open,此時不存在文件描述符
    if (typeof this.fd !== "number") {
        // 由於 open 用 emit 觸發了 open 事件,因此在這是從新執行 read
        return this.once("open", () => this.read());
    }

    // 如過設置告終束位置,讀到結束爲止就不能再讀了
    // 若是最後一次讀取真實讀取數應該小於 highWaterMark
    // 因此每次讀取的字節數應該和 highWaterMark 取最小值
    let howMuchToRead = this.end
        ? Math.min(this.highWaterMark, this.end - this.pos + 1)
        : this.highWaterMark;

    // 讀取文件
    fs.read(
        this.fd,
        this.buffer,
        0,
        howMuchToRead,
        this.pos,
        (err, bytesRead) => {
            // 若是讀到內容執行下面代碼,讀不到則觸發 end 事件並關閉文件
            if (bytesRead > 0) {
                // 維護下次讀取文件位置
                this.pos += bytesRead;

                // 保留有效的 Buffer
                let realBuf = this.buffer.slice(0, bytesRead);

                // 根據編碼處理 data 回調返回的數據
                realBuf = this.encoding
                    ? realBuf.toString(this.encoding)
                    : realBuf;

                // 觸發 data 事件並傳遞數據
                this.emit("data", realBuf);

                // 遞歸讀取
                if (this.flowing) {
                    this.read();
                }
            } else {
                this.isEnd = true;
                this.emit("end"); // 觸發 end 事件
                this.detroy(); // 關閉文件
            }
        }
    );
};

建立 ReadStream 的實例時,執行的 open 方法內部是使用 fs.open 打開文件的,是異步操做,而讀取文件方法 read 是在 newListener 回調中同步執行的,這樣極可能觸發 read 的時候文件尚未被打開(不存在文件描述符),因此在 read 方法中判斷了文件描述符是否存在,並在不存在時候使用 once 添加了 open 事件,回調中從新執行了 read

因爲在 open 方法中使用 emit 觸發了 open 事件,因此 read 內用 once 添加的 open 事件的回調也會跟着執行一次,並在回調中從新調用了 read 方法,保證了 read 讀取文件的邏輯在文件真正打開後才執行,爲了文件打開前執行 read 而不執行讀取文件的邏輯,用 once 添加 open 事件時別忘記 return

在使用 fs.read 讀取文件的時候有一個參數爲本次讀取幾個字符到 Buffer 中,若是在建立可讀流的時候設置了讀取文件的結束位置 end 參數,則讀到 end 位置就不該該再繼續讀取了,因此在存在 end 參數的時候每次都計算一下讀取個數和 highWaterMark 取最小值,保證讀取內容小於 highWaterMark 的時候不會多讀,由於讀取時是包括 end 值做爲 Buffer 的索引這一項的,因此計算時多減去的要 +1 加回來,再一次讀取這個讀取個數計算結果變成了 0,也就結束了讀取。

由於 end 參數的狀況,因此在內部讀取邏輯前判斷了 bytesRead (實際讀取字節數)是否大於 0,若是不知足條件則在實例添加是否讀取結束標識 isEnd(後面使用),觸發 end 事件並關閉文件,若是知足條件,也是經過 bytesRead 對 Buffer 進行截取,保留了有用的 Buffer,而且經過 encoding 編碼對 Buffer 進行處理後,觸發 data 事件,並將處理後的數據傳遞給 data 事件的回調。

五、暫停、恢復讀取 pause 和 resume

pause 的目的就是暫停讀取,其實就是阻止 read 方法在讀取時進行遞歸,因此只須要更改 flowing 的值便可。

// pause 方法
// 暫停讀取
ReadStream.prototype.pause = function() {
    this.flowing = false;
};

resume 的目的是恢復讀取,在更改 flowing 值得基礎上從新執行 read 方法,因爲在 pause 調用時 read 內部仍是執行得讀取文件得分支,文件並無關閉,讀取文件位置的參數也是經過實例上的當前的屬性值進行計算的,因此從新執行 read 會繼續上一次的位置讀取。

// resume 方法
// 恢復讀取
ReadStream.prototype.resume = function() {
    this.flowing = true;
    if (!this.isEnd) this.read();
};

上面在從新執行 read 以前使用 isEnd 標識作了判斷,防止在 setInterval 中調用 resume 在讀取完成後不斷的觸發 endclose 事件。


驗證可讀流(流動模式)ReadStream

接下來咱們使用本身實現的 ReadStream 類來建立可讀流,並按照 fs.createReadStream 的用法進行使用並驗證。

// 驗證 ReadStream
// 文件 1.txt 內容爲 0123456789
const fs = require("fs");
const ReadStream = require("./ReadStream");

// 建立可讀流
let rs = new ReadStream("1.txt", {
    encoding: "utf8",
    start: 0,
    end: 5,
    highWaterMark: 2
});

rs.on("open", () => console.log("open"));

rs.on("data", data => {
    console.log(data, new Date());
    rs.pause();
});

rs.on("end", () => console.log("end"));
rs.on("close", () => console.log("close"));
rs.on("error", err => console.log(err));

setInterval(() => rs.resume(), 1000);

// open
// 01 2018-07-04T10:44:20.384Z
// 23 2018-07-04T10:44:21.384Z
// 45 2018-07-04T10:44:22.384Z
// end
// close

執行上面的代碼正常的執行邏輯是先觸發 open 事件,而後觸發 data 事件,讀取一次後暫停,每隔一秒恢復讀取一次,再讀取完成後觸發 endclose 事件,經過運行代碼結果和咱們但願的同樣。


可讀流的實現(暫停模式)

一、在 fs 中的暫停模式的真正用法

fs 模塊中用 createReadStream 建立的可讀流中經過監聽 readable 事件觸發暫停模式(監聽 data 事件觸發流動模式),經過下面例子感覺暫停模式與流動模式的不一樣,如今讀取文件 1.txt,內容爲 0~9 十個數字。

// 暫停模式的用法
// 讀取的
const fs = require("fs");

// 建立可讀流
let rs = fs.createReadStream("1.txt", {
    encoding: "utf8",
    start: 0,
    highWaterMark: 3
});

rs.on("readable", () => {
    // read 參數爲本次讀取的個數
    let r = rs.read(3);
    // 打印讀取的數據
    console.log(r);
    // 打印容器剩餘空間
    console.log(rs._readableState.length);
});

// 012
// 0
// 345
// 0
// 678
// 0
// null
// 1
// 90
// 0

通俗的解釋,暫停模式的 readable 事件默認會觸發一次,監聽 readable 事件後就像建立了一個 「容器」,容量爲 highWaterMark,文件中的數據會自動把容器注滿,調用可讀流的 read 方法讀取時,會從容器中取出數據,若是 read 方法讀取的數據小於 highWaterMark,則直接暫停,再也不繼續讀取,若是大於 highWaterMark ,說明 「容器」 空了,則會觸發 readable 事件,不管讀取字節數與 highWaterMark 關係如何,只要 「容器」 內容量剩餘小於 highWaterMark 就會進行 「續杯」,再次向 「容器」 中填入 highWaterMark 個,因此有些時候真實的容量會大於 highWaterMark

read 方法讀取的內容會返回 null 是由於容器內真實的數據數小於了讀取數,若是不是最後一次讀取,會在屢次讀取後將值一併返回,若是是最後一次讀取,會把剩餘不足的數據返回。

一、readable 事件的觸發條件:「容器」 空了;
二、「續杯」 條件:讀取後 「容器」 內剩餘量小於 highWaterMark
三、read 返回 null:「容器」 容器內可悲讀取數據沒法知足一次讀取字節數。

二、ReadableStream 類的實現

同爲可讀流,暫停模式與流動模式相同,都依賴 fs 模塊和 events 模塊的 EventEmitter 類,參數依然爲讀取文件的路徑和 options

// 建立 ReadableStream 類
// 引入依賴
const EventEmitter = require("events");
const fs = require("fs");

class ReadableStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path; // 讀取文件的路徑
        this.flags = options.flags || "r"; // 文件標識位
        this.encoding = options.encoding || null; // 字符編碼
        this.fd = options.fd || null; // 文件描述符
        this.mode = options.mode || 0o666; // 權限位
        this.autoClose = options.autoClose || true; // 是否自動關閉
        this.start = options.start || 0; // 讀取文件的起始位置
        this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次讀取文件的字節數

        this.reading = false; // 若是正在讀取,則再也不讀取
        this.emitReadable = false; // 當緩存區的長度等於 0 的時候, 觸發 readable
        this.arr = []; // 緩存區
        this.len = 0; // 緩存區的長度
        this.pos = this.start; // 下次讀取文件的位置(變化的)

        // 建立可讀流要打開文件
        this.open();

        this.on("newListener", type => {
            if (type === "readable") {
                this.read(); // 監聽readable就開始讀取
            }
        });
    }
}

// 導出模塊
module.exports = ReadableStream;

在類的添加了 newListener 事件,在回調中判斷是否監聽了 readable 事件,若是監聽了開始從 「容器」 中讀取。

三、打開、關閉文件 open 和 detroy

打開和關閉文件的方法和流動模式的套路基本類似。

// open 方法
// 打開文件
ReadableStream.prototype.open = function() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err) {
            this.emit("error", err);
            if (this.autoClose) {
                this.destroy();
                return;
            }
        }
        this.fd = fd;
        this.emit("open");
    });
};
// detroy 方法
// 關閉文件
ReadableStream.prototype.detroy = function() {
    if (typeof this.fd === "number") {
        fs.close(fd, () => {
            this.emit("close");
        });
        return;
    }
    this.emit("close");
};

四、從 「容器」 中讀取 read 方法的實現

read 方法的參數不傳時就至關於從 「容器」 讀取 highWaterMart 個字節,若是傳參表示讀取參數數量的字節數。

// read 方法
ReadableStream.prototype.read = function(n) {
    // 若是讀取大於了 highWaterMark,從新計算 highWaterMark,並從新讀取
    if (n > this.len) {
        // 計算新的 highWaterMark,方法摘自 NodeJS 源碼
        this.highWaterMark = computeNewHighWaterMark(n);
        this.reading = true;
        this._read();
    }

    // 將要返回的數據
    let buffer;

    // 若是讀取的字節數大於 0 小於等於當前緩存 Buffer 的總長度
    if (n > 0 && n <= this.len) {
        // 則從緩存中取出
        buffer = Buffer.alloc(n);

        let current; // 存儲每次從緩存區讀出的第一個 Buffer
        let index = 0; // 每次讀取緩存 Buffer 的索引
        let flag = true; // 是否結束整個 while 循環的標識

        // 開始讀取
        while ((current = this.arr.shift()) && flag) {
            for (let i = 0; i < current.length; i++) {
                // 將緩存中取到的 Buffer 的內容讀到本身定義的 Buffer 中
                buffer[index++] = current[i];

                // 若是當前索引值已經等於了讀取個數,結束 for 循環
                if (index === n) {
                    flag = false;

                    // 取出當前 Buffer 沒有消耗的
                    let residue = current.slice(i + 1);

                    // 在讀取後維護緩存的長度
                    this.len -= n;

                    // 若是 BUffer 真的有剩下的就給塞回到緩存中
                    if (residue.length) {
                        this.arr.unshift(residue);
                    }

                    break;
                }
            }
        }
    }

    // 若是當前 讀取的 Buffer 爲 0,將觸發 readable 事件
    if (this.len === 0) {
        this.emitReadable = true;
    }

    // 若是當前的緩存區大小小於 highWaterMark,就要讀取
    if (this.len < this.highWaterMark) {
        // 若是不是正在讀取才開始讀取
        if (!this.read) {
            this.reading = true;
            this._read(); // 正真讀取的方法
        }
    }

    // 將 buffer 轉回建立可讀流設置成的編碼格式
    if (buffer) {
        buffer = this.encoding ? buffer.toString(this.encoding) : buffer;
    }

    return buffer;
};

上面的 read 方法的參數大小對比緩存區中取出的 Buffer 長度有兩種狀況,一種是小於當前緩存區內取出 Buffer 的長度,一種是大於了真個緩存區的 len 的長度。

小於當前緩存區總長度經過循環取出須要的 Buffer 存儲了咱們要返回建立的 Buffer 中,剩餘的 Buffer 會丟失,因此咱們作了一個小小的處理,將剩下的 Buffer 做爲第一個 Buffer 塞回到了緩存區中,在處理這個問題時與流動模式不相同,流動模式處理後直接跳出了,而暫停模式至關於從 「容器」 中讀取,若是第一次讀取後還有剩餘還要接着從容器中繼續讀取。

大於 len 屬性時,規定須要從新計算 highWaterMark,遵循的原則是將當前 highWaterMark 設定爲當前讀取字節個數距離最接近的 2n 次方的數值,NodeJS 源碼中方法名稱爲 computeNewHighWaterMark,爲了提升性能是使用位運算的方式進行計算的,源碼以下。

// 從新計算 highWaterMark
function computeNewHighWaterMark(n) {
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
    return n;
}

在調用該方法從新計算 highWaterMark 後更改正在讀取狀態,從新讀取,因爲讀取邏輯的重複,因此真正讀取文件的邏輯抽取成一個 _read 方法來實現,下面呢就來看看 _read 內部都作了什麼。

五、真正讀取文件的 _read

對比可讀流(流動模式)的 read 方法,在調用 _read 方法讀取時,是在 newListener 中同步執行 _read,因此爲了保證 _read 的邏輯是在 open 方法打開文件之後執行,使用了與 read 相同的處理方式。

// _read 方法
ReadableStream.prototype._read = function() {
    if (typeof this.fd !== "number") {
        return this.once("open", () => _read());
    }

    // 建立本次讀取的 Buffer
    let buffer = Buffer.alloc(this.highWaterMark);

    // 讀取文件
    fs.read(
        this.fd,
        buffer,
        0,
        this.highWaterMark,
        this.pos,
        (err, bytesRead) => {
            if (bytesRead > 0) {
                this.arr.push(buffer); // 緩存
                this.len += bytesRead; // 維護緩存區長度
                this.pos += bytesRead; // 維護下一次讀取位置
                this.reading = false; // 讀取完畢

                // 觸發 readable 事件
                if (this.emitReadable) {
                    // 觸發後更改觸發狀態爲 false
                    this.emitReadable = false;
                    this.emit("readable");
                }
            } else {
                // 若是讀完觸發結束事件
                this.emit("end");
            }
        }
    );
};

因爲緩存區是一個數組,存儲的每個 Buffer 是獨立存在的,因此不能掛載在實例上共用,若是掛在實例上則引用相同,一動全動,這不是咱們想要的,因此每一次執行 _read 方法時都建立新的 Buffer 實例存入讀取的數據後存儲在緩存區中,若是讀取完成 bytesRead0,則觸發 end 事件。

注意:在 NodeJS 源碼中,可讀流的兩種模式代碼都是混合在一塊兒的,只是使用 fs.createReadStream 建立一個可讀流,經過監聽 datareadable 兩種不一樣的事件來觸發兩種不一樣的模式,而咱們爲了模擬,把兩種模式拆開成了兩個類來實現的,在測試時須要建立不一樣類的實例。


驗證可讀流(暫停模式)ReadableStream

爲了統一咱們依然讀取真正用法中 1.txt 文件,內容爲 0~9 十個數字。

// 驗證 ReadableStream
// 引入依賴
const fs = require("fs");
const ReadableStream = require("./ReadableStream");

let rs = new ReadableStream("1.txt", {
    encoding: "utf8",
    start: 0,
    highWaterMark: 3
});

rs.on("readable", () => {
    let r = rs.read(3);
    console.log(r);
    console.log(rs.len);
});

在打印 「容器」 剩餘容量時,咱們使用在 ReadableStream 上構造的 len 屬性。

流動模式和暫停模式分別有不一樣的應用場景,若是隻是但願讀取一個文件,並最快的得到結果使用流動模式是很好的選擇,若是但願瞭解讀取文件的具體內容,並進行精細的處理,使用暫停模式更好一些。


可寫流的實現

一、WriteStream 類建立

在使用 fscreateWriteStream 建立可寫流時,返回了 WriteStream 對象,上面也存在事件和方法,建立可寫流的時也是建立類的實例,咱們將這個類命名爲 WriteStream。事件一樣依賴 events 模塊的 EventEmitter 類,文件操做一樣依賴 fs 模塊,因此需提早引入。

// 建立 WriteStream 類
// 引入依賴模塊
const EventEmitter = require("events");
const fs = require("fs");

// 建立 WriteStream 類
class WriteStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        // 建立可寫流參數傳入的屬性
        this.path = path; // 寫入文件的路徑
        this.flags = options.flags || "w"; // 文件標識位
        this.encoding = options.encoding || "utf8"; // 字符編碼
        this.fd = options.fd || null; // 文件描述符
        this.mode = options.mode || 0o666; // 權限位
        this.autoClose = options.autoClose || true; // 是否自動關閉
        this.start = options.start || 0; // 寫入文件的起始位置
        this.highWaterMark = options.highWaterMark || 16 * 1024; // 對比寫入字節數的標識

        this.writing = false; // 是否正在寫入
        this.needDrain = false; // 是否須要觸發 drain 事件
        this.buffer = []; // 緩存,正在寫入就存入緩存中
        this.len = 0; // 當前緩存的個數
        this.pos = this.start; // 下次寫入文件的位置(變化的)

        // 建立可寫流要打開文件
        this.open();
    }
}

// 導出模塊
module.exports = WriteStream;

使用 fs.createWriteStream 建立可寫流時傳入了兩個參數,寫入的文件路徑和一個 options 選項,options 上有七個參數,咱們在建立 ReadStream 類的時候將這些參數初始化到了 this 上。

建立可寫流後須要使用 write 方法進行寫入,寫入時第一次會真的經過內存寫入到文件中,而再次寫入則會將內容寫到緩存中,注意這裏的 「內存」 和 「緩存」,內存是寫入文件是的系統內存,緩存是咱們本身建立的數組,第一次寫入之後要寫入文件的 Buffer 都會先存入這個數組中,這個數組名爲 buffer,掛載在實例上,實例上同時掛載了 len 屬性用來存儲當前緩存中 Buffer 總共的字節數(長度)。

咱們在可讀流上掛載了是否正在寫入的狀態 writing 屬性,只要緩存區中存在未寫入的 Buffer,writing 的狀態就是正在寫入,當寫入的字節數大於了 highWaterMark 須要觸發 drain 事件,因此又掛載了是否須要觸發 drain 事件的標識 needDrain 屬性。

當從文件的 start 值對應的位置開始寫入,下一次寫入文件的位置會發生變化,因此在 this 上掛載了 pos 屬性,用於存儲下次寫入文件的位置。

在 NodeJS 流的源碼中緩存是用鏈表實現的,經過指針來操做緩存中的 Buffer,而咱們爲了簡化邏輯就使用數組來做爲緩存,雖然性能相對鏈表要差。

二、打開、關閉文件 open 和 detroy

WriteStream 中,寫入文件以前也應該打開文件,在打開文件過程當中出錯時也應該觸發 error 事件並關閉文件,打開和關閉文件的方法 opendetroyReadStreamopendetroy 方法的邏輯一模一樣,因此這裏直接拿過來用了。

// open 方法
// 打開文件
WriteStream.prototype.open = function() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err) {
            this.emit("error", err);
            if (this.autoClose) {
                this.destroy();
                return;
            }
        }
        this.fd = fd;
        this.emit("open");
    });
};
// detroy 方法
// 關閉文件
WriteStream.prototype.detroy = function() {
    if (typeof this.fd === "number") {
        fs.close(fd, () => {
            this.emit("close");
        });
        return;
    }
    this.emit("close");
};

三、寫入文件方法 write 的實現

write 方法默認支持傳入三個參數:

  • chunk:寫入文件的內容;
  • encoding:寫入文件的編碼格式;
  • callback:寫入成功後執行的回調。
// write 方法
// 寫入文件的方法,只要邏輯爲寫入前的處理
WriteStream.prototype.write = function(
    chunk,
    encoding = this.encoding,
    callback
) {
    // 爲了方便操做將要寫入的數據轉換成 Buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);

    // 維護緩存的長度
    this.len += chunk.lenth;

    // 維護是否觸發 drain 事件的標識
    this.needDrain = this.highWaterMark <= this.len;

    // 若是正在寫入
    if (this.writing) {
        this.buffer.push({
            chunk,
            encoding,
            callback
        });
    } else {
        // 更改標識爲正在寫入,再次寫入的時候走緩存
        this.writing = true;
        // 若是已經寫入清空緩存區的內容
        this._write(chunk, encoding, () => this.clearBuffer());
    }

    return !this.needDrain;
};

與可寫流的 read 同樣,咱們在使用 write 方法將數據寫入文件時,也是操做 Buffer,在 write 方法中,首先將接收到的要寫入的數據轉換成了 Buffer,由於是屢次寫入,要知道緩存中 Buffer 字節數的總長度,因此維護了 len 變量。

咱們的 WriteStream 構造函數中,this 掛載了 needDrain 屬性,在使用 fs.createWriteStream 建立的可讀流時,是寫入的字節長度超過 highWaterMark 纔會觸發 drain 事件,而 needDrainwrite 的返回值正好相反,因此咱們用 needDrain 取反來做爲 write 方法的返回值。

在寫入的邏輯中第一次是直接經過內存寫入到文件,可是再次寫入就須要將數據存入緩存,將數據寫入到文件中寫入狀態 writing 默認爲 false,經過緩存再寫入證實應該正在寫入中,因此在第一次寫入後應更改 writing 的狀態爲 true,寫入緩存其實就是把轉換的 Buffer、編碼以及寫入成功後要執行的回調掛在一個對象上存入緩存的數組 buffer 中。

咱們把真正寫入文件的邏輯抽取成一個單獨的方法 _write,並傳入 chunk(要寫入的內容,已經處理成 Buffer)、encoding(字符編碼)、回調函數,在回調函數中執行了原型方法 clearBuffer,接下來就來實現 _writeclearBuffer

_注意:方法使用 _ 開頭表明私有方法,輕易不要在外部調用或修改,這是一個開發者之間約定俗成的不成文規定。_

四、真正的文件操做 _write

對比可讀流(流動模式)的 read 方法,在調用 _write 方法寫入時,是在建立可寫流以後的同步代碼中執行的,與可讀流在 newListener 中同步執行 read 的狀況相似,因此爲了保證 _write 的邏輯是在 open 方法打開文件之後執行,使用了與 read 相同的處理方式。

// _write 方法
// 真正的寫入文件操做的方法
WriteStream.prototype._write = function(chunk, encoding, callback) {
    // 因爲 open 異步執行,write 是在建立實例時同步執行
    // write 執行可能早於 open,此時不存在文件描述符
    if (typeof this.fd !== "number") {
        // 由於 open 用 emit 觸發了 open 事件,因此在這是從新執行 write
        return this.once("open", () => this._write(chunk, encoding, callback));
    }

    // 讀取文件
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
        // 維護下次寫入的位置和緩存區 Buffer 的總字節數
        this.pos += bytesWritten;
        this.len -= bytesWritten;
        callback();
    });
};

在打開文件並寫入的時候須要維護兩個變量,下次寫入的位置 pos 和當前緩存區內 Buffer 所佔總字節數 len,本次寫入了多少個字節,下次寫入須要在寫入位置的基礎上加多少個字節,而 len 偏偏相反,本次寫入了多少個字節,緩存區中的總長度應該對應的減小多少個字節。

在維護兩個變量的值之後調用 callback,其實 callback 內執行的是 clearBuffer 方法,就如方法名,譯爲 「清空緩存」,其實就是一次一次的將數據寫入文件並從緩存中移除,很明顯須要遞歸調用 _write 方法,咱們將這個遞歸的邏輯統一放在 clearBuffer 方法中實現。

五、清空緩存操做 clearBuffer

// clearBuffer 方法
// 清空緩存方法
WriteStream.prototype.clearBuffer = function() {
    // 先寫入的在數組前面,從前面取出緩存中的 Buffer
    let buf = this.buffer.shift();

    // 若是存在 buf,證實緩存還有 Buffer 須要寫入
    if (buf) {
        // 遞歸 _write 按照編碼將數據寫入文件
        this._write(buf.chunk, buf.encoding, () => this.clearBuffer);
    } else {
        // 若是沒有 buf,說明緩存內的內容已經徹底寫入文件並清空,須要觸發 drain 事件
        this.emit("drain");

        // 更改正在寫入狀態
        this.writing = false;

        // 更改是否須要觸發 drain 事件狀態
        this.needDrain = false;
    }
};

clearBuffer 方法中獲取了緩存區數組的最前面的 Buffer(最前面的是先寫入緩存的,也應該先取出來寫入文件),存在這個 Buffer 時,遞歸 _write 方法按照編碼將數據寫入文件,若是不存在說明緩存區已經清空了,表明內容徹底寫入文件中,因此觸發 drain 事件,最後更改了 writingneedDrain 的狀態。

更正 writing 是爲了 WriteStream 建立的可讀流在下次調用 write 方法時默認第一次真正寫入文件,而更正 needDrain 的狀態是在緩存區要清空的最後一個 Buffer 的長度小於了 highWaterMark 時,保證 write 方法的返回值是正確的。

第一次是真正寫入,其餘的都寫入緩存,再一個一個的將緩存中存儲的 Buffer 寫入並從緩存清空,之因此這樣設計是爲了把寫入的內容排成一個隊列,假若有 3 我的同時操做一個文件寫入內容,只有第一我的是真的寫入,其餘的人都寫在緩存中,再按照寫入緩存的順序依次寫入文件,避免衝突和寫入順序出錯。


驗證可寫流 WriteStream

接下來咱們使用本身實現的 WriteStream 類來建立可寫流,並按照 fs.createWriteStream 的用法進行使用並驗證。

// 驗證 WriteStream
// 向 1.txt 文件中寫入 012345
const fs = require("fs");
const WriteStream = require("./WriteStream");

// 建立可寫流
let ws = new WriteStream("2.txt", {
    highWaterMark: 3
});

let i = 0;

function write() {
    let flag = true;
    while (i <= 6 && flag) {
        i++;
        flag = ws.write(i + "", "utf8");
    }
}

ws.on("drain", function() {
    console.log("寫入成功");
    write();
});
write();

// true
// true
// false
// 寫入成功
// true
// true
// false
// 寫入成功

可使用 fs.createWriteStreamWriteStream 類分別執行上面的代碼,對比結果,看看是否相同。


可讀流和可寫流的橋樑 pipe

可寫流和可讀流通常是經過 pipe 配合來使用的,pipe 方法是可讀流 ReadStream 的原型方法,參數爲一個可寫流。

// pipe 方法
// 鏈接可讀流和可寫流的方法 pipe
ReadStream.prototype.pipe = function(dest) {
    // 開始讀取
    this.on("data", data => {
        // 若是超出可寫流的 highWaterMark,暫停讀取
        let flag = dest.write(data);
        if (!flag) this.pause();
    });

    dest.on("drain", () => {
        // 當可寫流清空內存時恢復讀取
        this.resume();
    });

    this.on("end", () => {
        // 在讀取完畢後關閉文件
        this.destroy();
    });
};

pipe 方法其實就是經過可讀流的 data 事件觸發流動狀態,並用可寫流接收讀出的數據進行寫入,當寫入數據超出 highWaterMark,則暫停可讀流的讀取,直到可寫流的緩存被清空並把內容寫進文件後,恢復可讀流的讀取,當讀取結束後關閉文件。

下面咱們實現一個將 1.txt 的內容拷貝 2.txt 中的例子。

// 驗證 pipe
// pipe 的使用
const fs = require("fs");

// 引入本身的 ReadStream 類和 WriteStream 類
const ReadStream = rquire("./ReadStream");
const WriteStream = rquire("./WriteStream");

// 建立可讀流和可寫流
let rs = new ReadStream("1.txt", {
    highWaterMark: 3
});
let ws = new WriteStream("2.txt", {
    highWaterMark: 2
});

// 使用 pipe 實現文件內容複製
rs.pipe(ws);


總結

在 NodeJS 源碼中,可讀流和可寫流的內容要比本篇內容多不少,本篇是將源碼精簡,抽出核心邏輯並針對流的使用方式進行實現,主要目的是幫助理解流的原理和使用,爭取作到 「知其然知其因此然」,瞭解了一些底層再對流使用時,也能遊刃有餘。

相關文章
相關標籤/搜索