let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3, //文件一次讀多少字節,默認 64*1024
flags:'r', //默認 'r'
autoClose:true, //默認讀取完畢後自動關閉
start:0, //讀取文件開始位置
end:3, //流是閉合區間 包含start也含end
encoding:'utf8' //默認null
});
複製代碼
rs.on("open",()=>{
console.log("文件打開")
});
複製代碼
可讀流這種模式它默認狀況下是非流動模式(暫停模式),它什麼也不作,就在這等着html
監聽了data事件的話,就能夠將非流動模式轉換爲流動模式node
流動模式會瘋狂的觸發data事件,直到讀取完畢git
直接上代碼github
//1.txt中內容爲1234567890
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3, //文件一次讀多少字節,默認 64*1024
flags:'r', //默認 'r'
autoClose:true, //默認讀取完畢後自動關閉
start:0, //讀取文件開始位置
end:3, //流是閉合區間 包含start也含end
encoding:'utf8' //默認null
});
rs.on("open",()=>{
console.log("文件打開")
});
//瘋狂觸發data事件 直到讀取完畢
rs.on('data',(data)=>{
console.log(data); //共讀4個字節,可是highWaterMark爲3,因此觸發2次data事件,分別打印123 4
});
複製代碼
rs.on("err",()=>{
console.log("發生錯誤")
});
rs.on('end',()=>{ //文件讀取完畢後觸發
console.log("讀取完畢");
});
rs.on("close",()=>{ //最後文件關閉觸發
console.log("關閉")
});
複製代碼
不要急,最後把方法介紹完統一寫個例子,你們一看便一目了之算法
終於把可讀流的全部API講完了,火燒眉毛的寫個完整的案例來體驗下,說幹就幹gulp
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter { //建立可讀流類,繼承 EventEmitter
constructor(path, options = {}) { //options默認空對象
super();
this.path = path;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.autoClose = options.autoClose || true;
this.start = options.start || 0;
this.pos = this.start; //pos會隨着讀取的位置改變
this.end = options.end || null;
this.encoding = options.encoding || null;
this.flags = options.flags || 'r';
this.flowing = null; //非流動模式
//聲明一個buffer表示都出來的數據
this.buffer = Buffer.alloc(this.highWaterMark);
this.open(); //打開文件 fd
}
複製代碼
//打開文件用
open() {
fs.open(this.path, this.flags, (err, fd) => { //fd標識的就是當前this.path這個文件,從3開始(number類型)
if (err) {
if (this.autoClose) { //若是須要自動關閉我再去銷燬fd
this.destroy(); //關閉文件(觸發關閉事件)
}
this.emit('error', err); //打開文件發生錯誤,發佈error事件
}
this.fd = fd; //保存文件描述符
this.emit('open', this.fd) //觸發文件open方法
})
}
複製代碼
destroy() {
if (typeof this.fd != 'number') { //文件未打開,也要關閉文件且觸發close事件
return this.emit('close');
}
fs.close(this.fd, () => { //若是文件打開過了 那就關閉文件而且觸發close事件
this.emit("close");
})
}
複製代碼
read() {
//此時文件還沒打開
if (typeof this.fd != 'number') {
//當文件真正打開的時候 會觸發open事件,觸發事件後再執行read,此時fd 就有了
return this.once('open', () => this.read())
}
//此時有fd了 開始讀取文件了
//this.pos是變量,開始時this.pos = this.start,在上面定義過了
//算法有點繞,源碼中是這樣實現的。舉個例子 end=3,pos=0,highWaterMark=3, howMuchToRead = 3, 1.txt內容1234 就會讀123 4
let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, byteRead) => {
// byteRead真實讀到的個數
this.pos += byteRead;
// this.buffer默認三個
let b = this.buffer.slice(0, byteRead);
//對讀到的b進行編碼
b = this.encoding ? b.toString(this.encoding) : b;
//把讀取到的buffer發射出去
this.emit('data', b);
if ((byteRead === this.highWaterMark) && this.flowing) {
return this.read();
}
//這裏沒有更多邏輯了
if (byteRead < this.highWaterMark) {
//沒有更多了
this.emit('end'); //讀取完畢
this.destroy(); //銷燬完畢
}
})
}
複製代碼
你們會發現,此時咱們尚未監聽 rs.on('data')事件,來觸發read方法,此時咱們須要修改下 第一步建立構造函數的代碼api
constructor(path, options = {}) {
//省略.... 代碼和第一步同樣,下面是新添加
// 看是否監聽了data事件,若是監聽了就要變成流動模式
this.on('newListener', (eventName, callback) => {
if (eventName === 'data') {
//至關於用戶監聽了data事件
this.flowing = true;
// 監聽了就去讀
this.read(); //去讀內容
}
})
}
複製代碼
若是能看到這裏,就基本大功告成,就只剩下pause和resume 暫停和恢復暫停方法。那就一寫到底bash
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
//恢復暫停,在去無限讀
this.read();
}
複製代碼
終於大功告成,寫的對不對呢,趕忙測試下吧,期待的搓手手異步