淺析node中流應用(二) 可寫流(fs.createWriteStream)

上篇文章寫了可讀流的用法和源碼實現,能夠先去看下,其中有類似的地方,重複的地方就很少作介紹了,就直接寫用法。有一點值得一提就是可寫流有緩存區的概念html

可寫流的用法

建立可寫流

let fs = require('fs');
let ws = fs.createWriteStream('2.txt', {
    flags: 'w',         // 文件的操做, 'w'寫入文件,不存在則建立
    mode: 0o666,
    autoClose: true,
    highWaterMark: 3,   // 默認寫是16*1024
    encoding: 'utf8'
});

複製代碼

ws.write()方法和drain事件

  • 每次寫入後會有一個標識 flag,當寫入的內容的長度超過highWaterMark就會返回false
  • true表示能夠繼續寫入,若是返回false,表示緩存區滿了,咱們應當中止讀取數據以免消耗過多內存。
  • 緩存區滿後,文件寫入一直在進行,不一下子會把緩存區的內容所有寫入,緩存區處於清空狀態,這時會觸發可寫流的‘drain’事件

不太明白?不要緊,咱們看下面這個例子幫助咱們理解node

let fs = require("fs");
let ws = fs.createWriteStream('1.txt',{
    flags:'w',
    encoding:'utf8',
    start:0,
    highWaterMark:3
});
let i =9;
function write() {
    let flag = true;
    while (flag && i>=0){
      flag =   ws.write(i-- +'');//往1.txt寫入9876543210
        console.log(flag);
    }
}
ws.on('drain',()=>{ //緩存區充滿並被寫入完成,處於清空狀態時觸發
    console.log("幹了");
    write(); //當緩存區清空後咱們在繼續寫
})
write(); //第一次調用write方法


複製代碼

看到這裏,咱們應該基本明白用法,下面咱們開始寫源碼實現,鑑於與可讀流有不少類似寫法上篇文章已詳細寫過,重複的就很少說了,說幹就幹git

可寫流實現原理

一、聲明WriteStream的構造函數(準備工做)

let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter{
    constructor(path,options={}){
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.encoding = options.encoding || 'utf8';
        this.start = options.start || 0;
        this.pos = this.start;
        this.mode = options.mode || 0o666;
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark || 16 * 1024;

        //第一次寫入是真的往文件裏寫
        this.writing = false; //默認第一次不是正在寫入

        // 可寫流  要有一個緩存區,當正在寫入文件時,內容要寫入到緩存區
        this.cache = [];

        // 記錄緩存區大小
        this.len =0;

        // 是否觸發drain事件
        this.needDrain = false;


        this.open(); //目的是拿到fd 異步,觸發一個open事件後fd確定存在啦
    }
  }
複製代碼
  • 把變量聲明好,下面用到的時候就知道怎麼回事了
  • 值得主要的是,咱們第一次往文件裏寫入的時候是真的往文件寫,下次要寫的內容先放到緩存區,寫入完成後從緩存區去取內容

二、實現open方法和destroy方法

  • 這兩個方法與可讀流同樣,不作介紹,直接上代碼
open(){
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
            if(err){
                this.emit('error', err); //打開文件發生錯誤,發佈error事件
                this.emit('error');
                if (this.autoClose) { //若是須要自動關閉我再去銷燬fd
                    this.destroy(); //關閉文件(觸發關閉事件)
                }
                return;
            }
            this.fd = fd; //保存文件描述符
            this.emit('open', this.fd) //觸發文件open方法
        })
    }
    destroy() {
        if (typeof this.fd != 'number') { //文件未打開,也要關閉文件且觸發close事件
            return this.emit('close');
        }
        fs.close(this.fd, () => {  //若是文件打開過了 那就關閉文件而且觸發close事件
            this.emit("close");
        })
    }
複製代碼

三、實現write方法,與客戶端調用ws.write()方法對應

write(chunk,encoding = this.encoding,callback){ //客戶調用的是write
        //chunk必須是buffer或者字符串, 爲了統一,若是傳遞的是字符串也要轉成buffer
        chunk = Buffer.isBuffer(chunk)?chunk : Buffer.from(chunk,encoding);
        this.len +=chunk.length; //維護寫入的緩存的長度
        let ret = this.len <this.highWaterMark; //一個標識 比較是否達成了緩存區的大小
        this.needDrain = !ret; //是否須要觸發needDrain
        if(this.writing){ //默認爲false上面定義的,判斷是否正在寫入  若是是正在寫入 就寫入到緩存區中
            this.cache.push({chunk,encoding,callback})
        }else { //第一次寫
            this.writing = true;
            this._write(chunk,encoding,()=>this.clearBuffer()); //專門實現寫的方法

        }

        return ret; //能不能繼續寫了 false表示下次寫的時候就要佔用內存

    }
複製代碼
  • _write()方法 重點, 咱們專門用來實現真正的寫入方法

4 、實現重頭戲 _write()方法

_write(chunk,encoding,clearBuffer){
        if(typeof this.fd!= 'number'){
            //由於write方法是同步調用,此時fd尚未獲取到,因此等待獲取到再執行write
            return this.once('open',()=>this._write(chunk,encoding,clearBuffer));
        }
        //確保有fd
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
            this.pos +=byteWritten; //偏移量,默認爲0
            this.len -=byteWritten; //每次寫入後就要在內存中減小下
            this.writing = false; //正在寫入就不要再寫拉,放緩存區
            clearBuffer(); //清空緩存區
            
        })
    }
複製代碼

五、清空緩存區

let buffer = this.cache.shift(); //取緩存區第一個內容
        if(buffer){ //緩存裏有
            this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer())
        }else { //緩存裏沒有了
            if(this.needDrain){ //須要觸發drain事件
                this.writing = false; //告訴下次直接寫就能夠了  不須要寫到內存中
                this.needDrain = false;
                this.emit('drain');
            }
        }
複製代碼
  • 分析下
  • 若是是正在寫入,就先把內容放到緩存區裏,就是this.cache,默認[]
  • 給數組裏存入一個對象,分別對應chunk, encoding, callback即(()=>this.clearBuffer())
  • 每一次寫完後都須要把cache(緩存區)裏的內容清空掉
  • 當緩存區cache數組裏是空的時候就會觸發drain事件了

寫到這裏,基本就寫完了,源碼有3000多行,這個只是簡單實現,看不太懂的時候就多寫幾遍(ps 我也寫了好多遍)。測試下看行不行吧github

相關文章
相關標籤/搜索