以前在開發ASP.NET的時候,根據源代碼依次追蹤整個程序的執行過程,發現最重要的過程是基於一個管道流的,像水管同樣,依次處理管道流中的十幾個事件,當時對流的認知是四個字,依次執行。那麼如今作Node的開發,對於Node中的流是另四個字,那就是源源不斷。本篇文章主要目的是帶你們手寫可讀流與可寫流。html
在Node中,請求流,響應流,文件流等都是基於stream模塊封裝的。簡單的理解,流就是將大塊的東西,分小塊依次處理。就像你須要10kg的水,水管就一點點的源源不斷的流出來給你。又如在程序中node
fs.readFileSync('/demo.txt', {encoding:'utf8'});fs.writeFileSync('/demo.txt', data);
複製代碼
以上兩個方法是把文件內容所有讀入內存,而後再寫入文件,可是若是文件過大就會出現問題了,內存容易爆掉。這裏就須要用到流了,一點點的讀取或者寫入。api
可讀流分爲兩種模式:flowing 和 paused,而且兩種模式能夠相互轉換數組
1.在 flowing 模式下, 可讀流自動從系統底層讀取數據,並經過 EventEmitter 接口的事件儘快將數據提供給應用。緩存
2.在 paused 模式下,必須顯式調用 stream.read() 方法來從流中讀取數據片斷。bash
3.全部初始工做模式爲 paused 的 Readable 流,能夠經過下面三種途徑切換到 flowing 模式:異步
4.可讀流能夠經過下面途徑切換到 paused 模式:函數
5.爲了便於理解,這裏分開寫兩種模式,下面爲flowing模式基本實現ui
流程圖:this
class ReadStream extends EventEmitter{
constructor(path,options){
super(path,options);
this.path = path;//寫入路徑
this.flags = options.flags || 'r'; //操做修飾符
this.mode = options.mode || 0o666; //權限
this.autoClose = options.autoClose;//是否自動關閉
this.highWaterMark = options.highWaterMark || 64 * 1024; //默認64k
this.pos = this.start = options.start || 0;//起始位置
this.end = options.end;//結束位置
this.encoding = options.encoding;//編碼
this.flowing = null;//流動模式
this.buffer = Buffer.alloc(this.highWaterMark);//讀取的buffer 不是緩存
this.open();
this.on('newListener',(type,listener)=>{
if (type == 'data') {
this.flowing = true;
this.read();
}
})
}
複製代碼
open() {
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
if(this.autoClose){
this.destroy();
return this.emit('error',err);
}
}
this.fd = fd;
this.emit('open');
})
}
複製代碼
read(){
if(typeof this.fd != 'number'){
return this.once('open',()=>this.read());
}
let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是實際讀到的字節數
if(err){
if(this.autoClose)
this.destroy();
return this.emit('error',err);
}
if(bytes){
let data = this.buffer.slice(0,bytes);
this.pos += bytes;
data = this.encoding?data.toString(this.encoding):data;
this.emit('data',data);
if(this.end && this.pos > this.end){
return this.endFn();
}else{
if(this.flowing)
this.read();
}
}else{
return this.endFn();
}
})
}
複製代碼
pipe(ws){
this.on('data',data =>{
let flag = ws.write(data);
if (!flag) {
this.pause();
}
})
ws.on('drain',()=>{
this.resume();
})
}
pause(){
this.flowing = false;
}
resume(){
this.flowing = true;
this.read();
}
複製代碼
this.on('newListener', (type) => {
if (type == 'data') {
this.flowing = true;
this.read();
}
if (type == 'readable') {
this.read(0);
}
});
複製代碼
let _read = () => {
let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
if (err) {
return
}
let data;
if (bytesRead > 0) {
data = this.buffer.slice(0, bytesRead);
this.pos += bytesRead;
this.length += bytesRead;
if (this.end && this.pos > this.end) {
if (this.needReadable) {
this.emit('readable');
}
this.emit('end');
} else {
this.buffers.push(data);
if (this.needReadable) {
this.emit('readable');
this.needReadable = false;
}
}
} else {
if (this.needReadable) {
this.emit('readable');
}
return this.emit('end');
}
})
}
複製代碼
if (0 < n < this.length) {
ret = Buffer.alloc(n);
let b;
let index = 0;
while (null != (b = this.buffers.shift())) {
for (let i = 0; i < b.length; i++) {
ret[index++] = b[i];
if (index == ret.length) {
this.length -= n;
b = b.slice(i + 1);
this.buffers.unshift(b);
break;
}
}
}
if (this.encoding) ret = ret.toString(this.encoding);
}
複製代碼
流程圖
//構造函數
constructor(path,options){
super(path,options);
this.path = path; //寫入路徑
this.flags = options.flags || 'w';//操做修飾符
this.mode = options.mode || 0o666;//權限
this.start = options.start || 0;//寫入的起始位置
this.pos = this.start;//文件的寫入索引
this.encoding = options.encoding || 'utf8';//編碼
this.autoClose = options.autoClose;//自動關閉
this.highWaterMark = options.highWaterMark || 16 * 1024; //默認最高水位線16k
this.buffers = [];//緩存區 源碼裏面是鏈表
this.writing = false;//標識內部正在寫入數據
this.length = 0;//標識緩存區字節的長度
this.open();//默認一建立就打開文件
}
複製代碼
write(chunk,encoding,callback){
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk,this.encoding);//此方法只吸入buffer或者字符串
this.length += chunk.length;//當前緩存區的長度
if (this.writing) {//若是正在寫入數據 則須要把數據放入緩存區裏面
this.buffers.push({
chunk,
encoding,
callback
})
} else { //若是當前空閒 直接調用底層寫入的方法進行寫入 而且在寫完之後 清空緩存區
this.writing = true;
this._write(chunk,encoding,()=>{
callback&&callback();
this.clearBuffer();
})
}
//write方法有一個返回值 表示當前緩存區是否超過了最高水位線 便是否能繼續寫入
return this.length < this.highWaterMark;
}
_write(chunk,encoding,callback){
if (typeof this.fd != 'number') { //由於是異步的 文件可能在這個時候並未打開
return this.once('open',()=>this._write(chunk, encoding, callback));
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
if (err) {
if (this.autoClose) {
this.destory();
}
return this.emit('error',err);
}
this.pos += bytesWritten;
this.length -= bytesWritten;
callback&&callback();
})
}
clearBuffer(){
let data = this.buffers.shift();
if(data){
this._write(data.chunk,data.encoding,()=>{
data.callback && data.callback();
this.clearBuffer();
})
}else{
this.writing = false;
//緩存區清空了 緩存區若是沒有滿過 是不會觸發這個事件的
this.emit('drain');
}
}
複製代碼