春風十里不如Node中的一股清流

一股清流

清明時節雨紛紛,果真每逢清明是會下雨的。在這個雨夾雪,不方便外出的日子,宅在家裏一塊兒來相互學習分享吧!否則還能怎樣呢!哈哈node

友情提示:本文可能會涉及到一些Api的內容,會很乏味,很枯燥,很沒勁數組

But咱們後面的精彩也會超乎你的想象,由於咱們要手寫實現一下,不親自上馬怎麼知道馬跑的有多慢呢,Hurry Up Go Go Go!緩存

用過node的朋友們都知道流的做用很是之厲害,可讀可寫,無所不能。bash

相比於fs模塊,流更適用於讀取一個大文件,一次性讀取會佔用大量內存,效率很低,而流是將數據分割成段,會一段一段的讀取,效率會很高。說了一堆,先上概念,一塊兒看看它是誰服務器

概念

  • 流是一組有序的,有起點和終點的字節數據傳輸手段
  • 它不關心文件的總體內容,只關注是否從文件中讀到了數據,以及讀到數據以後的處理
  • 流是一個抽象接口,被 Node 中的不少對象所實現。好比HTTP 服務器request和response對象都是流

node中不少內容都應用到了流,好比http模塊的req就是可讀流,res是可寫流,而socket是可讀可寫流,看起來屌屌的,那麼咱們今天就都不講他們,只來說一下可讀流和可寫流這對兄弟異步

可讀流和可寫流對文件的操做用的也是fs模塊socket

那麼讓咱們從可讀流講起,先來看下都有哪些方法(Api)性能

可讀流

首先要會用,重點在會用

建立可讀流

const fs = require('fs');   // 引入fs核心模塊

// fs.createReadStream(path, options)
// 返回的是一個可讀流對象
let rs = fs.createReadStream('1.txt', {
    flags: 'r',         // 文件的讀取操做,默認是'r':讀取
    encoding: 'utf8',   // 設置編碼格式,默認是null, null表明的是buffer
    autoClose: true,    // 讀取完畢後自動關閉
    highWaterMark: 3,   // 默認是讀取64k    64 * 1024字節
    start: 0,
    end: 3              // 文件結束位置索引,和正常的截取slice有所不一樣,包前又包後(包括本身結束的位置)
});

// 默認狀況下,不會將文件中的內容輸出
// 內部會先建立一個buffer先讀取3字節

// 1.txt文件內容爲 123456789
複製代碼

以上代碼寫了如何建立可讀流,看起來要記那麼多options項,真是頭疼,其實通常狀況下,配置項是不用咱們寫的,這下你們知足了吧學習

知道了如何建立,咱們就看看rs這個可讀流對象上有哪些監聽事件啊測試

監聽data事件

可讀流這種模式它默認狀況下是非流動模式(暫停模式),它什麼也不作,就在這等着

你們知道流是基於事件的,因此咱們能夠去監聽事件,監聽了data事件的話,就能夠將非流動模式轉換爲流動模式

// 流動模式會瘋狂的觸發data事件,直到讀取完畢
// 根據上面設置的highWaterMark一次讀3個字節
rs.on('data', data => { // 非流動模式 -> 流動模式
    console.log(data);  // 觸發2次data事件, 分別打出123和4  從0到3共4個(包括末尾)
});

// 題外話:
// 監聽data事件的時候,若是沒有設置編碼格式,data返回的是buffer類型
// so咱們能夠爲data設置encoding爲utf8
rs.setEncoding('utf8');     // 等同於options裏的encoding: 'utf8'
複製代碼

當咱們把想要讀取的內容都讀完後,還能夠監聽一個end事件,去判斷什麼時候讀完

監聽end事件

rs.on('end', () => {
    console.log('完畢了'); 
});

// 此時除了會打印data事件裏的123, 4以外還會打印 完畢了
// 以下表示:
// 123
// 4
// 完畢了
複製代碼

除了data和end兩個事件以外,可讀流中還能夠監聽到error、open以及close事件,因爲用處沒有前兩位大,就委屈一下放在一塊兒寫吧

監聽error/open/close事件

// error
rs.on('error', err => {
    console.log(err);
});
// open
rs.on('open', () => {
    console.log('文件打開了');
});
// close
rs.on('close', () => {
    console.log('文件關閉了');
});

// 根據上面監聽data、end事件,下面打印的內容是
/*
*   文件打開了
    123
    4
    end
    文件關閉了
* */
複製代碼

各種監聽事件都知道怎麼寫了,最後再看兩個方法,他們是pause和resume,暫停和恢復觸發data

暫停和恢復

// pause
rs.on('data', data => { 
    console.log(data);  // 只會讀取一次就暫停了,此時只讀到了123
    rs.pause();     // 暫停讀取,會暫停data事件觸發
});
// resume
setInterval(() => {
    rs.resume();    // 恢復data事件, 繼續讀取,變爲流動模式
                    // 恢復data事件後,還會調用rs.pause,要想再繼續觸發,把setTimeout換成setInterval持續觸發
}, 3000);
// 打印以下:
/*
*   文件打開了
    123
    4   // 隔了3秒後打印
    end
    文件關閉了
* */
複製代碼

說完了可讀流的用法,讓咱們再接再礪(不得不)去看下它的兄弟可寫流吧,畢竟對於做爲世界第一大羣體的程序猿來講,總得有個從入門到精通(放棄)的深層次提高嘛!加了個油的,各位走起。

可寫流

廢話很少說,上來就是幹

建立可寫流

const fs = require('fs');
// fs.createWriteStream(path, options);
const ws = fs.createWriteStream('2.txt', {
    flags: 'w',         // 文件的操做, 'w'寫入文件,不存在則建立
    mode: 0o666,
    autoClose: true,
    highWaterMark: 3,   // 默認寫是16k
    encoding: 'utf8'
});
複製代碼

可寫流就有兩個方法,分別是write和end方法,直接看下如何使用

write方法

// ws.write(chunk, encoding(可選), callback);
// 寫入的chunk數據必須是字符串或者buffer
let flag = ws.write('1', 'utf8', () => {});     // 異步的方法 有返回值

console.log(flag);  // true
flag = ws.write('22', 'utf8', () => {});    
console.log(flag);  // false    超過了highWaterMark的3個字節,不能再寫了
flag = ws.write('3', 'utf8', () => {}); 
console.log(flag);  // false

// 2.txt -> 寫入了 1223
複製代碼

flag標識符表示的並非是否寫入,而是可否繼續寫,true爲能夠繼續寫入。可是返回false,也不會丟失,還會寫到文件內的

接下來再介紹下end方法

end方法

// 能夠傳入chunk值
ws.end('完畢');   // 當寫完後 就不能再寫了

// 此時2.txt -> 寫入了 1223完畢
複製代碼

講完了write和end方法,可寫流還有一個on監聽事件,它能夠監聽drain(抽乾)事件

監聽drain事件

// drain方法
// 抽乾方法 當都寫入後觸發drain事件
ws.on('drain', () => {
    console.log('已經抽乾了');
});
複製代碼

重頭戲來了

  • 前面羅裏吧嗦都在寫如何使用,Api着實讓你們看的昏昏欲睡了。
  • 可是各位觀衆,如今纔是最最值得高興的時刻,對於流的操做,咱們不只僅要會用,還應該簡單的去實現一下。
  • 這樣才能知足咱們龐大的求知慾而且get到新技能,老樣子,直接上代碼,從代碼中去深刻分析一番
  • 若是讀的疲憊了,那就歇歇吧,當一個佛系青年,看空一切也是一種痛的領悟啊

實現可讀流

先來個栗子

// demo.js
const ReadStream = require('./ReadStream'); // 引入實現的可讀流

const rs = new ReadStream('1.txt', {
    flags: 'r',
    // encoding: 'utf8',
    autoClose: true,
    highWaterMark: 3,
    start: 0,
    end: 4
});

rs.on('data', data => {
    console.log(data);
    rs.pause();
});

rs.on('end', () => {
   console.log('end');
});

setTimeout(() => {
    rs.resume();
}, 2000);
複製代碼

前方高能,開啓敲擊模式,若是還不知道node中的buffer和events的話,千萬別捉急。你們都是一條船上的人,我會在以後的文章裏給你們分享,且先暫且繼續看下去啊!堅持住,兄弟姐妹們!

建立ReadStream類
// ReadStream.js
const fs = require('fs');
const EventEmitter = require('events');  // 須要依賴事件發射
// 這裏用ES6提供的class寫法,你們也一塊兒來看看是怎麼寫的吧

class ReadStream extends EventEmitter {
    constructor(path, options) {    // 須要傳入path和options配置項
        super();    // 繼承
        this.path = path;
        // 參照上面new出的實例,咱們開始寫
        this.flags = options.flags || 'r';  // 文件打開的操做,默認是'r'讀取
        this.encoding = options.encoding || null;   // 讀取文件編碼格式,null爲buffer類型
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark || 64 * 1024;  // 默認是讀取64k
        this.start = options.start || 0;
        this.end = options.end;
        
        this.flowing = null;   // null表示非流動模式
        // 要創建一個buffer,這個buffer就是一次要讀多少內容
        // Buffer.alloc(length)  是經過長度來建立buffer,這裏每次讀取建立highWaterMark個
        this.buffer = Buffer.alloc(this.highWaterMark);  
        this.pos = this.start;  // 記錄讀取的位置
        
        this.open();    // 打開文件,獲取fd文件描述符
        
        // 看是否監聽了data事件,若是監聽了,就變成流動模式
        this.on('newListener', (eventName, callback) => {
            if (eventName === 'data') {   // 至關於用戶監聽了data事件
                this.flowing = true;  // 此時監聽了data會瘋狂的觸發
                this.read();    // 監聽了,就去讀,要乾脆,別猶豫
            }
        });
    }
}

module.exports = ReadStream;    // 導出
複製代碼

寫到這裏咱們已經建立好了ReadStream類,在該類中咱們繼承了EventEmitter事件發射的方法

其中咱們寫了open和read這兩個方法,從字面意思就明白了,咱們的可讀流要想讀文件,the first就須要先打開(open),after咱們再去讀內容(read)

這就是實習可讀流的主要方法,咱們接下來先從open方法寫起

open方法
class ReadStream extends EventEmitter {
    constructor(path, options) {
        // 省略...
    }
    
    open() {
        // 用法: fs.open(filename,flags,[mode],callback)
        fs.open(this.path, this.flags, (err, fd) => {   // fd爲文件描述符
            // 說實在的咱們打開文件,主要就是爲了獲取fd
            // fd是個從3開始的數字,每打開一次都會累加,4->5->6...
            if (err) {
                if (this.autoClose) {  // 文件打開報錯了,是否自動關閉掉
                    this.destory();    // 銷燬    
                }
                this.emit('error', err);    // 發射error事件
                return;
            }
            this.fd = fd;   // 若是沒有錯,保存文件描述符
            this.emit('open');  // 發射open事件
        });
    }
    
    // 這裏用到了一個destory銷燬方法,咱們也直接實現了吧
    destory() {
        // 先判斷有沒有fd 有就關閉文件 觸發close事件
        if (typeof this.fd === 'number') {
            // 用法: fs.close(fd,[callback])
            fs.close(this.fd, () => {
                this.emit('close'); 
            });
            return;
        }
        this.emit('close');
    }
}
複製代碼

萬事開頭難,咱們把第一步打開文件搞定了,那麼就剩下讀取了,再接再礪當上王者

read方法
class ReadStream extends EventEmitter {
    constructor(path, options) {
        // 省略...
    }
    // 監聽data事件的時候,去讀取
    read() {
        console.log(this.fd);   // 直接讀fd爲undefined,由於open事件是異步的,此時還拿不到fd
        // 此時文件還沒打開
        if (typeof this.fd !== 'number') {  // 前面說過fd是個數字
            // 當文件真正打開的時候,會觸發open事件
            // 觸發事件後再執行read方法,此時fd確定有了
            return this.once('open', () => this.read());  // once方法只會執行一次
        }
        // 如今有fd了,大聲的讀出來,不要害羞
        // 用法: fs.read(fd, buffer, offset, length, pos, callback((err, bytesRead)))
        
        // length就是一次想讀幾個, 不能大於buffer長度
        // 這裏length不能等於highWaterMark,舉個🌰
        // 文件內容是12345若是按照highWaterMark:3來讀,總共讀end:4個,每次讀3個字節
        // 分別是123 45空,咱們應該知道一共要讀幾個,總數-讀取位置+1獲得下一次要讀多少個
        // 這裏有點繞,你們能夠多去試試體會一下
        // 咱們根據源碼起一個一樣的名字
        let howMuchToRead = this.end ? Math.min((this.end-this.pos+1), this.highWaterMark) : this.highWaterMark;
        
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => {
            // bytesRead爲讀取到的個數,每次讀3個,bytesRead就是3
            if (bytesRead > 0) {
                this.pos += bytesRead; // 讀到了多少個,累加,下次從該位置繼續讀
                
                let buf = this.buffer.slice(0, bytesRead);  // 截取buffer對應的值
                // 其實正常狀況下,咱們只要把buf當成data傳過去便可了
                // 可是考慮到還有編碼的問題,因此有可能不是buffer類型的編碼
                // 這裏須要判斷一下是否有encoding
                let data = this.encoding ? buf.toString(this.encoding) : buf.toString(); 
                
                this.emit('data', data);    // 發射data事件,並把data傳過去
                
                // 若是讀取的位置 大於 結束位置 就表明讀完了,觸發一個end事件
                if (this.pos > this.end) {
                    this.emit('end');
                    this.destory();
                }
                // 流動模式繼續觸發
                if (this.flowing) {   
                    this.read();
                }
            } else {    // 若是bytesRead沒有值了就說明讀完了
                this.emit('end');   // 發射end事件,表示文件讀完
                this.destory();     // 沒有價值了,kill
            }
        });
    }
}
複製代碼

以上就是read方法的主要實現了,其實思路上並非很難,去除掉註釋的話也會更精簡些。你們上面也瞭解了可讀流的用法,知道它還有兩個方法,那就是pause(暫停)和resumt(恢復),那麼咱們擇日不如撞日直接寫完吧,簡單到使人髮指的實現,看看不會吃虧的,哈哈

pause和resume方法
class ReadStream extends EventEmitter {
    constructor(path, options) {
        // 省略...
    }
    pause() {
        this.flowing = false;
    }
    resume() {
        this.flowing = true;
        this.read();
    }
}
複製代碼

完事,就是這麼so easy,咱們實現了本身的可讀流了,可喜可賀,可喜可賀。

實現可寫流

先看下測試數據

let WriteStream = require('./WriteStream'); // 引入咱們實現的可寫流

let ws = new WriteStream('3.txt', {
    flags: 'w',
    highWaterMark: 3,
    autoClose: true,
    encoding: 'utf8',
    mode: 0o666,
    start: 0
});

// ws.write('你d好', 'utf8', () => {});

let i = 9;

function write() {
    let flag = true;
    while (i >= 0 && flag) {
        flag = ws.write(--i + '', 'utf8', () => {});
        console.log(flag);
    }
}

write();
// drain只有當緩存區充滿後 而且被消費後出發
ws.on('drain', () => {
    console.log('抽乾');
    write();
});
複製代碼

可寫流的實現前面部分和可讀流基本一致,不過可寫流是有drain(抽乾)事件的,因此在編寫的時候也會對這一點進行處理的

建立可寫流
let fs = require('fs');
let EventEmitter = require('events');   // 須要事件發射

// 繼承事件發射EventEmitter
class WriteStream extends EventEmitter {
    constructor(path, options) {
        super();    // 繼承
        this.path = path;
        this.highWaterMark = options.highWaterMark || 16 * 1024;    // 默認一次寫入16k
        this.autoClose = options.autoClose || true;
        this.encoding = options.encoding || null;
        this.mode = options.mode;
        this.start = options.start || 0;
        this.flags = options.flags || 'w';  // 默認'w'爲寫入操做
        
        this.buffers = [];
        this.writing = false;   // 標識 是否正在寫入
        this.needDrain = false;     // 是否知足觸發drain事件
        this.pos = 0;   // 記錄寫入的位置
        this.length = 0;
        
        this.open();    // 首先仍是打開文件獲取到fd文件描述符
    }
}

module.exports = WriteStream;
複製代碼
  • 可寫流要有一個緩存區,當正在寫入文件時,內容要寫入到緩存區裏,在源碼中是一個鏈表 => 咱們就直接用個[]來實現,這就是this.buffers的做用

  • 再有就是用buffers計算的話,每增長一項都須要遍歷一遍,維護起來性能過高了,因此用this.length來記錄緩存區的大小

下面咱們直接寫open方法打開文件拿到fd文件描述符

open方法
class WriteStream extends EventEmitter {
    constructor(path, options) {
        // 省略...
        this.open();
    }
    
    open() {
        // 用法: fs.open(filename,flags,[mode],callback)
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                this.emit('error', err);
                // 看一下是否會自動關閉
                if (this.autoClose) {
                    this.destory();     // 銷燬
                }
                return;
            }
            this.fd = fd;
            this.emit('open');  // 觸發open事件,表示當前文件打開了
        });
    }
    
    destory() {
        if (typeof this.fd !== 'number') {  // 若是不是fd的話直接返回一個close事件
            return this.emit('close');
        }
        fs.close(this.fd, () => {
            this.emit('close');
        });
    }
}
複製代碼

經過open方法獲取到了fd文件描述符後,對於流來講就成功了一半。下面乘勝追擊,直搗黃龍完成可寫流的兩個方法吧!!!

write和end方法
class WriteStream extends EventEmitter {
    constructor(path, options) {
        // 省略...
    }
    // 用法:ws.write(chunk,[encoding],[callback])
    write(chunk, encoding = this.encoding, callback) {
        // 經過fs.write()寫入時,chunk須要改爲buffer類型
        // 而且要用咱們指定的編碼格式去轉換
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
        // chunk.length就是要寫入的長度
        this.length += chunk.length;
        // 比較是否達到了緩存區的大小
        let result = this.length < this.highWaterMark;
        this.needDrain = !result;   // 是否須要觸發drain事件
        
        if (this.writing) {
            this.buffers.push({
                chunk,
                encoding,
                callback
            });
        } else {
            this.writing = true;
            this._write(chunk, encoding, () => {
                callback();
                this.clearBuffer();
            });
        }
        
        return result;  // write方法 返回一個布爾值
    }
    _write(chunk, encoding, callback) {
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this._write(chunk, encoding, callback));
        }
        // fs.write寫入文件
        // 用法: fs.write(fd, buffer[, offset[, length[, position]]], callback)
        
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
            // this.length記錄緩存區大小,寫入後length須要再減掉寫入的個數
            this.length -= bytesWritten;    
            // this.pos下次寫入的位置
            this.pos += bytesWritten;
            this.writing = false;
            callback();     // 清空緩存區內容
        });
    }
    
    clearBuffer() {
        let buf = this.buffers.shift(); // 每次把最早放入緩存區的取出
        if (buf) {  // 若是有值,接着寫
            this._write(buf.chunk, buf.encoding, () => {
                buf.callback();
                this.clearBuffer(); // 每次寫完都清空一次緩存區
            });
        } else {    // 緩存區已經空了
            if (this.needDrain) {   // 是否須要觸發drain 須要就發射drain事件
                this.needDrain = false;
                this.emit('drain');
            }
        }
    }
    
    end() {
        if (this.autoClose) {
            this.emit('end');
            this.destory();
        }
    }
}
複製代碼
  • 以上就完成了可寫流的實現了,各位可能會有一些疑惑,在此我先把廣泛的疑惑說一下吧
  • write方法裏的條件判斷先解釋一下
    • if條件
      • 若是是正在寫入,就先把內容放到緩存區裏,就是this.buffers裏
      • 給數組裏存入一個對象,分別對應chunk, encoding, callback
      • 這樣方便在清空緩存區的時候取緩存區裏對應的內容
    • else條件
      • 專門用來將內容,寫入到文件內
      • 每一次寫完後都須要把buffers(緩存區)裏的內容清空掉
      • 當緩存區buffers數組裏是空的時候就會觸發drain事件了
  • _write方法裏typeof那裏的判斷來講明一下
    • 判斷是否有fd文件描述符,只有在打開文件成功的時候纔會有fd
    • 因此若是沒有的話,須要觸發一次open事件,拿到fd都再調_write方法
  • end方法就比較簡單了
    • 判斷是否會自動關閉,發射end事件並銷燬便可了

寫在最後

終於都搞定了,其實說實話,這些基於Api的東西提及來仍是很讓人枯燥無聊的,你們都是拒絕無聊主義者。但我仍是堅持寫下來了,也是想讓你們和我一塊兒去感覺一下大師們是怎麼實現怎麼思考的過程

謝謝你們的觀看了,能堅持下來的都不是折翼的天使啊!

相關文章
相關標籤/搜索