在以前的博客中已經瞭解了流的基本用法,這篇的重點在於根據可讀流的用法對可讀流的原理進行分析,實現簡易版的 ReadStream
,流的基本用法請看這裏 NodeJS —— Stream 的基本使用。編程
在使用 fs
的 createReadStream
建立可讀流時,返回了 ReadStream
對象,上面存在着一些事件和方法,其實咱們在建立這個可讀流的時候建立了某一個類的實例,這個實例能夠調用類原型上的方法,咱們這裏將這個類命名爲 ReadStream
。數組
在類原型上的方法內部可能會建立一些事件,在 NodeJS 中,事件是依賴 events
模塊的,即 EventEmitter
類,同時類的方法可能會操做文件,會用到 fs
模塊,因此也提早引入 fs
。緩存
// 建立 ReadStream 類 // 引入依賴模塊 const EventEmitter = require("events"); const fs = require("fs"); // 建立 ReadStream 類 class ReadStream extends EventEmitter { constructor(path, options = {}) { super(); // 建立可讀流參數傳入的屬性 this.path = path; // 讀取文件的路徑 this.flags = options.flags || "r"; // 文件標識位 this.encoding = options.encoding || null; // 字符編碼 this.fd = options.fd || null; // 文件描述符 this.mode = options.mode || 0o666; // 權限位 this.autoClose = options.autoClose || true; // 是否自動關閉 this.start = options.start || 0; // 讀取文件的起始位置 this.end = options.end || null; // 讀取文件的結束位置(包含) this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次讀取文件的字節數 this.flowing = false; // 控制當前是不是流動狀態,默認爲暫停狀態 this.buffer = Buffer.alloc(this.highWaterMark); // 存儲讀取內容的 Buffer this.pos = this.start; // 下次讀取文件的位置(變化的) // 建立可讀流要打開文件 this.open(); // 若是監聽了 data 事件,切換爲流動狀態 this.on("newListener", type => { if (type === "data") { this.flowing = true; // 開始讀取文件 this.read(); } }); } } // 導出模塊 module.exports = ReadStream;
使用 fs.createReadStream
時傳入了兩個參數,讀取文件的路徑和一個 options
選項,options
上有八個參數,咱們在建立 ReadStream
類的時候將這些參數初始化到了 this
上。異步
建立可讀流的時候有兩種狀態,流動狀態和暫停狀態,默認建立可讀流是暫停狀態,只有在觸發 data
事件時纔會變爲流動狀態,因此在 this
上掛載了 flowing
存儲當前的狀態是否爲流動狀態,值默認爲 false
。函數
注意:這裏說的暫停狀態不是暫停模式,暫停模式是 readable
, 是可讀流的另外一種模式,咱們這節討論的可讀流爲流動模式。性能
在讀取文件時實際上是操做 Buffer 進行讀取的,須要有一個 Buffer 實例用來存儲每次讀取的數據,因此在 this
上掛載了一個新建立的 Buffer,長度等於 highWaterMark
。測試
當從 start
值的位置開始讀取文件,下一次讀取文件的位置會發生變化,因此在 this
上掛載了 pos
屬性,用於存儲下次讀取文件的位置。ui
在建立 ReadStream
的實例(可讀流)時,應該打開文件並進行其餘操做,因此在 this
上掛載了 open
方法並執行。this
建立實例的同時監聽了 newListener
事件,回調在每次使用 on
監聽事件時觸發,回調內部邏輯是爲了將默認的暫停狀態切換爲流動狀態,由於在使用時,流動狀態是經過監聽 data
事件觸發的,在 newListener
的回調中判斷事件類型爲 data
的時候將 flowing
標識的值更改成 true
,並調用讀取文件的 read
方法。編碼
在使用 ES6 的類編程時,原型上的方法都是寫在 class
內部,咱們下面爲了把原型上的方法拆分出來成爲單獨的代碼塊,都使用 ReadStream.prototype.open = function...
直接給原型添加屬性的方式,但這樣的方式和直接寫在 class
內有一點區別,就是 class
內部的書寫的原型方法都是不可遍歷的,添加屬性的方式建立的方法都是可遍歷的,可是這點區別對咱們代碼的執行沒有任何影響。
在使用可讀流時,打開時默認是暫停狀態,會觸發 open
事件,若是打開文件出錯會觸發 error
事件。
// open 方法 // 打開文件 ReadStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit("error", err); // 若是文件打開了出錯,並配置自動關閉,則關掉文件 if (this.autoClose) { // 關閉文件(觸發 close 事件) this.destroy(); // 再也不繼續執行 return; } } // 存儲文件描述符 this.fd = fd; // 成功打開文件後觸發 open 事件 this.emit("open"); }); };
open
方法的邏輯就是在打開文件的時候,將文件描述符存儲在實例上方便後面使用,並使用 EventEmitter
的原型方法 emit
觸發 open
事件,若是出錯就使用 emit
觸發 error
事件,若是配置 autoClose
參數爲 true
,就關閉文件並觸發 close
。
咱們將關閉文件的邏輯抽取出來封裝在了 ReadStream
類的 destroy
方法中,下面來實現 destroy
。
文件出錯分爲兩種,第一種文件打開出錯,第二種是文件不存在出錯(沒打開),第二種系統是沒有分配文件描述符的。
// detroy 方法 // 關閉文件 ReadStream.prototype.detroy = function() { // 判斷是否存在文件描述符 if (typeof this.fd === "number") { // 存在則關閉文件並觸發 close 事件 fs.close(fd, () => { this.emit("close"); }); return; } // 不存在文件描述符直接觸發 close 事件 this.emit("close"); };
若是是打開文件後出錯須要關閉文件,並觸發 close
事件,若是是沒打開文件,則直接觸發 close
事件,因此上面經過文件描述符來判斷該如何處理。
還記得在 ReadStream
類中,監聽的 newListener
事件的回調中若是監聽了 data
事件則會執行 read
讀取文件,接下來就實現讀取文件的核心邏輯。
// read 方法 // 讀取文件 ReadStream.prototype.read = function() { // 因爲 open 異步執行,read 是在建立實例時同步執行 // read 執行可能早於 open,此時不存在文件描述符 if (typeof this.fd !== "number") { // 由於 open 用 emit 觸發了 open 事件,因此在這是從新執行 read return this.once("open", () => this.read()); } // 如過設置告終束位置,讀到結束爲止就不能再讀了 // 若是最後一次讀取真實讀取數應該小於 highWaterMark // 因此每次讀取的字節數應該和 highWaterMark 取最小值 let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos + 1) : this.highWaterMark; // 讀取文件 fs.read( this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => { // 若是讀到內容執行下面代碼,讀不到則觸發 end 事件並關閉文件 if (bytesRead > 0) { // 維護下次讀取文件位置 this.pos += bytesRead; // 保留有效的 Buffer let realBuf = this.buffer.slice(0, bytesRead); // 根據編碼處理 data 回調返回的數據 realBuf = this.encoding ? realBuf.toString(this.encoding) : realBuf; // 觸發 data 事件並傳遞數據 this.emit("data", realBuf); // 遞歸讀取 if (this.flowing) { this.read(); } } else { this.isEnd = true; this.emit("end"); // 觸發 end 事件 this.detroy(); // 關閉文件 } } ); };
建立 ReadStream
的實例時,執行的 open
方法內部是使用 fs.open
打開文件的,是異步操做,而讀取文件方法 read
是在 newListener
回調中同步執行的,這樣極可能觸發 read
的時候文件尚未被打開(不存在文件描述符),因此在 read
方法中判斷了文件描述符是否存在,並在不存在時候使用 once
添加了 open
事件,回調中從新執行了 read
。
因爲在 open
方法中使用 emit
觸發了 open
事件,因此 read
內用 once
添加的 open
事件的回調也會跟着執行一次,並在回調中從新調用了 read
方法,保證了 read
讀取文件的邏輯在文件真正打開後才執行,爲了文件打開前執行 read
而不執行讀取文件的邏輯,用 once
添加 open
事件時別忘記 return
。
在使用 fs.read
讀取文件的時候有一個參數爲本次讀取幾個字符到 Buffer 中,若是在建立可讀流的時候設置了讀取文件的結束位置 end
參數,則讀到 end
位置就不該該再繼續讀取了,因此在存在 end
參數的時候每次都計算一下讀取個數和 highWaterMark
取最小值,保證讀取內容小於 highWaterMark
的時候不會多讀,由於讀取時是包括 end
值做爲 Buffer 的索引這一項的,因此計算時多減去的要 +1
加回來,再一次讀取這個讀取個數計算結果變成了 0
,也就結束了讀取。
由於 end
參數的狀況,因此在內部讀取邏輯前判斷了 bytesRead
(實際讀取字節數)是否大於 0
,若是不知足條件則在實例添加是否讀取結束標識 isEnd
(後面使用),觸發 end
事件並關閉文件,若是知足條件,也是經過 bytesRead
對 Buffer 進行截取,保留了有用的 Buffer,而且經過 encoding
編碼對 Buffer 進行處理後,觸發 data
事件,並將處理後的數據傳遞給 data
事件的回調。
pause
的目的就是暫停讀取,其實就是阻止 read
方法在讀取時進行遞歸,因此只須要更改 flowing
的值便可。
// pause 方法 // 暫停讀取 ReadStream.prototype.pause = function() { this.flowing = false; };
resume
的目的是恢復讀取,在更改 flowing
值得基礎上從新執行 read
方法,因爲在 pause
調用時 read
內部仍是執行得讀取文件得分支,文件並無關閉,讀取文件位置的參數也是經過實例上的當前的屬性值進行計算的,因此從新執行 read
會繼續上一次的位置讀取。
// resume 方法 // 恢復讀取 ReadStream.prototype.resume = function() { this.flowing = true; if (!this.isEnd) this.read(); };
上面在從新執行 read
以前使用 isEnd
標識作了判斷,防止在 setInterval
中調用 resume
在讀取完成後不斷的觸發 end
和 close
事件。
接下來咱們使用本身實現的 ReadStream
類來建立可讀流,並按照 fs.createReadStream
的用法進行使用並驗證。
// 驗證 ReadStream // 文件 1.txt 內容爲 0123456789 const fs = require("fs"); const ReadStream = require("./ReadStream"); // 建立可讀流 let rs = new ReadStream("1.txt", { encoding: "utf8", start: 0, end: 5, highWaterMark: 2 }); rs.on("open", () => console.log("open")); rs.on("data", data => { console.log(data, new Date()); rs.pause(); }); rs.on("end", () => console.log("end")); rs.on("close", () => console.log("close")); rs.on("error", err => console.log(err)); setInterval(() => rs.resume(), 1000); // open // 01 2018-07-04T10:44:20.384Z // 23 2018-07-04T10:44:21.384Z // 45 2018-07-04T10:44:22.384Z // end // close
執行上面的代碼正常的執行邏輯是先觸發 open
事件,而後觸發 data
事件,讀取一次後暫停,每隔一秒恢復讀取一次,再讀取完成後觸發 end
和 close
事件,經過運行代碼結果和咱們但願的同樣。
在 fs
模塊中用 createReadStream
建立的可讀流中經過監聽 readable
事件觸發暫停模式(監聽 data 事件觸發流動模式),經過下面例子感覺暫停模式與流動模式的不一樣,如今讀取文件 1.txt
,內容爲 0~9
十個數字。
// 暫停模式的用法 // 讀取的 const fs = require("fs"); // 建立可讀流 let rs = fs.createReadStream("1.txt", { encoding: "utf8", start: 0, highWaterMark: 3 }); rs.on("readable", () => { // read 參數爲本次讀取的個數 let r = rs.read(3); // 打印讀取的數據 console.log(r); // 打印容器剩餘空間 console.log(rs._readableState.length); }); // 012 // 0 // 345 // 0 // 678 // 0 // null // 1 // 90 // 0
通俗的解釋,暫停模式的 readable
事件默認會觸發一次,監聽 readable
事件後就像建立了一個 「容器」,容量爲 highWaterMark
,文件中的數據會自動把容器注滿,調用可讀流的 read
方法讀取時,會從容器中取出數據,若是 read
方法讀取的數據小於 highWaterMark
,則直接暫停,再也不繼續讀取,若是大於 highWaterMark
,說明 「容器」 空了,則會觸發 readable
事件,不管讀取字節數與 highWaterMark
關係如何,只要 「容器」 內容量剩餘小於 highWaterMark
就會進行 「續杯」,再次向 「容器」 中填入 highWaterMark
個,因此有些時候真實的容量會大於 highWaterMark
。
read
方法讀取的內容會返回 null
是由於容器內真實的數據數小於了讀取數,若是不是最後一次讀取,會在屢次讀取後將值一併返回,若是是最後一次讀取,會把剩餘不足的數據返回。
一、readable
事件的觸發條件:「容器」 空了;
二、「續杯」 條件:讀取後 「容器」 內剩餘量小於 highWaterMark
。
三、read
返回 null
:「容器」 容器內可悲讀取數據沒法知足一次讀取字節數。
同爲可讀流,暫停模式與流動模式相同,都依賴 fs
模塊和 events
模塊的 EventEmitter
類,參數依然爲讀取文件的路徑和 options
。
// 建立 ReadableStream 類 // 引入依賴 const EventEmitter = require("events"); const fs = require("fs"); class ReadableStream extends EventEmitter { constructor(path, options = {}) { super(); this.path = path; // 讀取文件的路徑 this.flags = options.flags || "r"; // 文件標識位 this.encoding = options.encoding || null; // 字符編碼 this.fd = options.fd || null; // 文件描述符 this.mode = options.mode || 0o666; // 權限位 this.autoClose = options.autoClose || true; // 是否自動關閉 this.start = options.start || 0; // 讀取文件的起始位置 this.highWaterMark = options.highWaterMark || 64 * 1024; // 每次讀取文件的字節數 this.reading = false; // 若是正在讀取,則再也不讀取 this.emitReadable = false; // 當緩存區的長度等於 0 的時候, 觸發 readable this.arr = []; // 緩存區 this.len = 0; // 緩存區的長度 this.pos = this.start; // 下次讀取文件的位置(變化的) // 建立可讀流要打開文件 this.open(); this.on("newListener", type => { if (type === "readable") { this.read(); // 監聽readable就開始讀取 } }); } } // 導出模塊 module.exports = ReadableStream;
在類的添加了 newListener
事件,在回調中判斷是否監聽了 readable
事件,若是監聽了開始從 「容器」 中讀取。
打開和關閉文件的方法和流動模式的套路基本類似。
// open 方法 // 打開文件 ReadableStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit("error", err); if (this.autoClose) { this.destroy(); return; } } this.fd = fd; this.emit("open"); }); };
// detroy 方法 // 關閉文件 ReadableStream.prototype.detroy = function() { if (typeof this.fd === "number") { fs.close(fd, () => { this.emit("close"); }); return; } this.emit("close"); };
read
方法的參數不傳時就至關於從 「容器」 讀取 highWaterMart
個字節,若是傳參表示讀取參數數量的字節數。
// read 方法 ReadableStream.prototype.read = function(n) { // 若是讀取大於了 highWaterMark,從新計算 highWaterMark,並從新讀取 if (n > this.len) { // 計算新的 highWaterMark,方法摘自 NodeJS 源碼 this.highWaterMark = computeNewHighWaterMark(n); this.reading = true; this._read(); } // 將要返回的數據 let buffer; // 若是讀取的字節數大於 0 小於等於當前緩存 Buffer 的總長度 if (n > 0 && n <= this.len) { // 則從緩存中取出 buffer = Buffer.alloc(n); let current; // 存儲每次從緩存區讀出的第一個 Buffer let index = 0; // 每次讀取緩存 Buffer 的索引 let flag = true; // 是否結束整個 while 循環的標識 // 開始讀取 while ((current = this.arr.shift()) && flag) { for (let i = 0; i < current.length; i++) { // 將緩存中取到的 Buffer 的內容讀到本身定義的 Buffer 中 buffer[index++] = current[i]; // 若是當前索引值已經等於了讀取個數,結束 for 循環 if (index === n) { flag = false; // 取出當前 Buffer 沒有消耗的 let residue = current.slice(i + 1); // 在讀取後維護緩存的長度 this.len -= n; // 若是 BUffer 真的有剩下的就給塞回到緩存中 if (residue.length) { this.arr.unshift(residue); } break; } } } } // 若是當前 讀取的 Buffer 爲 0,將觸發 readable 事件 if (this.len === 0) { this.emitReadable = true; } // 若是當前的緩存區大小小於 highWaterMark,就要讀取 if (this.len < this.highWaterMark) { // 若是不是正在讀取才開始讀取 if (!this.read) { this.reading = true; this._read(); // 正真讀取的方法 } } // 將 buffer 轉回建立可讀流設置成的編碼格式 if (buffer) { buffer = this.encoding ? buffer.toString(this.encoding) : buffer; } return buffer; };
上面的 read
方法的參數大小對比緩存區中取出的 Buffer 長度有兩種狀況,一種是小於當前緩存區內取出 Buffer 的長度,一種是大於了真個緩存區的 len
的長度。
小於當前緩存區總長度經過循環取出須要的 Buffer 存儲了咱們要返回建立的 Buffer 中,剩餘的 Buffer 會丟失,因此咱們作了一個小小的處理,將剩下的 Buffer 做爲第一個 Buffer 塞回到了緩存區中,在處理這個問題時與流動模式不相同,流動模式處理後直接跳出了,而暫停模式至關於從 「容器」 中讀取,若是第一次讀取後還有剩餘還要接着從容器中繼續讀取。
大於 len
屬性時,規定須要從新計算 highWaterMark
,遵循的原則是將當前 highWaterMark
設定爲當前讀取字節個數距離最接近的 2
的 n
次方的數值,NodeJS 源碼中方法名稱爲 computeNewHighWaterMark
,爲了提升性能是使用位運算的方式進行計算的,源碼以下。
// 從新計算 highWaterMark function computeNewHighWaterMark(n) { n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; return n; }
在調用該方法從新計算 highWaterMark
後更改正在讀取狀態,從新讀取,因爲讀取邏輯的重複,因此真正讀取文件的邏輯抽取成一個 _read
方法來實現,下面呢就來看看 _read
內部都作了什麼。
對比可讀流(流動模式)的 read
方法,在調用 _read
方法讀取時,是在 newListener
中同步執行 _read
,因此爲了保證 _read
的邏輯是在 open
方法打開文件之後執行,使用了與 read
相同的處理方式。
// _read 方法 ReadableStream.prototype._read = function() { if (typeof this.fd !== "number") { return this.once("open", () => _read()); } // 建立本次讀取的 Buffer let buffer = Buffer.alloc(this.highWaterMark); // 讀取文件 fs.read( this.fd, buffer, 0, this.highWaterMark, this.pos, (err, bytesRead) => { if (bytesRead > 0) { this.arr.push(buffer); // 緩存 this.len += bytesRead; // 維護緩存區長度 this.pos += bytesRead; // 維護下一次讀取位置 this.reading = false; // 讀取完畢 // 觸發 readable 事件 if (this.emitReadable) { // 觸發後更改觸發狀態爲 false this.emitReadable = false; this.emit("readable"); } } else { // 若是讀完觸發結束事件 this.emit("end"); } } ); };
因爲緩存區是一個數組,存儲的每個 Buffer 是獨立存在的,因此不能掛載在實例上共用,若是掛在實例上則引用相同,一動全動,這不是咱們想要的,因此每一次執行 _read
方法時都建立新的 Buffer 實例存入讀取的數據後存儲在緩存區中,若是讀取完成 bytesRead
爲 0
,則觸發 end
事件。
注意:在 NodeJS 源碼中,可讀流的兩種模式代碼都是混合在一塊兒的,只是使用 fs.createReadStream
建立一個可讀流,經過監聽 data
和 readable
兩種不一樣的事件來觸發兩種不一樣的模式,而咱們爲了模擬,把兩種模式拆開成了兩個類來實現的,在測試時須要建立不一樣類的實例。
爲了統一咱們依然讀取真正用法中 1.txt
文件,內容爲 0~9 十個數字。
// 驗證 ReadableStream // 引入依賴 const fs = require("fs"); const ReadableStream = require("./ReadableStream"); let rs = new ReadableStream("1.txt", { encoding: "utf8", start: 0, highWaterMark: 3 }); rs.on("readable", () => { let r = rs.read(3); console.log(r); console.log(rs.len); });
在打印 「容器」 剩餘容量時,咱們使用在 ReadableStream
上構造的 len
屬性。
流動模式和暫停模式分別有不一樣的應用場景,若是隻是但願讀取一個文件,並最快的得到結果使用流動模式是很好的選擇,若是但願瞭解讀取文件的具體內容,並進行精細的處理,使用暫停模式更好一些。
在使用 fs
的 createWriteStream
建立可寫流時,返回了 WriteStream
對象,上面也存在事件和方法,建立可寫流的時也是建立類的實例,咱們將這個類命名爲 WriteStream
。事件一樣依賴 events
模塊的 EventEmitter
類,文件操做一樣依賴 fs
模塊,因此需提早引入。
// 建立 WriteStream 類 // 引入依賴模塊 const EventEmitter = require("events"); const fs = require("fs"); // 建立 WriteStream 類 class WriteStream extends EventEmitter { constructor(path, options = {}) { super(); // 建立可寫流參數傳入的屬性 this.path = path; // 寫入文件的路徑 this.flags = options.flags || "w"; // 文件標識位 this.encoding = options.encoding || "utf8"; // 字符編碼 this.fd = options.fd || null; // 文件描述符 this.mode = options.mode || 0o666; // 權限位 this.autoClose = options.autoClose || true; // 是否自動關閉 this.start = options.start || 0; // 寫入文件的起始位置 this.highWaterMark = options.highWaterMark || 16 * 1024; // 對比寫入字節數的標識 this.writing = false; // 是否正在寫入 this.needDrain = false; // 是否須要觸發 drain 事件 this.buffer = []; // 緩存,正在寫入就存入緩存中 this.len = 0; // 當前緩存的個數 this.pos = this.start; // 下次寫入文件的位置(變化的) // 建立可寫流要打開文件 this.open(); } } // 導出模塊 module.exports = WriteStream;
使用 fs.createWriteStream
建立可寫流時傳入了兩個參數,寫入的文件路徑和一個 options
選項,options
上有七個參數,咱們在建立 ReadStream
類的時候將這些參數初始化到了 this
上。
建立可寫流後須要使用 write
方法進行寫入,寫入時第一次會真的經過內存寫入到文件中,而再次寫入則會將內容寫到緩存中,注意這裏的 「內存」 和 「緩存」,內存是寫入文件是的系統內存,緩存是咱們本身建立的數組,第一次寫入之後要寫入文件的 Buffer 都會先存入這個數組中,這個數組名爲 buffer
,掛載在實例上,實例上同時掛載了 len
屬性用來存儲當前緩存中 Buffer 總共的字節數(長度)。
咱們在可讀流上掛載了是否正在寫入的狀態 writing
屬性,只要緩存區中存在未寫入的 Buffer,writing
的狀態就是正在寫入,當寫入的字節數大於了 highWaterMark
須要觸發 drain
事件,因此又掛載了是否須要觸發 drain
事件的標識 needDrain
屬性。
當從文件的 start
值對應的位置開始寫入,下一次寫入文件的位置會發生變化,因此在 this
上掛載了 pos
屬性,用於存儲下次寫入文件的位置。
在 NodeJS 流的源碼中緩存是用鏈表實現的,經過指針來操做緩存中的 Buffer,而咱們爲了簡化邏輯就使用數組來做爲緩存,雖然性能相對鏈表要差。
在 WriteStream
中,寫入文件以前也應該打開文件,在打開文件過程當中出錯時也應該觸發 error
事件並關閉文件,打開和關閉文件的方法 open
和 detroy
與 ReadStream
的 open
和 detroy
方法的邏輯一模一樣,因此這裏直接拿過來用了。
// open 方法 // 打開文件 WriteStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit("error", err); if (this.autoClose) { this.destroy(); return; } } this.fd = fd; this.emit("open"); }); };
// detroy 方法 // 關閉文件 WriteStream.prototype.detroy = function() { if (typeof this.fd === "number") { fs.close(fd, () => { this.emit("close"); }); return; } this.emit("close"); };
write
方法默認支持傳入三個參數:
// write 方法 // 寫入文件的方法,只要邏輯爲寫入前的處理 WriteStream.prototype.write = function( chunk, encoding = this.encoding, callback ) { // 爲了方便操做將要寫入的數據轉換成 Buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); // 維護緩存的長度 this.len += chunk.lenth; // 維護是否觸發 drain 事件的標識 this.needDrain = this.highWaterMark <= this.len; // 若是正在寫入 if (this.writing) { this.buffer.push({ chunk, encoding, callback }); } else { // 更改標識爲正在寫入,再次寫入的時候走緩存 this.writing = true; // 若是已經寫入清空緩存區的內容 this._write(chunk, encoding, () => this.clearBuffer()); } return !this.needDrain; };
與可寫流的 read
同樣,咱們在使用 write
方法將數據寫入文件時,也是操做 Buffer,在 write
方法中,首先將接收到的要寫入的數據轉換成了 Buffer,由於是屢次寫入,要知道緩存中 Buffer 字節數的總長度,因此維護了 len
變量。
咱們的 WriteStream
構造函數中,this
掛載了 needDrain
屬性,在使用 fs.createWriteStream
建立的可讀流時,是寫入的字節長度超過 highWaterMark
纔會觸發 drain
事件,而 needDrain
與 write
的返回值正好相反,因此咱們用 needDrain
取反來做爲 write
方法的返回值。
在寫入的邏輯中第一次是直接經過內存寫入到文件,可是再次寫入就須要將數據存入緩存,將數據寫入到文件中寫入狀態 writing
默認爲 false
,經過緩存再寫入證實應該正在寫入中,因此在第一次寫入後應更改 writing
的狀態爲 true
,寫入緩存其實就是把轉換的 Buffer、編碼以及寫入成功後要執行的回調掛在一個對象上存入緩存的數組 buffer
中。
咱們把真正寫入文件的邏輯抽取成一個單獨的方法 _write
,並傳入 chunk
(要寫入的內容,已經處理成 Buffer)、encoding
(字符編碼)、回調函數,在回調函數中執行了原型方法 clearBuffer
,接下來就來實現 _write
和 clearBuffer
。
_注意:方法使用 _
開頭表明私有方法,輕易不要在外部調用或修改,這是一個開發者之間約定俗成的不成文規定。_
對比可讀流(流動模式)的 read
方法,在調用 _write
方法寫入時,是在建立可寫流以後的同步代碼中執行的,與可讀流在 newListener
中同步執行 read
的狀況相似,因此爲了保證 _write
的邏輯是在 open
方法打開文件之後執行,使用了與 read
相同的處理方式。
// _write 方法 // 真正的寫入文件操做的方法 WriteStream.prototype._write = function(chunk, encoding, callback) { // 因爲 open 異步執行,write 是在建立實例時同步執行 // write 執行可能早於 open,此時不存在文件描述符 if (typeof this.fd !== "number") { // 由於 open 用 emit 觸發了 open 事件,因此在這是從新執行 write return this.once("open", () => this._write(chunk, encoding, callback)); } // 讀取文件 fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => { // 維護下次寫入的位置和緩存區 Buffer 的總字節數 this.pos += bytesWritten; this.len -= bytesWritten; callback(); }); };
在打開文件並寫入的時候須要維護兩個變量,下次寫入的位置 pos
和當前緩存區內 Buffer 所佔總字節數 len
,本次寫入了多少個字節,下次寫入須要在寫入位置的基礎上加多少個字節,而 len
偏偏相反,本次寫入了多少個字節,緩存區中的總長度應該對應的減小多少個字節。
在維護兩個變量的值之後調用 callback
,其實 callback
內執行的是 clearBuffer
方法,就如方法名,譯爲 「清空緩存」,其實就是一次一次的將數據寫入文件並從緩存中移除,很明顯須要遞歸調用 _write
方法,咱們將這個遞歸的邏輯統一放在 clearBuffer
方法中實現。
// clearBuffer 方法 // 清空緩存方法 WriteStream.prototype.clearBuffer = function() { // 先寫入的在數組前面,從前面取出緩存中的 Buffer let buf = this.buffer.shift(); // 若是存在 buf,證實緩存還有 Buffer 須要寫入 if (buf) { // 遞歸 _write 按照編碼將數據寫入文件 this._write(buf.chunk, buf.encoding, () => this.clearBuffer); } else { // 若是沒有 buf,說明緩存內的內容已經徹底寫入文件並清空,須要觸發 drain 事件 this.emit("drain"); // 更改正在寫入狀態 this.writing = false; // 更改是否須要觸發 drain 事件狀態 this.needDrain = false; } };
clearBuffer
方法中獲取了緩存區數組的最前面的 Buffer(最前面的是先寫入緩存的,也應該先取出來寫入文件),存在這個 Buffer 時,遞歸 _write
方法按照編碼將數據寫入文件,若是不存在說明緩存區已經清空了,表明內容徹底寫入文件中,因此觸發 drain
事件,最後更改了 writing
和 needDrain
的狀態。
更正 writing
是爲了 WriteStream
建立的可讀流在下次調用 write
方法時默認第一次真正寫入文件,而更正 needDrain
的狀態是在緩存區要清空的最後一個 Buffer 的長度小於了 highWaterMark
時,保證 write
方法的返回值是正確的。
第一次是真正寫入,其餘的都寫入緩存,再一個一個的將緩存中存儲的 Buffer 寫入並從緩存清空,之因此這樣設計是爲了把寫入的內容排成一個隊列,假若有 3
我的同時操做一個文件寫入內容,只有第一我的是真的寫入,其餘的人都寫在緩存中,再按照寫入緩存的順序依次寫入文件,避免衝突和寫入順序出錯。
接下來咱們使用本身實現的 WriteStream 類來建立可寫流,並按照 fs.createWriteStream
的用法進行使用並驗證。
// 驗證 WriteStream // 向 1.txt 文件中寫入 012345 const fs = require("fs"); const WriteStream = require("./WriteStream"); // 建立可寫流 let ws = new WriteStream("2.txt", { highWaterMark: 3 }); let i = 0; function write() { let flag = true; while (i <= 6 && flag) { i++; flag = ws.write(i + "", "utf8"); } } ws.on("drain", function() { console.log("寫入成功"); write(); }); write(); // true // true // false // 寫入成功 // true // true // false // 寫入成功
可使用 fs.createWriteStream
和 WriteStream
類分別執行上面的代碼,對比結果,看看是否相同。
可寫流和可讀流通常是經過 pipe
配合來使用的,pipe
方法是可讀流 ReadStream
的原型方法,參數爲一個可寫流。
// pipe 方法 // 鏈接可讀流和可寫流的方法 pipe ReadStream.prototype.pipe = function(dest) { // 開始讀取 this.on("data", data => { // 若是超出可寫流的 highWaterMark,暫停讀取 let flag = dest.write(data); if (!flag) this.pause(); }); dest.on("drain", () => { // 當可寫流清空內存時恢復讀取 this.resume(); }); this.on("end", () => { // 在讀取完畢後關閉文件 this.destroy(); }); };
pipe
方法其實就是經過可讀流的 data
事件觸發流動狀態,並用可寫流接收讀出的數據進行寫入,當寫入數據超出 highWaterMark
,則暫停可讀流的讀取,直到可寫流的緩存被清空並把內容寫進文件後,恢復可讀流的讀取,當讀取結束後關閉文件。
下面咱們實現一個將 1.txt
的內容拷貝 2.txt
中的例子。
// 驗證 pipe // pipe 的使用 const fs = require("fs"); // 引入本身的 ReadStream 類和 WriteStream 類 const ReadStream = rquire("./ReadStream"); const WriteStream = rquire("./WriteStream"); // 建立可讀流和可寫流 let rs = new ReadStream("1.txt", { highWaterMark: 3 }); let ws = new WriteStream("2.txt", { highWaterMark: 2 }); // 使用 pipe 實現文件內容複製 rs.pipe(ws);
在 NodeJS 源碼中,可讀流和可寫流的內容要比本篇內容多不少,本篇是將源碼精簡,抽出核心邏輯並針對流的使用方式進行實現,主要目的是幫助理解流的原理和使用,爭取作到 「知其然知其因此然」,瞭解了一些底層再對流使用時,也能遊刃有餘。