上篇文章寫了可讀流的用法和源碼實現,能夠先去看下,其中有類似的地方,重複的地方就很少作介紹了,就直接寫用法。有一點值得一提就是可寫流有緩存區的概念html
let fs = require('fs');
let ws = fs.createWriteStream('2.txt', {
flags: 'w', // 文件的操做, 'w'寫入文件,不存在則建立
mode: 0o666,
autoClose: true,
highWaterMark: 3, // 默認寫是16*1024
encoding: 'utf8'
});
複製代碼
不太明白?不要緊,咱們看下面這個例子幫助咱們理解node
let fs = require("fs");
let ws = fs.createWriteStream('1.txt',{
flags:'w',
encoding:'utf8',
start:0,
highWaterMark:3
});
let i =9;
function write() {
let flag = true;
while (flag && i>=0){
flag = ws.write(i-- +'');//往1.txt寫入9876543210
console.log(flag);
}
}
ws.on('drain',()=>{ //緩存區充滿並被寫入完成,處於清空狀態時觸發
console.log("幹了");
write(); //當緩存區清空後咱們在繼續寫
})
write(); //第一次調用write方法
複製代碼
看到這裏,咱們應該基本明白用法,下面咱們開始寫源碼實現,鑑於與可讀流有不少類似寫法上篇文章已詳細寫過,重複的就很少說了,說幹就幹git
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter{
constructor(path,options={}){
super();
this.path = path;
this.flags = options.flags || 'w';
this.encoding = options.encoding || 'utf8';
this.start = options.start || 0;
this.pos = this.start;
this.mode = options.mode || 0o666;
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark || 16 * 1024;
//第一次寫入是真的往文件裏寫
this.writing = false; //默認第一次不是正在寫入
// 可寫流 要有一個緩存區,當正在寫入文件時,內容要寫入到緩存區
this.cache = [];
// 記錄緩存區大小
this.len =0;
// 是否觸發drain事件
this.needDrain = false;
this.open(); //目的是拿到fd 異步,觸發一個open事件後fd確定存在啦
}
}
複製代碼
open(){
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
this.emit('error', err); //打開文件發生錯誤,發佈error事件
this.emit('error');
if (this.autoClose) { //若是須要自動關閉我再去銷燬fd
this.destroy(); //關閉文件(觸發關閉事件)
}
return;
}
this.fd = fd; //保存文件描述符
this.emit('open', this.fd) //觸發文件open方法
})
}
destroy() {
if (typeof this.fd != 'number') { //文件未打開,也要關閉文件且觸發close事件
return this.emit('close');
}
fs.close(this.fd, () => { //若是文件打開過了 那就關閉文件而且觸發close事件
this.emit("close");
})
}
複製代碼
write(chunk,encoding = this.encoding,callback){ //客戶調用的是write
//chunk必須是buffer或者字符串, 爲了統一,若是傳遞的是字符串也要轉成buffer
chunk = Buffer.isBuffer(chunk)?chunk : Buffer.from(chunk,encoding);
this.len +=chunk.length; //維護寫入的緩存的長度
let ret = this.len <this.highWaterMark; //一個標識 比較是否達成了緩存區的大小
this.needDrain = !ret; //是否須要觸發needDrain
if(this.writing){ //默認爲false上面定義的,判斷是否正在寫入 若是是正在寫入 就寫入到緩存區中
this.cache.push({chunk,encoding,callback})
}else { //第一次寫
this.writing = true;
this._write(chunk,encoding,()=>this.clearBuffer()); //專門實現寫的方法
}
return ret; //能不能繼續寫了 false表示下次寫的時候就要佔用內存
}
複製代碼
_write(chunk,encoding,clearBuffer){
if(typeof this.fd!= 'number'){
//由於write方法是同步調用,此時fd尚未獲取到,因此等待獲取到再執行write
return this.once('open',()=>this._write(chunk,encoding,clearBuffer));
}
//確保有fd
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
this.pos +=byteWritten; //偏移量,默認爲0
this.len -=byteWritten; //每次寫入後就要在內存中減小下
this.writing = false; //正在寫入就不要再寫拉,放緩存區
clearBuffer(); //清空緩存區
})
}
複製代碼
let buffer = this.cache.shift(); //取緩存區第一個內容
if(buffer){ //緩存裏有
this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer())
}else { //緩存裏沒有了
if(this.needDrain){ //須要觸發drain事件
this.writing = false; //告訴下次直接寫就能夠了 不須要寫到內存中
this.needDrain = false;
this.emit('drain');
}
}
複製代碼
寫到這裏,基本就寫完了,源碼有3000多行,這個只是簡單實現,看不太懂的時候就多寫幾遍(ps 我也寫了好多遍)。測試下看行不行吧github