node那點事(一) -- Readable streams(可讀流)

流的簡介

流(stream)在 Node.js 中是處理流數據的抽象接口(abstract interface)。 stream 模塊提供了基礎的 API 。使用這些 API 能夠很容易地來構建實現流接口的對象。api

Node.js 提供了多種流對象。 例如, HTTP 請求 和 process.stdout 就都是流的實例。緩存

流能夠是可讀的、可寫的,或是可讀寫的。全部的流都是 EventEmitter 的實例。bash

爲何要用流

這裏咱們舉一個簡單的例子:異步

咱們打算讀取一個文件,使用 fs.readFileSync 同步讀取一個文件,程序會被阻塞,全部的數據都會被讀取到內存中。函數

換用 fs.readFile 讀取文件,程序不會被阻塞,可是全部的數據依舊會被一次性所有被讀取到內存中。ui

當處理大文件壓縮、歸檔、媒體文件和巨大的日誌文件的時候,內存使用就成了問題,如今你們通常家用機內存大多數都是8G、16G,軟件包還在日益增大,在這種狀況下,流的優點就體現出來了。this

流被設計爲異步的方式,在內存中只開啓一個固定的空間,將文件化整爲零,以流動的方式進行傳輸操做,解決了以上問題。spa

流的類型

Node.js 中有四種基本的流類型:設計

Readable - 可讀的流 (例如 fs.createReadStream()).日誌

Writable - 可寫的流 (例如 fs.createWriteStream()).

Duplex - 可讀寫的流 (例如 net.Socket).

Transform - 在讀寫過程當中能夠修改和變換數據的 Duplex 流 (例如 zlib.createDeflate()).

可讀流(Readable Stream)

可讀流有兩種模式:

一、流動模式(flowing):可讀流自動讀取數據,經過EventEmitter接口的事件儘快將數據提供給應用。

二、暫停模式(paused):必須顯式調用stream.read()方法來從流中讀取數據片斷。

能夠經過三種途徑切換到流動模式:

  • 監聽 'data' 事件
  • 調用 stream.resume() 方法
  • 調用 stream.pipe() 方法將數據發送到 Writable

流動模式切換到暫停模式的api有:

  • 若是不存在管道目標,調用stream.pause()方法

  • 若是存在管道目標,調用 stream.unpipe()並取消'data'事件監聽

可讀流事件:'data','readable','error','close','end'

咱們能夠想象下家用熱水器的模型,熱水器的水箱(buffer緩存區)裏面存着熱水(數據),在咱們用熱水的時候,開啓水龍頭,自來水會不斷的進入水箱,再從水箱由水龍頭流出來供咱們使用。這就是進入了「flowing」模式。當咱們關閉水龍頭時候,水箱則會暫停進水,水龍頭也會暫停出水,這是就進入了「paused」模式。

flowing模式

const fs = require('fs')
const path = require('path')
const rs = fs.createReadStream(path.join(__dirname, './1.txt'))

rs.setEncoding('utf8')

rs.on('data', (data) => {
    console.log(data)
})
複製代碼

paused模式

const fs = require('fs')
const path = require('path')
const rs = fs.createReadStream(path.join(__dirname, './1.txt'))

rs.setEncoding('utf8')

rs.on('readable', () => {
    let d = rs.read(1)
    console.log(d)
})
複製代碼

實現原理

流動模式原理

咱們來實現一個簡單的流動模式下的可讀流介紹其原理,由NODEJS官方文檔可知,流繼承自EventEmitter模塊,而後咱們定義一些默認參數、緩存區、模式:

let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
    constructor(path,options) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null
        
        this.buffer = Buffer.alloc(this.highWaterMark);//定義緩存區大小
        
        this.pos = this.start; // pos 讀取的位置 可變 start不變的
        
        this.flowing = null; // null就是暫停模式
    }
}

module.exports = ReadStream;
複製代碼

接着在咱們須要定義一個打開文件的方法用於打開文件。還有一個一個destroy方法,用於在文件操做出錯或者讀完以後關閉文件。

open(){
    fs.open(this.path,this.flags,(err,fd)=>{
        if(err){
            this.emit('error',err);
            if(this.autoClose){ // 是否自動關閉
                this.destroy();
            }
            return;
        }
        this.fd = fd; // 保存文件描述符
        this.emit('open'); // 文件打開了
    });
}
 destroy(){
    // 先判斷有沒有fd 有關閉文件 觸發close事件
    if(typeof this.fd ==='number'){
        fs.close(this.fd,()=>{
            this.emit('close');
        });
        return;
    }
    this.emit('close'); // 銷燬
}
複製代碼

接着要在構造函數中調用open方法,當用戶綁定data監聽時,修改可讀流的模式:

constructor(path,options){
    super();
    this.path = path;
    this.flags = options.flags || 'r';
    this.autoClose = options.autoClose || true;
    this.highWaterMark = options.highWaterMark|| 64*1024;
    this.start = options.start||0;
    this.end = options.end;
    this.encoding = options.encoding || null
    this.flowing = null; 
    this.buffer = Buffer.alloc(this.highWaterMark);
    this.pos = this.start;
    
    this.open();//打開文件 fd
    this.on('newListener',(eventName,callback)=>{
        if(eventName === 'data'){
            // 至關於用戶監聽了data事件
            this.flowing  = true;
            // 監聽了 就去讀
            this.read(); // 去讀內容了
        }
    })
}
複製代碼

接下來咱們實現最總要的read方法,首先要保證文件已經打開,接着鍍組文件進入緩存,觸發data事件傳入數據,若是處於流動模式,繼續讀取直到讀完文件。

read(){
    // 此時文件還沒打開呢
    if(typeof this.fd !== 'number'){
        // 當文件真正打開的時候 會觸發open事件,觸發事件後再執行read,此時fd確定有了
        return this.once('open',()=>this.read())
    }
    // 此時有fd了
    // 應該填highWaterMark?
    // 想讀4個 寫的是3  每次讀3個
    // 123 4
    let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;
    fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
        // 讀到了多少個 累加
        if(bytesRead>0){
            this.pos+= bytesRead;
            let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead);
            this.emit('data',data);
            // 當讀取的位置 大於了末尾 就是讀取完畢了
            if(this.pos > this.end){
                this.emit('end');
                this.destroy();
            }
            if(this.flowing) { // 流動模式繼續觸發
                this.read(); 
            }
        }else{
            this.emit('end');
            this.destroy();
        }
    });
}
複製代碼

剩下的pause和resume方法,很簡單

resume() {
    this.flowing = true;
    this.read();
}
pause() {
    this.flowing = false;
}
複製代碼

簡單的流實現完成了,看一下完整代碼

let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null

        this.open();

        this.flowing = null; // null就是暫停模式

        this.buffer = Buffer.alloc(this.highWaterMark);

        this.pos = this.start; 
        this.on('newListener', (eventName,callback) => {
            if (eventName === 'data') {
                this.flowing  = true;
                this.read(); 
            }
        })
    }
    
    read(){
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this.read())
        }
        let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos+1) : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err,bytesRead) => {
            if (bytesRead > 0) {
                this.pos += bytesRead;
                let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead);
                this.emit('data', data);
                if(this.pos > this.end){
                    this.emit('end');
                    this.destroy();
                }
                if(this.flowing) { 
                    this.read(); 
                }
            }else{
                this.emit('end');
                this.destroy();
            }
        });
    }
    
    resume() {
        this.flowing = true;
        this.read();
    }
    
    pause() {
        this.flowing = false;
    }
    
    destroy() {
        if(typeof this.fd === 'number') {
            fs.close(this.fd, () => {
                this.emit('close');
            });
            return;
        }
        this.emit('close'); 
    };
    
    open() {
        fs.open(this.path, this.flags, (err,fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) { 
                    this.destroy();
                }
                return;
            }
            this.fd = fd; 
            this.emit('open'); 
        });
    }
}
module.exports = ReadStream;
複製代碼

暫停模式原理

以上是流動模式的可讀流實現原理,暫停模式的可讀流原理與流動模式的主要區別在於監聽readable事件的綁定與read方法,先實現監聽綁定readable事件回調函數時,調用read方法讀取數據到緩存區,定義一個讀取方法_read

constructor(path, options) {
    super();
    this.path = path;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose || true;
    this.start = 0;
    this.end = options.end;
    this.flags = options.flags || 'r';

    this.buffers = []; // 緩存區 
    this.pos = this.start;
    this.length = 0; // 緩存區大小
    this.emittedReadable = false;
    this.reading = false; // 不是正在讀取的
    this.open();
    this.on('newListener', (eventName) => {
        if (eventName === 'readable') {
            this.read();
        }
    })
}

read(n) {
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    if (this.length < this.highWaterMark) {
        if(!this.reading) {
            this.reading = true;
            this._read(); 
        }
    }
}

_read() {
    if (typeof this.fd !== 'number') {
        return this.once('open', () => this._read());
    }
    let buffer = Buffer.alloc(this.highWaterMark);
    fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
        if (bytesRead > 0) {
            this.buffers.push(buffer.slice(0, bytesRead));
            this.pos += bytesRead;
            this.length += bytesRead;
            this.reading = false;
            if (this.emittedReadable) {
                this.emittedReadable = false; 
                this.emit('readable');
            }
        } else {
            this.emit('end');
            this.destroy();
        }
    })
}

複製代碼

由api可知,暫停模式下的可讀流手動調用read方法參數能夠大於highWaterMark,爲了處理這種狀況,咱們先寫一個函數computeNewHighWaterMark,取到大於等於n的最小2的n次方的整數

function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }
複製代碼

而後寫read方法,要考慮全n的各類狀況,上代碼

read(n) { 

    if(n>this.length){
        // 更改緩存區大小  讀取五個就找 2的幾回放最近的
        this.highWaterMark = computeNewHighWaterMark(n)
        this.emittedReadable = true;
        this._read();
    }


    // 若是n>0 去緩存區中取吧
    let buffer=null;
    let index = 0; // 維護buffer的索引的
    let flag = true;
    if (n > 0 && n <= this.length) { // 讀的內容 緩存區中有這麼多
        // 在緩存區中取 [[2,3],[4,5,6]]
        buffer = Buffer.alloc(n); // 這是要返回的buffer
        let buf;
        while (flag&&(buf = this.buffers.shift())) {
            for (let i = 0; i < buf.length; i++) {
                buffer[index++] = buf[i];
                if(index === n){ // 拷貝夠了 不須要拷貝了
                    flag = false;
                    this.length -= n;
                    let bufferArr = buf.slice(i+1); // 取出留下的部分
                    // 若是有剩下的內容 在放入到緩存中
                    if(bufferArr.length > 0){
                        this.buffers.unshift(bufferArr);
                    }
                    break;
                }
            }
        }
    }
    // 當前緩存區 小於highWaterMark時在去讀取
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    if (this.length < this.highWaterMark) {
        if(!this.reading){
            this.reading = true;
            this._read(); // 異步的
        }
    }
    return buffer
}
複製代碼

附上可讀流暫停模式的完整實現原理代碼

let fs = require('fs');
let EventEmitter = require('events');
function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }
class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true;
        this.start = 0;
        this.end = options.end;
        this.flags = options.flags || 'r';

        this.buffers = []; // 緩存區 
        this.pos = this.start;
        this.length = 0; // 緩存區大小
        this.emittedReadable = false;
        this.reading = false; // 不是正在讀取的
        this.open();
        this.on('newListener', (eventName) => {
            if (eventName === 'readable') {
                this.read();
            }
        })
    }
    read(n) { 

        if(n>this.length){
            // 更改緩存區大小  讀取五個就找 2的幾回放最近的
            this.highWaterMark = computeNewHighWaterMark(n)
            this.emittedReadable = true;
            this._read();
        }


        // 若是n>0 去緩存區中取吧
        let buffer=null;
        let index = 0; // 維護buffer的索引的
        let flag = true;
        if (n > 0 && n <= this.length) { // 讀的內容 緩存區中有這麼多
            // 在緩存區中取 [[2,3],[4,5,6]]
            buffer = Buffer.alloc(n); // 這是要返回的buffer
            let buf;
            while (flag&&(buf = this.buffers.shift())) {
                for (let i = 0; i < buf.length; i++) {
                    buffer[index++] = buf[i];
                    if(index === n){ // 拷貝夠了 不須要拷貝了
                        flag = false;
                        this.length -= n;
                        let bufferArr = buf.slice(i+1); // 取出留下的部分
                        // 若是有剩下的內容 在放入到緩存中
                        if(bufferArr.length > 0){
                            this.buffers.unshift(bufferArr);
                        }
                        break;
                    }
                }
            }
        }
        // 當前緩存區 小於highWaterMark時在去讀取
        if (this.length == 0) {
            this.emittedReadable = true;
        }
        if (this.length < this.highWaterMark) {
            if(!this.reading){
                this.reading = true;
                this._read(); // 異步的
            }
        }
        return buffer
    }
    // 封裝的讀取的方法
    _read() {
        // 當文件打開後在去讀取
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this._read());
        }
        // 上來我要喝水 先倒三升水 []
        let buffer = Buffer.alloc(this.highWaterMark);
        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
            if (bytesRead > 0) {
                // 默認讀取的內容放到緩存區中
                this.buffers.push(buffer.slice(0, bytesRead));
                this.pos += bytesRead; // 維護讀取的索引
                this.length += bytesRead;// 維護緩存區的大小
                this.reading = false;
                // 是否須要觸發readable事件
                if (this.emittedReadable) {
                    this.emittedReadable = false; // 下次默認不觸發
                    this.emit('readable');
                }
            } else {
                this.emit('end');
                this.destroy();
            }
        })
    }
    destroy() {
        if (typeof this.fd !== 'number') {
            return this.emit('close')
        }
        fs.close(this.fd, () => {
            this.emit('close')
        })
    }
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) {
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit('open');
        });
    }
}

module.exports = ReadStream;
複製代碼
相關文章
相關標籤/搜索