解讀 Node 核心模塊 Stream 系列一( Readable )

node中的流

node中stream模塊是很是,很是,很是重要的一個模塊,由於不少模塊都是這個模塊封裝的:node

  • Readable:可讀流,用來讀取數據,好比 fs.createReadStream()。
  • Writable:可寫流,用來寫數據,好比 fs.createWriteStream()。
  • Duplex:雙工流,可讀+可寫,好比 net.Socket()。
  • Transform:轉換流,在讀寫的過程當中,能夠對數據進行修改,好比 zlib.createDeflate()(數據壓縮/解壓)。

系列連接

node中流的實現:

node中stream是一個類,它繼承自Event模塊,因此能夠經過事件訂閱的方式來修改內部的狀態或者調用外部的回調,咱們能夠從源碼node/lib/internal/streams/legacy.js看到:

node/lib/internal/streams/legacy.js

node中stream(node/lib/stream.js)包括了主要包括了四個部分:

  • lib/_stream_readable.js
  • lib/_stream_writable.js
  • lib/_stream_tranform.js
  • lib/_stream_duplex.js

stream模塊

Readble

Readble的例子

  • 客戶端上的 HTTP 響應
  • 服務器上的 HTTP 請求
  • fs 讀取的流
  • zlib 流
  • crypto 流
  • TCP socket
  • 子進程 stdout 與 stderr
  • process.stdin

Readable的特色和簡化實現:

特色

  1. Readable擁有一個經過BufferList生成的緩存鏈表buffer,用來緩存讀取到的chunk(對於非對象模式的流,數據塊能夠是字符串或 Buffer。對於對象模式的流,數據塊但是除 null 之外的任意 JavaScript 值),同時有一個length來記錄buffer的長度
  2. Readable擁有一個highWaterMark來標明buffer的最大容量,經過和length比較決定是否須要補充緩存
  3. Readable訂閱'readble'事件來觸發read()消費者從緩存中消耗數據
  4. Readable擁有read()從緩存區讀取數據的同時也會根據標誌判斷是否調用生產者補充緩存區
  5. Readable擁有reading來標明消費者正在消耗
  6. Readable擁有howMatchToRead()來隨時調整讀取的大小,防止對buffer過多的讀取,致使會讀取亂碼的部分
  7. Readable擁有fromList()來根據讀取大小的不一樣,隨時調整buffer中的鏈表結構

因爲node原碼的可讀流有將近一千行的代碼,其中有大量的異常處理,debug調試,各類可讀流的兼容處理,加碼解碼處理等,因此這裏採起一個簡化版的實現,源碼中使用鏈表做爲buffer,這裏採用數組進行簡化,主要是闡述可讀流的處理過程。web

構造函數

  1. Readable擁有一個經過BufferList生成的緩存鏈表buffer,用來緩存讀取到的chunk(對於非對象模式的流,數據塊能夠是字符串或 Buffer。對於對象模式的流,數據塊但是除 null 之外的任意 JavaScript 值),同時有一個length來記錄buffer的長度
  2. Readable擁有一個highWaterMark來標明buffer的最大容量,經過和length比較決定是否須要補充緩存
  3. Readable訂閱'readble'事件來觸發read()消費者從緩存中消耗數據
const EE = require('events');
const util = require('util');
const fs = require('fs');

function Readable(path,options) {//這個參數是源碼沒有的,這裏主要是爲了讀取fs爲案例加的
    EE.call(this);//構造函數繼承EventEmiter
    
    this.path = path;
    this.autoClose = options.autoClose || true;
    this.highWaterMark = options.highWaterMark || 64 * 1024;//64k
    this.encoding = options.encoding || null;
    this.flags = options.flags || 'r';//// 這個源碼沒有的,這裏主要是爲了fs讀取案例加的
    this.needEmitReadable = false;// 須要觸發readable事件,默認不須要
    this.position = 0;// 偏移量
    this.cache = []; // 緩存區
    this.reading = false;// 是否正在從緩存中讀取,消費者消耗中
    this.length = 0; // 緩存區大小,控制長度
    this.open(); // 這個源碼沒有的,這裏主要是爲了fs讀取案例加的
    this.on('newListener', (type) => {
        if (type === 'readable') { // 看一下是不是'readable'事件模式
            this.read();//消耗者,從buffer讀取數據
        }
    })
}
util.inherits(Readable, EE);//原型繼承EventEmiter
複製代碼

下面這個函數在Readable沒有,可是在ReadStream中存在,這裏爲了利用fs讀取操做說明流,簡化實現版本上添加了這個方法,後面說明ReadStream模塊和Readable的繼承關係數組

Readable.prototype.open = function(){//這裏是異步打開的操做
    fs.open(this.path, this.flags, (err, fd) => {
        if (err) { // 銷燬文件
            if (this.autoClose) { // 若是須要自動關閉觸發一下銷燬事件
                this.destroy(); // 它銷燬文件
            }
            return this.emit('error', err);
        }
        this.fd = fd;
        this.emit('open', fd);
    });
}
//源碼中的destory不是這樣的,這裏只是ReadStream中的destory,源碼中作了各類可讀流的兼容組合處理
Readable.prototype.destroy = function() {
    if (typeof this.fd != 'number') {
        this.emit('close');
    } else {
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }
}
複製代碼

read和_read

  1. Readable擁有read()從緩存區讀取數據的同時也會根據標誌判斷是否調用生產者補充緩存區
  2. Readable擁有reading來標明消費者正在消耗
  3. Readable擁有howMatchToRead()來隨時調整讀取的大小,防止對buffer過多的讀取,致使會讀取亂碼的部分
  4. Readable擁有fromList()來根據讀取大小的不一樣,隨時調整buffer中的鏈表結構
Readable.prototype.read = function(n) {
    let buffer = null;
    if(n>this.len){// 若是緩存區中有數據不夠此次讀取,則調整highWaterMark而且補充緩存區
        this.highWaterMark = computeNewHighWaterMark(n);//從新計算調整內存2的次方 exp: 5 => 8
        this.needEmitReadable = true;
        this.reading = true;
        this._read();
    }
    if (n > 0 && n < this.len) { // 若是緩存區中有數據夠此次讀取,則從緩存區中讀取
        buffer = Buffer.alloc(n);
        let current;
        let index = 0;
        let flag = true;
        //這裏的代碼就是源碼中fromList()的功能,對buffer進行調整,exp:123 456 789讀取12 => 3 456 789
        while (flag && (current = this.cache.shift())) {//current是一個buffer
            for (let i = 0; i < current.length; i++) {
            buffer[index++] = current[i];//將緩存區中的chunk內容copy到buffer中
            if (index == n) {//n個數據讀取完畢
                flag = false;
                this.length -= n; //緩存區長度更新
                let c = current.slice(i + 1);//獲取完的chunk exp:123 => 3
                    if (c.length) { 
                        this.cache.unshift(c);//將沒有取完的chunk放回 exp: 3
                    }
                    break;
                }
            }
        }
    }
    if(this.length === 0){//緩存中沒有數據
        this.needEmitReadable = true; 須要觸發'readable'
    }
    if (this.length < this.highWaterMark) {//緩存區沒有滿,補充緩存區
        this.reading = true;
        this._read();
    }
    return buffer;//read()返回值爲一個buffer
}
//第一次讀取是內置的自動讀取到緩存區
//而後觸發readable是從緩存區中讀取消耗的同時,而且也會補充緩存區
Readable.prototype._read = function(n) {
    if (typeof this.fd !== 'number') {
        return this.once('open', () => this._read());//由於fs.open是異步函數,當執行read必需要在open以後
    }
    let buffer = Buffer.alloc(this.highWaterMark);
    
    //源碼中經過Readable.prototype.push()調用readableAddChunk()再調用addChunk()
    //這裏經過fs.read來調用addChunk(this,bytesRead)
    fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
       addChunk(this,bytesRead);
    })
}

//源碼中經過Readable.prototype.push()調用readableAddChunk()再調用addChunk()
function addChunk(stream, chunk) {
        stream.length += bytesRead; // 增長緩存的個數
        stream.position += bytesRead;//記錄文件讀取的位置
        stream.reading = false;
        stream.cache.push(buffer);//數據放入到緩存中
        if (stream.needEmitReadable) {
            stream.needEmitReadable = false;
            stream.emit('readable');
        }
}

//源碼中這個函數是經過howMatchToRead()調用的,由於howMatchToRead()在其餘的流中也會用到,因此兼容了其餘狀況
function computeNewHighWaterMark(n) {
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
    return n;
}
複製代碼

實例

const fs = require('fs');
const rs = fs.createReadStream('./1.txt',{//123456789
  flags:'r',
  autoClose:true,
  highWaterMark:3,
  encoding:null
});
rs.on('readable',function () { 
  // 若是緩存區沒這麼大會返回null
  let r = rs.read(1);
  console.log(r);
  console.log(rs._readableState.length);
  rs.read(1);
  setTimeout(() => {//由於補充是異步的
  console.log(rs._readableState.length);
  }, 1000);
});
複製代碼

ReadStream

ReadStream和Readable的關係

ReadStream實際上是Readable的子類,它繼承了Readable,以fs.createReadStream爲例(node/lib/internal/fs/streams.js): 緩存

fs/streams
而後對上面的_read方法進行了覆蓋可是其中調用了Readable.prototype.push()方法:
fs/streams.read
而且在其上擴展了open和close:
fs/streams.read
fs/streams.read

ReadStream的特色和簡化實現:

特色

  1. ReadStream擁有一個highWaterMark來標明讀取數據的大小
  2. ReadStream訂閱'data'事件來觸發read()消費者讀取數據
  3. ReadStream擁有paused 模式和flowing 模式,它們經過flowing標誌進行控制:
  • readable.readableFlowing = null,沒有提供消費流數據的機制,因此流不會產生數據。
  • readable.readableFlowing = true,監聽'data'事件、調用readable.pipe()方法、或調用readable.resume()方法,會變成true可讀流開始主動地產生數據觸發事件。
  • readable.readableFlowing = false,調用readable.pause()、readable.unpipe()、或接收背壓,會被設爲false,暫時中止事件流動但不會中止數據的生成。
  1. Readable擁有read()讀取數據
  2. ReadStream擁有howMatchToRead來隨時調整讀取的大小,防止讀取亂碼

簡化實現

const EE = require('events');
const util = require('util');
const fs = require('fs');
function ReadStream (path,options) {
    this.path = path;
    this.flags = options.flags || 'r'; //用來標識打開文件的模式
    this.encoding = options.encoding || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.start = options.start || 0; //讀取(文件)的開始位置
    this.end = options.end || null; //讀取(文件)的結束位置
    this.autoClose = options.autoClose || true;
    this.flowing = null; // 默認非流動模式
    this.position = this.start // 記錄讀取數據的位置
    this.open(); // 打開文夾
    this.on('newListener', function (type) {
        if (type === 'data') { // 用戶監聽了data事件
            this.flowing = true;
            this.read();
        }
    })
}
ReadStream.prototype.read = function (){
    if (typeof this.fd !== 'number') {// open操做是異步的,因此必須等待文件打開this.fd存在說明打開文件
        return this.once('open', () => this.read());
    }
    let buffer = Buffer.alloc(this.highWaterMark); // 把數據讀取到這個buffer中
    //判斷每次讀取的數據是多少exp:數據源1234567890 highWaterMark=3
    //最後一次讀取長度爲1
    let howMuchToRead = Math.min(this.end - this.pos + 1, this.highWaterMark);
    fs.read(this.fd, buffer, 0, howMuchToRead, this.position, (err, byteRead) => {
    if (byteRead > 0) {
        this.emit('data', buffer.slice(0, byteRead));
        this.position += byteRead;//更新讀取的起點
        if (this.flowing) {//處在flowing模式中就一直讀
            this.read();
        }
    }else{//讀取完畢
        this.flowing = null;
        this.emit('end');
        if(this.autoClose){
            this.destroy();
        }
    }
}
//經過flowing控制暫停仍是繼續讀取
ReadStream.prototype.pause = function(){
    this.flowing = false;
}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}
ReadStream.prototype.destroy = function () {
    if (typeof this.fd != 'number') {
        this.emit('close');
    } else {
        fs.close(this.fd, () => {
        this.emit('close');
        })
    }
};

ReadStream.prototype.open = function() {
    fs.open(this.path, this.flags, (err, fd) => {// fd文件描述符 只要文件打開了就是number
        if (err) {
            if (this.autoClose) { // 若是須要自動關閉 觸發一下銷燬事件
            this.destroy(); // 銷燬文件
        }
        return this.emit('error', err);
    }
    this.fd = fd;
    this.emit('open', fd);
    });
};
複製代碼

實例

let fs = require('fs');
let ReadStream = require('./ReadStream')
let rs = fs.createReadStream('1.txt',{//1234567890
  encoding:null, 
  flags:'r+', 
  highWaterMark:3, 
  autoClose:true, 
  start:0, 
  end:3  
});
let arr = [];
rs.on('open',function () {
  console.log(' 文件開啓了')
});
rs.on('data',function (data) {
  console.log(data);
  arr.push(data);
}); 
rs.on('end',function () { // 只有目標文件讀取完畢後才觸發
  console.log('結束了');
  console.log(Buffer.concat(arr).toString());
});
rs.pause()
setTimeout(function () {
    rs.resume(); // 恢復的是data事件的觸發
},1000)
rs.on('error',function (err) {
  console.log('出錯了')
})
rs.on('close',function () {
  console.log('close')
});
複製代碼

結語:

但願這篇文章可以讓各位看官對Stream熟悉,由於這個模塊是node中的核心,不少模塊都是繼承這個模塊實現的,若是熟悉了這個模塊,對node的使用以及koa等框架的使用將大有好處,接下來會逐步介紹writable等流模式本文參考:bash

  1. 深刻理解Node Stream內部機制
  2. node API
  3. node 源碼
相關文章
相關標籤/搜索