const EE = require('events');
const util = require('util');
const fs = require('fs');
function Writable(path,options) {//這個參數是源碼沒有的,這裏主要是爲了讀取fs爲案例加的
EE.call(this);//構造函數繼承EventEmiter
this.path = path;
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark || 64 * 1024;//64k
this.encoding = options.encoding || null;
this.flags = options.flags || 'w';//// 這個源碼沒有的,這裏主要是爲了fs讀取案例加的
this.needEmitDrain = false;// 須要觸發drain事件,默認不須要
this.position = 0;// 偏移量
this.cache = []; // 緩存區
this.writing = false;// 是否正在從緩存中讀取,生產者增長
this.length = 0; // 緩存區大小,控制長度
this.open(); // 這個源碼沒有的,這裏主要是爲了fs讀取案例加的
}
util.inherits(Writable, EE);//原型繼承EventEmiter
複製代碼
Writable.prototype.write = function (chunk, encoding=this.encoding, callback=()=>{}) {
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
//第一次雖然數據沒有放入到緩存,可是因爲後面會調用_write會將這個長度減去,因此先加上,保證length的正確性
this.length += chunk.length;
if (this.length >= this.highWaterMark ) {//消耗緩存的長度大於緩存的最大容量觸發drain
this.needDrain = true;
}
if (this.writing) {//若是正在執行寫操做,則後面要寫入目標的數據先存入緩存
this.cache.push({
chunk, encoding, callback
})
} else {// 沒有執行寫操做則執行寫操做
this.writing = true;
//源碼中在這裏調用dowrite()而後調用_write()和__writev()
this._write(chunk, encoding, () => {callback();this.clearBuffer()});
}
return this.length < this.highWaterMark //若是緩存區的內容大於了highWaterMark 那就返回false
}
// 源碼中在write()中調用dowrite()而後調用_write()和__writev()來進行讀操做
Writable.prototype._write = function (chunk, encoding, callback) {
if (typeof this.fd !== 'number') {//這裏是異步打開的操做,要保證有fd,沒有則綁定once等文件open再觸發
return this.once('open', () => this._write(chunk, encoding, callback));
}
// 源碼中clearBuffer()調用dowrite()來消耗緩存
// 源碼中dowrite()再調用onwriteStateUpdate()對length進行更新
// 因此源碼中這裏不須要調用clearBuffer
{
this.position += bytesWritten // 位置增長便宜
this.length -= bytesWritten;// 緩存長度更新
callback();//裏面包含了clearBuffer()
}
}
//源碼中clearBuffer()實是在end的時候調用的,
//源碼中clearBuffer()調用dowrite()而後調用_write()和__writev()來消耗內存
//源碼中dowrite()再調用onwriteStateUpdate()對緩存length進行更新
//這裏只是爲了簡化
function clearBuffer(){
let obj = this.cache.shift();
if(obj){
this._write(obj.chunk,obj.encoding,()=>{obj.callback();this.clearBuffer()});
}else{
if(this.needDrain){
this.writing = false;
this.needDrain = false;
this.emit('drain'); // 觸發drain事件
}
}
}
複製代碼
WriteStream實際上是writabl的子類,它繼承了writabl,以fs.createWriteStream爲例(node/lib/internal/fs/streams.js) node
而後對上面的_write方法進行了覆蓋: 以及對_writev方法進行了覆蓋: 而且在其上擴展了open和close:只須要對上面的Writable進行showier的修改web
const EE = require('events');
const util = require('util');
const fs = require('fs');
function Writable(path,options) {
EE.call(this);
this.path = path;
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.encoding = options.encoding || null;
this.flags = options.flags || 'w';
this.needEmitDrain = false;
this.position = 0;
this.cache = [];
this.writing = false;
this.length = 0;
this.open();
}
util.inherits(Writable, EE);
Writable.prototype.write = function (chunk, encoding=this.encoding, callback=()=>{}) {
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
this.length += chunk.length;
if (this.length >= this.highWaterMark ) {
this.needDrain = true;
}
if (this.writing) {
this.cache.push({
chunk, encoding, callback
})
} else {
this.writing = true;
this._write(chunk, encoding, () => {callback();this.clearBuffer()});
}
return this.length < this.highWaterMark
}
Writable.prototype._write = function (chunk, encoding, callback) {
if (typeof this.fd !== 'number') {//這裏是異步打開的操做,要保證有fd,沒有則綁定once等文件open再觸發
return this.once('open', () => this._write(chunk, encoding, callback));
}
//將_write和fs.write結合
//源碼中是覆蓋_write和_writev
fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
this.pos += bytesWritten
this.len -= bytesWritten;
callback();
});
}
Writable.prototype.destroy = function () {
if (typeof this.fd != 'number') {
this.emit('close');
} else {
fs.close(this.fd, () => {
this.emit('close');
})
}
}
Writable.prototype.open = function () {
fs.open(this.path, this.flags, (err, fd) => { // fd文件描述符 只要文件打開了就是number
if (err) { // 銷燬文件
if (this.autoClose) { // 若是須要自動關閉 觸發一下銷燬事件
this.destroy();
}
return this.emit('error', err);
}
this.fd = fd;
this.emit('open', fd);
});
};
function clearBuffer(){
let obj = this.cache.shift();
if(obj){
this._write(obj.chunk,obj.encoding,()=>{obj.callback();this.clearBuffer()});
}else{
if(this.needDrain){
this.writing = false;
this.needDrain = false;
this.emit('drain'); // 觸發drain事件
}
}
}
複製代碼
const fs = require('fs');
const ReadStream = require('./ReadStream');
const WriteStream = require('./WriteStream');
let rs = new ReadStream('./1.txt',{
highWaterMark:4
});
let ws = new WriteStream('./3.txt',{
highWaterMark:1
});
rs.pipe(ws);
複製代碼
因爲pipe方法是在ReadStream上調用的,因此咱們能夠修改上篇的ReadStream來實現,源碼中Readable和Writable都有pipe的實現緩存
const EE = require('events');
const util = require('util');
const fs = require('fs');
function ReadStream (path,options) {
this.path = path;
this.flags = options.flags || 'r'; //用來標識打開文件的模式
this.encoding = options.encoding || null;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.start = options.start || 0; //讀取(文件)的開始位置
this.end = options.end || null; //讀取(文件)的結束位置
this.autoClose = options.autoClose || true;
this.flowing = null; // 默認非流動模式
this.position = this.start // 記錄讀取數據的位置
this.open(); // 打開文夾
this.on('newListener', function (type) {
if (type === 'data') { // 用戶監聽了data事件
this.flowing = true;
this.read();
}
})
}
ReadStream.prototype.read = function (){
if (typeof this.fd !== 'number') {// open操做是異步的,因此必須等待文件打開this.fd存在說明打開文件
return this.once('open', () => this.read());
}
let buffer = Buffer.alloc(this.highWaterMark); // 把數據讀取到這個buffer中
//判斷每次讀取的數據是多少exp:數據源1234567890 highWaterMark=3
//最後一次讀取長度爲1
let howMuchToRead = Math.min(this.end - this.pos + 1, this.highWaterMark);
fs.read(this.fd, buffer, 0, howMuchToRead, this.position, (err, byteRead) => {
if (byteRead > 0) {
this.emit('data', buffer.slice(0, byteRead));
this.position += byteRead;//更新讀取的起點
if (this.flowing) {//處在flowing模式中就一直讀
this.read();
}
}else{//讀取完畢
this.flowing = null;
this.emit('end');
if(this.autoClose){
this.destroy();
}
}
}
//經過flowing控制暫停仍是繼續讀取
ReadStream.prototype.pause = function(){
this.flowing = false;
}
ReadStream.prototype.resume = function(){
this.flowing = true;
this.read();
}
ReadStream.prototype.pipe = function (ws){
this.on('data', (data)=> {
let flag = ws.write(data);//讀完以後寫,根據flag判斷不須要讀操做來增長緩存的長度
if (!flag) {
this.pause();
}
});
ws.on('drain',()=> {//當寫完緩存以後,lenght=0,發射drain來恢復讀取往緩存中添加內容
this.resume();
})
}
ReadStream.prototype.destroy = function () {
if (typeof this.fd != 'number') {
this.emit('close');
} else {
fs.close(this.fd, () => {
this.emit('close');
})
}
};
ReadStream.prototype.open = function() {
fs.open(this.path, this.flags, (err, fd) => {// fd文件描述符 只要文件打開了就是number
if (err) {
if (this.autoClose) { // 若是須要自動關閉 觸發一下銷燬事件
this.destroy(); // 銷燬文件
}
return this.emit('error', err);
}
this.fd = fd;
this.emit('open', fd);
});
};
複製代碼
但願這篇文章可以讓各位看官對Stream熟悉,由於這個模塊是node中的核心,不少模塊都是繼承這個模塊實現的,若是熟悉了這個模塊,對node的使用以及koa等框架的使用將大有好處,接下來會逐步介紹其餘流模式本文參考:bash