Node中stream的深度感知

背景

以前在開發ASP.NET的時候,根據源代碼依次追蹤整個程序的執行過程,發現最重要的過程是基於一個管道流的,像水管同樣,依次處理管道流中的十幾個事件,當時對流的認知是四個字,依次執行。那麼如今作Node的開發,對於Node中的流是另四個字,那就是源源不斷。本篇文章主要目的是帶你們手寫可讀流與可寫流。html

簡介

在Node中,請求流,響應流,文件流等都是基於stream模塊封裝的。簡單的理解,流就是將大塊的東西,分小塊依次處理。就像你須要10kg的水,水管就一點點的源源不斷的流出來給你。又如在程序中node

fs.readFileSync('/demo.txt', {encoding:'utf8'});fs.writeFileSync('/demo.txt', data);
複製代碼

以上兩個方法是把文件內容所有讀入內存,而後再寫入文件,可是若是文件過大就會出現問題了,內存容易爆掉。這裏就須要用到流了,一點點的讀取或者寫入。api

分類

  • Readable - 可讀的流 (例如 fs.createReadStream()).
  • Writable - 可寫的流 (例如 fs.createWriteStream()).
  • Duplex - 可讀寫的流 (例如 net.Socket).
  • Transform - 在讀寫過程當中能夠修改和變換數據的 Duplex 流 (例如 zlib.createDeflate()).

Readable - 可讀的流介紹與實現

可讀流分爲兩種模式:flowing 和 paused,而且兩種模式能夠相互轉換數組

1.在 flowing 模式下, 可讀流自動從系統底層讀取數據,並經過 EventEmitter 接口的事件儘快將數據提供給應用。緩存

2.在 paused 模式下,必須顯式調用 stream.read() 方法來從流中讀取數據片斷。bash

3.全部初始工做模式爲 paused 的 Readable 流,能夠經過下面三種途徑切換到 flowing 模式:異步

  • 監聽 'data' 事件
  • 調用 stream.resume() 方法
  • 調用 stream.pipe() 方法將數據發送到 Writable

4.可讀流能夠經過下面途徑切換到 paused 模式:函數

  • 若是不存在管道目標(pipe destination),能夠經過調用 stream.pause() 方法實現。
  • 若是存在管道目標,能夠經過取消 'data' 事件監聽,並調用 stream.unpipe() 方法移除全部管道目標來實現。

5.爲了便於理解,這裏分開寫兩種模式,下面爲flowing模式基本實現ui

流程圖:this

  • 定義類首先要繼承自EventEmitter,由於要發射監聽事件,在構造函數中依次定義各個參數,其中須要說明的是this.flowing 用於切換模式,this.buffer並不是是緩存,由於流動模式是默認不走緩存的,這個buffer是讀取的時候fs.read的一個參數,全部的事件監聽到都會執行newListener。當該類被構造的時候就打開文件,並監聽事件,若是是data,默認走流動模式開始讀取。
class ReadStream extends EventEmitter{
    constructor(path,options){
        super(path,options);
        this.path = path;//寫入路徑
        this.flags = options.flags || 'r'; //操做修飾符
        this.mode = options.mode || 0o666; //權限
        this.autoClose = options.autoClose;//是否自動關閉
        this.highWaterMark = options.highWaterMark || 64 * 1024; //默認64k
        this.pos = this.start = options.start || 0;//起始位置
        this.end = options.end;//結束位置
        this.encoding = options.encoding;//編碼
        this.flowing = null;//流動模式
        this.buffer = Buffer.alloc(this.highWaterMark);//讀取的buffer 不是緩存
        this.open();
        this.on('newListener',(type,listener)=>{
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
        })
    }
複製代碼
  • open方法 打開傳入路徑的文件 若是出錯而且設置自動關閉屬性,直接關閉打開,發射error事件。若是成功了,發射open事件,供讀取方法接收。
open() {
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
           if(err){
               if(this.autoClose){
                   this.destroy();
                   return this.emit('error',err);
               }
           }
           this.fd = fd;
           this.emit('open');
        })
    }
複製代碼
  • 最重要的方法Read,一進入就須要判斷文件是否已經打開了,由於文件打開是異步的一個過程,此時可能並未打開,若是沒有打開就監聽發射的open事件,而後在回調函數中進行read方法的調用。其中核心是每次須要讀多少,這個量由傳入的開始結束位置已經最高水位線決定的。最高水位線表明依次最多讀取多少,默認值是64kb。那麼每次讀取的值 howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
read(){
        if(typeof this.fd != 'number'){
            return this.once('open',()=>this.read());
        }
        let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
        fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是實際讀到的字節數
            if(err){
                if(this.autoClose)
                    this.destroy();
                return this.emit('error',err);
            }
            if(bytes){
                let data = this.buffer.slice(0,bytes);
                this.pos += bytes;
                data = this.encoding?data.toString(this.encoding):data;
                this.emit('data',data);
                if(this.end && this.pos > this.end){
                   return this.endFn();
                }else{
                    if(this.flowing)
                      this.read();
                }
            }else{
                return this.endFn();
            }

        })
    }
複製代碼
  • pipe方法實現 pipe方法就是邊讀取邊寫入,控制讀寫速度,當可寫流的寫入返回false時,暫停讀取,當可寫流觸發drain事件後,恢復讀取。
pipe(ws){
        this.on('data',data =>{
            let flag = ws.write(data);
            if (!flag) {
                this.pause();
            }
        })
        ws.on('drain',()=>{
            this.resume();
        })
    }
    pause(){
        this.flowing  = false;
    }
    resume(){
        this.flowing  = true;
        this.read();
    }
複製代碼
  1. 暫停模式 暫停模式不一樣的是須要走緩存,而且監聽的是readable事件(這裏我只貼出具備差別性的代碼) 流程圖

  • 在構造函數中須要加入this.buffers = [];(源碼中爲了提升效率,使用的是鏈表結構,這裏我用數組代替),以及readable事件的監聽。
this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
複製代碼
  • 這裏read方法須要傳入一個n,表示須要讀取的字節數。若是判斷緩存的大小,即this.length,若是this.length == 0 || this.length < this.highWaterMark ,執行_read()方法,此時執行的n爲0
let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }

                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
複製代碼
  • 若是傳入的n值 0 < n < this.length,走如下邏輯,即從緩存區中讀取相應的字節數進行讀取
if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

複製代碼

Writable - 可讀的流介紹與實現

流程圖

  • 構造函數跟上面差距不大,有一個this.buffer的緩存區,而且最高水位線默認爲16k
//構造函數
    constructor(path,options){
        super(path,options);
        this.path = path; //寫入路徑
        this.flags = options.flags || 'w';//操做修飾符
        this.mode = options.mode || 0o666;//權限
        this.start = options.start || 0;//寫入的起始位置
        this.pos = this.start;//文件的寫入索引
        this.encoding = options.encoding || 'utf8';//編碼
        this.autoClose = options.autoClose;//自動關閉
        this.highWaterMark = options.highWaterMark || 16 * 1024; //默認最高水位線16k
        this.buffers = [];//緩存區 源碼裏面是鏈表
        this.writing = false;//標識內部正在寫入數據
        this.length = 0;//標識緩存區字節的長度
        this.open();//默認一建立就打開文件
    }
複製代碼
  • open即打開文件是同樣的,這裏再也不描述。最重要的write方法,該方法有一個返回值,標識緩存區的長度是否超過了最高水位線,特別注意的是若是超過了也會寫入進去的,由於會放入到緩存區中,等到drain事件觸發,再接着寫入,具體寫入是執行的_write方法,其中有一個this.writing 標識是否正在寫入,若是正在寫入,則放入緩存區中,在清空緩存區的時候依次取出寫入,即如下的clearBuffer方法,此方法中當緩存區清空了之後觸發drain事件,這裏須要特殊說明一下,若是緩存區從未滿過,是不會觸發這個事件的。
write(chunk,encoding,callback){
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk,this.encoding);//此方法只吸入buffer或者字符串
        this.length += chunk.length;//當前緩存區的長度
        if (this.writing) {//若是正在寫入數據 則須要把數據放入緩存區裏面
            this.buffers.push({
                chunk,
                encoding,
                callback
            })
        } else { //若是當前空閒 直接調用底層寫入的方法進行寫入 而且在寫完之後 清空緩存區
            this.writing = true;
            this._write(chunk,encoding,()=>{
                callback&&callback();
                this.clearBuffer();
            })
        }

        //write方法有一個返回值 表示當前緩存區是否超過了最高水位線 便是否能繼續寫入
        return this.length < this.highWaterMark;
    }

    _write(chunk,encoding,callback){
        if (typeof this.fd != 'number') { //由於是異步的 文件可能在這個時候並未打開
            return this.once('open',()=>this._write(chunk, encoding, callback));
        }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
            if (err) {
                if (this.autoClose) {
                    this.destory();
                }
               return this.emit('error',err);
            }
            this.pos += bytesWritten;

            this.length -= bytesWritten;
            callback&&callback();
        })
    }
    clearBuffer(){
        let data = this.buffers.shift();
        if(data){
                this._write(data.chunk,data.encoding,()=>{
                    data.callback && data.callback();
                    this.clearBuffer();
                })
            }else{
                this.writing = false;
                //緩存區清空了 緩存區若是沒有滿過 是不會觸發這個事件的
                this.emit('drain');
            }
    }
複製代碼

參考連接

  1. Node.js API文檔
  2. 深刻理解 Node Stream 內部機制
相關文章
相關標籤/搜索