詳解node.js中的可讀流(Readable)和可寫流(Writeable)

Node.js的流就是爲了在有限的內存中實現咱們操做"海量"數據的目標。node

流是一組有序的,有起點和終點的字節數據傳輸手段,它是一個抽象的接口,被 Node 中的不少對象所實現。node裏不少內容都應用到流,好比HTTP 服務器request和response對象都是流。緩存

它不關心文件的總體內容,只關注是否從文件中讀到了數據,以及讀到數據以後的處理。bash

Node.js中Stream 有四種流類型。服務器

可讀流(Readable)

  • 可讀流分爲:流動模式(flowing mode)暫停模式(paused mode)異步

  • 可讀流在建立時都是暫停模式。暫停模式和流動模式能夠互相轉換。函數

1) 流動模式(flowing mode)

流動模式下,數據會源源不斷地生產出來,造成「流動」現象。監聽流的data事件即可進入該模式。ui

2) 暫停模式(paused mode)

暫停模式下,須要顯示地調用read(),觸發data事件。this

在初始狀態下,監聽data事件,會使流進入流動模式。但若是在暫停模式下,監聽data事件並不會使它進入流動模式。爲了消耗流,須要顯示調用read()方法。編碼

3)相互轉化

  • 若是不存在管道目標,調用readable.resume()可以使流進入流動模式spa

  • 若是存在管道目標,調用 stream.unpipe()並取消'data'事件監聽

建立可讀流

var rs = fs.createReadStream(path,[options]);複製代碼
  1. path讀取文件的路徑

  2. options

    • flags 打開文件要作的操做,默認爲'r'讀取

    • encoding 默認爲null,表明buffer。若是指定utf8編碼highWaterMark要大於3個字節

    • start 開始讀取的索引位置

    • end 結束讀取的索引位置(包括結束位置)

    • highWaterMark 讀取緩存區默認的大小64kb

    • autoClose 讀取完畢後是否自動關閉

相關方法

flowing流動模式
let fs=require('fs');
let path=require('path');
let rs=fs.createReadStream(path.join(__dirname,'1.txt'),{ //這裏的參數通常不會寫
  flags:'r',//文件的操做是讀取操做
  encoding:'utf8', // 默認是null null表明的是buffer
  autoClose:true, // 讀取完畢後自動關閉
  highWaterMark:3,// 默認是64k  64*1024b
  start:0, //讀取的起始位置 
  end:3 // 讀取的結束位置,包前又包後,至關於閉區間
})
//默認狀況下 不會將文件中的內容輸出
//內部會先建立一個buffer先讀取3b
//至關於有蓋子的水管,不會流出來,存儲在管中
​
//有兩種模式 非流動模式/暫停模式
//由於建立時第二個參數通常不會寫,讀出來的類型是buffer,這個方法能夠指定編碼
rs.setEncoding('utf8');
​
//打開文件
rs.on('open',function(data){ 
  console.log(data)
})
//關閉文件
rs.on('close',function(data){ 
  console.log(data)
})
//有錯誤就會報錯誤
rs.on('err',function(data){ 
  console.log(data)
})
​
//暫停模式->流動模式
//流動模式只要監聽了會瘋狂的觸發data事件,直到讀取完畢
rs.on('data',function(data){
  console.log(data);
  //一打開水龍頭就嘩嘩出水,有個方法可讓它暫停
  rs.pause(); //暫停方法,表示暫停讀取,暫停data事件觸發
})
setInterval(function(){
  rs.resume(); //恢復data事件的觸發,變爲流動模式繼續讀取
},3000)
rs.on('end',function(data){ //先end再close關閉
  console.log(data)
})
    複製代碼
paused暫停模式
let fs=require('fs');
let path=require('path');
let rs=fs.createReadStream(path.join(__dirname,'1.txt'));
​
rs.setEncoding('utf8');
​
// 當我只要建立一個流,就會先把緩存區填滿,等待着你本身消費
// 若是當前緩存區被清空後會再次觸發readable事件
// 當你消費小於最高水位線時,會自動添加highWater這麼多數據
rs.on('readable', () => {
    let d = rs.read(1)
    console.log(d)
})複製代碼

實現可讀流功能原理

流動模式
let EventEmitter = require('events');
let fs = require('fs');
class ReadStream extends EventEmitter {
    constructor(path,options){
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null
​
        this.open();//打開文件 fd
​
        this.flowing = null; // null就是暫停模式
        // 看是否監聽了data事件,若是監聽了 就要變成流動模式
​
        // 要創建一個buffer 這個buffer就是要一次讀多少
        this.buffer = Buffer.alloc(this.highWaterMark);
​
        this.pos = this.start; // pos 讀取的位置 可變 start不變的
        this.on('newListener',(eventName,callback)=>{
            if(eventName === 'data'){
                // 至關於用戶監聽了data事件
                this.flowing  = true;
                // 監聽了 就去讀
                this.read(); // 去讀內容了
            }
        })
    }
    read(){
        // 此時文件還沒打開呢
        if(typeof this.fd !== 'number'){
            // 當文件真正打開的時候 會觸發open事件,觸發事件後再執行read,此時fd確定有了
            return this.once('open',()=>this.read())
        }
        // 此時有fd了
        let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;
        fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
            // 讀到了多少個 累加
            if(bytesRead>0){
                this.pos+= bytesRead;
                let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead);
                this.emit('data',data);
                // 當讀取的位置 大於了末尾 就是讀取完畢了
                if(this.pos > this.end){
                    this.emit('end');
                    this.destroy();
                }
                if(this.flowing) { // 流動模式繼續觸發
                    this.read(); 
                }
            }else{
                this.emit('end');
                this.destroy();
            }
        });
    }
    resume(){
        this.flowing = true;
        this.read();
    }
    pause(){
        this.flowing = false;
    }
    destroy(){
        // 先判斷有沒有fd 有關閉文件 觸發close事件
        if(typeof this.fd ==='number'){
            fs.close(this.fd,()=>{
                this.emit('close');
            });
            return;
        }
        this.emit('close'); // 銷燬
    };
    open(){
        // copy 先打開文件
        fs.open(this.path,this.flags,(err,fd)=>{
            if(err){
                this.emit('error',err);
                if(this.autoClose){ // 是否自動關閉
                    this.destroy();
                }
                return;
            }
            this.fd = fd; // 保存文件描述符
            this.emit('open'); // 文件打開了
        });
    }
}
module.exports = ReadStream;複製代碼
pipe

.pipe()函數是接受一個源頭src並將數據輸出到一個可寫的流dst

簡單來講,邊讀邊寫東西,讀太快,來不及寫,就先暫停讀,等寫完了再繼續讀。

let fs = require('fs');
let path = require('path');
let ReadStream = require('./ReadStream');
let WriteStream = require('./WriteStream');
let rs = new ReadStream(path.join(__dirname,'./1.txt'),{
    highWaterMark:4
});
let ws = new WriteStream(path.join(__dirname,'./2.txt'),{
    highWaterMark:1
});
// 讀四個,寫一個
rs.pipe(ws); // pipe就是讀一點寫一點複製代碼

pipe原理實現,寫在ReadStream的方法中

pipe(ws){
    this.on('data',(chunk)=>{
        let flag = ws.write(chunk);
        if(!flag){
            this.pause();
        }
    });
    ws.on('drain',()=>{
        this.resume();
    })
}複製代碼
暫停模式
let fs = require('fs');
let EventEmitter = require('events');
//當讀取內容大於緩存區,從新計算讀取數量n的大小的方法
function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }
class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true;
        this.start = 0;
        this.end = options.end;
        this.flags = options.flags || 'r';
​
        this.buffers = []; // 緩存區 
        this.pos = this.start;
        this.length = 0; // 緩存區大小
        this.emittedReadable = false;
        this.reading = false; // 不是正在讀取的
        this.open();
        this.on('newListener', (eventName) => {
            if (eventName === 'readable') {
                this.read();
            }
        })
    }
    read(n) { // 想取1個
​
        if(n>this.length){
            // 更改緩存區大小  讀取五個就找 2的幾回放最近的
            this.highWaterMark = computeNewHighWaterMark(n)
            this.emittedReadable = true;
            this._read();
        }
​
​
        // 若是n>0 去緩存區中取吧
        let buffer=null;
        let index = 0; // 維護buffer的索引的
        let flag = true;
        if (n > 0 && n <= this.length) { // 讀的內容 緩存區中有這麼多
            // 在緩存區中取 [[2,3],[4,5,6]]
            buffer = Buffer.alloc(n); // 這是要返回的buffer
            let buf;
            while (flag&&(buf = this.buffers.shift())) {
                for (let i = 0; i < buf.length; i++) {
                    buffer[index++] = buf[i];
                    if(index === n){ // 拷貝夠了 不須要拷貝了
                        flag = false;
                        this.length -= n;
                        let bufferArr = buf.slice(i+1); // 取出留下的部分
                        // 若是有剩下的內容 在放入到緩存中
                        if(bufferArr.length > 0){
                            this.buffers.unshift(bufferArr);
                        }
                        break;
                    }
                }
            }
        }
        // 當前緩存區 小於highWaterMark時在去讀取
        if (this.length == 0) {
            this.emittedReadable = true;
        }
        if (this.length < this.highWaterMark) {
            if(!this.reading){
                this.reading = true;
                this._read(); // 異步的
            }
        }
        return buffer
    }
    // 封裝的讀取的方法
    _read() {
        // 當文件打開後在去讀取
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this._read());
        }
        // 上來我要喝水 先倒三升水 []
        let buffer = Buffer.alloc(this.highWaterMark);
        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
            if (bytesRead > 0) {
                // 默認讀取的內容放到緩存區中
                this.buffers.push(buffer.slice(0, bytesRead));
                this.pos += bytesRead; // 維護讀取的索引
                this.length += bytesRead;// 維護緩存區的大小
                this.reading = false;
                // 是否須要觸發readable事件
                if (this.emittedReadable) {
                    this.emittedReadable = false; // 下次默認不觸發
                    this.emit('readable');
                }
            } else {
                this.emit('end');
                this.destroy();
            }
        })
    }
    destroy() {
        if (typeof this.fd !== 'number') {
            return this.emit('close')
        }
        fs.close(this.fd, () => {
            this.emit('close')
        })
    }
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) {
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit('open');
        });
    }
}
​
module.exports = ReadStream;複製代碼

可寫流(Writeable)

建立可寫流

var ws = fs.createWriteStream(path,[options]);

  1. path寫入的文件路徑

  2. options

    • flags打開文件要作的操做,默認爲'w'

    • encoding默認爲utf8

    • highWaterMark寫入緩存區的默認大小16kb

相關方法

let fs=require('fs');
let path=require('path');
//寫的時候文件不存在,會建立文件
let ws = fs.createWriteStream('./1.txt',{
    flags:'w',
    mode:0o666,
    autoClose:true,
    highWaterMark:3, // 默認寫是16k
    encoding:'utf8',
    start:0
});
​
//第一個參數寫入的數據必須是字符串或者Buffer
//第二個參數寫入以什麼編碼寫進去
//第三個參數callback
//有返回值,表明是否能繼續寫,寫的時候,有個緩存區的概念。可是返回false,也不會丟失,就是會把內容放到內存中
let flag=ws.write(1+'','utf8',()=>{})//這是異步的方法
​
//傳入的參數,寫完後也會寫入文件內
ws.end('ok'); //當寫完後,就不能再繼續寫了
​
//抽乾方法,當寫入完後會觸發drain方法
//緩存區必須滿了,滿了清空後纔會觸發drain
//若是調用end後,再調用這個方法沒有意義了
ws.on('drain',function(){
  console.log('drain')
})
​複製代碼

實現可寫流功能原理

let EventEmitter = require('events');
let fs = require('fs');
class WriteStream extends EventEmitter{
    constructor(path,options){
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark||16*1024;
        this.autoClose = options.autoClose||true;
        this.mode = options.mode;
        this.start = options.start||0;
        this.flags = options.flags||'w';
        this.encoding = options.encoding || 'utf8';
​
        // 可寫流 要有一個緩存區,當正在寫入文件是,內容要寫入到緩存區中
        // 在源碼中是一個鏈表 => []
​
        this.buffers = [];
​
        // 標識 是否正在寫入
        this.writing = false;
​
        // 是否知足觸發drain事件
        this.needDrain = false;
​
        // 記錄寫入的位置
        this.pos = 0;
​
        // 記錄緩存區的大小
        this.length = 0;
        this.open();
    }
    destroy(){
        if(typeof this.fd !=='number'){
            return this.emit('close');
        }
        fs.close(this.fd,()=>{
            this.emit('close')
        })
    }
    open(){
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
            if(err){
                this.emit('error',err);
                if(this.autoClose){
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit('open');
        })
    }
    write(chunk,encoding=this.encoding,callback=()=>{}){
        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
        // write 返回一個boolean類型 
        this.length+=chunk.length; 
        let ret = this.length<this.highWaterMark; // 比較是否達到了緩存區的大小
        this.needDrain = !ret; // 是否須要觸發needDrain
        // 判斷是否正在寫入 若是是正在寫入 就寫入到緩存區中
        if(this.writing){
            this.buffers.push({
                encoding,
                chunk,
                callback
            }); // []
        }else{
            // 專門用來將內容 寫入到文件內
            this.writing = true;
            this._write(chunk,encoding,()=>{
                callback();
                this.clearBuffer();
            }); // 8
        }
        return ret;
    }
    clearBuffer(){
        let buffer = this.buffers.shift();
        if(buffer){
            this._write(buffer.chunk,buffer.encoding,()=>{
                buffer.callback();
                this.clearBuffer()
            });
        }else{
            this.writing = false;
            if(this.needDrain){ // 是否須要觸發drain 須要就發射drain事件
                this.needDrain = false;
                this.emit('drain');
            }
        }
    }
    _write(chunk,encoding,callback){
        if(typeof this.fd !== 'number'){
            return this.once('open',()=>this._write(chunk,encoding,callback));
        }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
            this.length -= byteWritten;
            this.pos += byteWritten;
            
            callback(); // 清空緩存區的內容
        });
    }
}
​
module.exports = WriteStream;複製代碼


啊~~文章彷佛太長太囉嗦了,看來怎麼把給本身看的筆記整理成一個好的文章也是一門學問!

相關文章
相關標籤/搜索