node中stream模塊是很是,很是,很是重要的一個模塊,由於不少模塊都是這個模塊封裝的:node
因爲node原碼的可讀流有將近一千行的代碼,其中有大量的異常處理,debug調試,各類可讀流的兼容處理,加碼解碼處理等,因此這裏採起一個簡化版的實現,源碼中使用鏈表做爲buffer,這裏採用數組進行簡化,主要是闡述可讀流的處理過程。web
const EE = require('events');
const util = require('util');
const fs = require('fs');
function Readable(path,options) {//這個參數是源碼沒有的,這裏主要是爲了讀取fs爲案例加的
EE.call(this);//構造函數繼承EventEmiter
this.path = path;
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark || 64 * 1024;//64k
this.encoding = options.encoding || null;
this.flags = options.flags || 'r';//// 這個源碼沒有的,這裏主要是爲了fs讀取案例加的
this.needEmitReadable = false;// 須要觸發readable事件,默認不須要
this.position = 0;// 偏移量
this.cache = []; // 緩存區
this.reading = false;// 是否正在從緩存中讀取,消費者消耗中
this.length = 0; // 緩存區大小,控制長度
this.open(); // 這個源碼沒有的,這裏主要是爲了fs讀取案例加的
this.on('newListener', (type) => {
if (type === 'readable') { // 看一下是不是'readable'事件模式
this.read();//消耗者,從buffer讀取數據
}
})
}
util.inherits(Readable, EE);//原型繼承EventEmiter
複製代碼
下面這個函數在Readable沒有,可是在ReadStream中存在,這裏爲了利用fs讀取操做說明流,簡化實現版本上添加了這個方法,後面說明ReadStream模塊和Readable的繼承關係數組
Readable.prototype.open = function(){//這裏是異步打開的操做
fs.open(this.path, this.flags, (err, fd) => {
if (err) { // 銷燬文件
if (this.autoClose) { // 若是須要自動關閉觸發一下銷燬事件
this.destroy(); // 它銷燬文件
}
return this.emit('error', err);
}
this.fd = fd;
this.emit('open', fd);
});
}
//源碼中的destory不是這樣的,這裏只是ReadStream中的destory,源碼中作了各類可讀流的兼容組合處理
Readable.prototype.destroy = function() {
if (typeof this.fd != 'number') {
this.emit('close');
} else {
fs.close(this.fd, () => {
this.emit('close');
})
}
}
複製代碼
Readable.prototype.read = function(n) {
let buffer = null;
if(n>this.len){// 若是緩存區中有數據不夠此次讀取,則調整highWaterMark而且補充緩存區
this.highWaterMark = computeNewHighWaterMark(n);//從新計算調整內存2的次方 exp: 5 => 8
this.needEmitReadable = true;
this.reading = true;
this._read();
}
if (n > 0 && n < this.len) { // 若是緩存區中有數據夠此次讀取,則從緩存區中讀取
buffer = Buffer.alloc(n);
let current;
let index = 0;
let flag = true;
//這裏的代碼就是源碼中fromList()的功能,對buffer進行調整,exp:123 456 789讀取12 => 3 456 789
while (flag && (current = this.cache.shift())) {//current是一個buffer
for (let i = 0; i < current.length; i++) {
buffer[index++] = current[i];//將緩存區中的chunk內容copy到buffer中
if (index == n) {//n個數據讀取完畢
flag = false;
this.length -= n; //緩存區長度更新
let c = current.slice(i + 1);//獲取完的chunk exp:123 => 3
if (c.length) {
this.cache.unshift(c);//將沒有取完的chunk放回 exp: 3
}
break;
}
}
}
}
if(this.length === 0){//緩存中沒有數據
this.needEmitReadable = true; 須要觸發'readable'
}
if (this.length < this.highWaterMark) {//緩存區沒有滿,補充緩存區
this.reading = true;
this._read();
}
return buffer;//read()返回值爲一個buffer
}
//第一次讀取是內置的自動讀取到緩存區
//而後觸發readable是從緩存區中讀取消耗的同時,而且也會補充緩存區
Readable.prototype._read = function(n) {
if (typeof this.fd !== 'number') {
return this.once('open', () => this._read());//由於fs.open是異步函數,當執行read必需要在open以後
}
let buffer = Buffer.alloc(this.highWaterMark);
//源碼中經過Readable.prototype.push()調用readableAddChunk()再調用addChunk()
//這裏經過fs.read來調用addChunk(this,bytesRead)
fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
addChunk(this,bytesRead);
})
}
//源碼中經過Readable.prototype.push()調用readableAddChunk()再調用addChunk()
function addChunk(stream, chunk) {
stream.length += bytesRead; // 增長緩存的個數
stream.position += bytesRead;//記錄文件讀取的位置
stream.reading = false;
stream.cache.push(buffer);//數據放入到緩存中
if (stream.needEmitReadable) {
stream.needEmitReadable = false;
stream.emit('readable');
}
}
//源碼中這個函數是經過howMatchToRead()調用的,由於howMatchToRead()在其餘的流中也會用到,因此兼容了其餘狀況
function computeNewHighWaterMark(n) {
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
return n;
}
複製代碼
const fs = require('fs');
const rs = fs.createReadStream('./1.txt',{//123456789
flags:'r',
autoClose:true,
highWaterMark:3,
encoding:null
});
rs.on('readable',function () {
// 若是緩存區沒這麼大會返回null
let r = rs.read(1);
console.log(r);
console.log(rs._readableState.length);
rs.read(1);
setTimeout(() => {//由於補充是異步的
console.log(rs._readableState.length);
}, 1000);
});
複製代碼
ReadStream實際上是Readable的子類,它繼承了Readable,以fs.createReadStream爲例(node/lib/internal/fs/streams.js): 緩存
const EE = require('events');
const util = require('util');
const fs = require('fs');
function ReadStream (path,options) {
this.path = path;
this.flags = options.flags || 'r'; //用來標識打開文件的模式
this.encoding = options.encoding || null;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.start = options.start || 0; //讀取(文件)的開始位置
this.end = options.end || null; //讀取(文件)的結束位置
this.autoClose = options.autoClose || true;
this.flowing = null; // 默認非流動模式
this.position = this.start // 記錄讀取數據的位置
this.open(); // 打開文夾
this.on('newListener', function (type) {
if (type === 'data') { // 用戶監聽了data事件
this.flowing = true;
this.read();
}
})
}
ReadStream.prototype.read = function (){
if (typeof this.fd !== 'number') {// open操做是異步的,因此必須等待文件打開this.fd存在說明打開文件
return this.once('open', () => this.read());
}
let buffer = Buffer.alloc(this.highWaterMark); // 把數據讀取到這個buffer中
//判斷每次讀取的數據是多少exp:數據源1234567890 highWaterMark=3
//最後一次讀取長度爲1
let howMuchToRead = Math.min(this.end - this.pos + 1, this.highWaterMark);
fs.read(this.fd, buffer, 0, howMuchToRead, this.position, (err, byteRead) => {
if (byteRead > 0) {
this.emit('data', buffer.slice(0, byteRead));
this.position += byteRead;//更新讀取的起點
if (this.flowing) {//處在flowing模式中就一直讀
this.read();
}
}else{//讀取完畢
this.flowing = null;
this.emit('end');
if(this.autoClose){
this.destroy();
}
}
}
//經過flowing控制暫停仍是繼續讀取
ReadStream.prototype.pause = function(){
this.flowing = false;
}
ReadStream.prototype.resume = function(){
this.flowing = true;
this.read();
}
ReadStream.prototype.destroy = function () {
if (typeof this.fd != 'number') {
this.emit('close');
} else {
fs.close(this.fd, () => {
this.emit('close');
})
}
};
ReadStream.prototype.open = function() {
fs.open(this.path, this.flags, (err, fd) => {// fd文件描述符 只要文件打開了就是number
if (err) {
if (this.autoClose) { // 若是須要自動關閉 觸發一下銷燬事件
this.destroy(); // 銷燬文件
}
return this.emit('error', err);
}
this.fd = fd;
this.emit('open', fd);
});
};
複製代碼
let fs = require('fs');
let ReadStream = require('./ReadStream')
let rs = fs.createReadStream('1.txt',{//1234567890
encoding:null,
flags:'r+',
highWaterMark:3,
autoClose:true,
start:0,
end:3
});
let arr = [];
rs.on('open',function () {
console.log(' 文件開啓了')
});
rs.on('data',function (data) {
console.log(data);
arr.push(data);
});
rs.on('end',function () { // 只有目標文件讀取完畢後才觸發
console.log('結束了');
console.log(Buffer.concat(arr).toString());
});
rs.pause()
setTimeout(function () {
rs.resume(); // 恢復的是data事件的觸發
},1000)
rs.on('error',function (err) {
console.log('出錯了')
})
rs.on('close',function () {
console.log('close')
});
複製代碼
但願這篇文章可以讓各位看官對Stream熟悉,由於這個模塊是node中的核心,不少模塊都是繼承這個模塊實現的,若是熟悉了這個模塊,對node的使用以及koa等框架的使用將大有好處,接下來會逐步介紹writable等流模式本文參考:bash