流(stream)是一種在 Node.js 中處理流式數據的抽象接口。 stream 模塊提供了一些基礎的 API,用於構建實現了流接口的對象。緩存
Node.js 提供了多種流對象。 例如,發送到 HTTP 服務器的請求和 process.stdout 都是流的實例。bash
流能夠是可讀的、可寫的、或是可讀寫的。 全部的流都是 EventEmitter 的實例。服務器
Node.js 中有四種基本的流類型(本篇主要說前兩種):異步
流中一個至關重要的概念,不管讀寫流都是經過緩衝來實現的。 可寫流和可讀流都會在一個內部的緩衝器中存儲數據,能夠分別使用的 writable.writableBuffer 或 readable.readableBuffer 來獲取,可緩衝的數據的數量取決於傳入流構造函數的 highWaterMark 選項,默認狀況下highWaterMark 64*1024個字節 讀寫的過程都是將數據讀取寫入緩衝,而後在將數據讀出或者寫入文件。函數
來一個小例子,有助於理解學習
// pipe
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:1
})
let ws = fs.createWriteStream('./5.txt',{
highWaterMark:2
})
let index = 1;
rs.on('data', (data) => {
console.log(index++)
let flag = ws.write(data); // 當內部的可寫緩衝的總大小小於 highWaterMark 設置的閾值時,
//調用 writable.write() 會返回 true。 一旦內部緩衝的大小達到或超過 highWaterMark 時,則會返回 false。
if (!flag) { //內部緩衝超過highWaterMark
rs.pause()
}
})
let wsIndex = 1;
ws.on('drain', () => {
console.log('ws'+wsIndex++)
rs.resume()
})
// 1 2 ws1 3 4 ws2 5 6 ws3
複製代碼
前面已經說了全部的流都是 EventEmitter 的實例,那麼就能夠on,能夠emit等等ui
其實啊,在工做中也是不多直接這用到的,咱們能夠直接用pipe rs.pipe(ws)便可 這樣就給一個可讀流寫入到一個可寫流當中this
let EventEmitter = require('events'); //全部的流都是 EventEmitter 的實例,流繼承EventEmitter
let fs = require('fs');
class ReadStream extends EventEmitter {
constructor(path, options = {}) {
super();
this.path = path;
this.autoClose = options.autoClose || true;
this.flags = options.flags || 'r';
this.encoding = options.encoding || null;
this.start = options.start || 0;
this.end = options.end || null;
this.highWaterMark = options.highWaterMark || 64 * 1024;
// 應該有一個讀取文件的位置 可變的(可變的位置)
this.pos = this.start;
// 控制當前是不是流動模式
this.flowing = null;
// 構建讀取到的內容的buffer
this.buffer = Buffer.alloc(this.highWaterMark);
// 當建立可讀流 要將文件打開
this.open(); // 異步執行
this.on('newListener', (type) => {
if(type === 'data'){ // 用戶監聽了data事件,就開始讀取吧
this.flowing = true;
this.read();// 開始讀取文件
}
});
}
read(){
// 這時候文件尚未打開呢,等待着文件打開後再去讀取
if(typeof this.fd !== 'number'){
// 等待着文件打開,再次調用read方法
return this.once('open',()=>this.read());
}
// 開始讀取了
// 文件可能有10個字符串
// start 0 end 4
// 每次讀三個 3
// 0-2
// 34
let howMuchToRead = this.end ? Math.min(this.highWaterMark,this.end - this.pos+1) :this.highWaterMark
// 文件描述符 讀到哪一個buffer裏 讀取到buffer的哪一個位置
// 往buffer裏讀取幾個,讀取的位置
fs.read(this.fd, this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
if (bytesRead>0){ // 讀到內容了
this.pos += bytesRead;
// 保留有用的
let r = this.buffer.slice(0, bytesRead);
r = this.encoding ? r.toString(this.encoding) : r;
// 第一次讀取
this.emit('data', r);
if (this.flowing) {
this.read();
}
}else{
this.end = true;
this.emit('end');
this.destroy();
}
});
}
destroy() { // 判斷文件是否打開 (將文件關閉掉)
if (typeof this.fd === 'number') {
fs.close(this.fd, () => {
this.emit('close');
});
return;
}
this.emit('close');
}
open() { // 打開文件的邏輯
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
this.emit('error', err);
if (this.autoClose) {
this.destroy(); // 銷燬 關閉文件(觸發close事件)
} return;
}
this.fd = fd;
this.emit('open'); // 觸發文件開啓事件
});
}
pause(){
this.flowing = false;
}
resume(){
this.flowing = true;
this.read(); // 繼續讀取
}
}
module.exports = ReadStream;
複製代碼
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter{
constructor(path,options ={}){
super();
this.path = path;
this.flags = options.flags || 'w';
this.mode = options.mode || 0o666;
this.highWaterMark = options.highWaterMark || 16*1024;
this.start = options.start || 0;
this.autoClose = options.autoClose|| true;
this.encoding = options.encoding || 'utf8';
// 是否須要觸發drain事件
this.needDrain = false;
// 是否正在寫入
this.writing = false;
// 緩存 正在寫入就放到緩存中
this.buffer = [];
// 算一個當前緩存的個數
this.len = 0;
// 寫入的時候也有位置關係
this.pos = this.start;
this.open();
}
// 0 [1 2]
write(chunk, encoding = this.encoding,callback){
chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk);
this.len += chunk.length;// 每次調用write就統計一下長度
this.needDrain = this.highWaterMark <= this.len;
// this.fd
if(this.writing){
this.buffer.push({chunk,encoding,callback});
}else{
// 當文件寫入後 清空緩存區的內容
this.writing = true; // 走緩存
this._write(chunk,encoding,()=>this.clearBuffer());
}
return !this.needDrain; // write 的返回值必須是true / false
//這時候能夠回頭看一下上面的例子,在this.len >= this.higWaterMark的時候,返回了一個fasle,例子中就暫停讀取了。等待寫入完成
}
_write(chunk,encoding,callback){
if (typeof this.fd !== 'number') {
return this.once('open', () => this._write(chunk, encoding, callback));
}
// fd是文件描述符 chunk是數據 0 寫入的位置和 長度 , this.pos偏移量
fs.write(this.fd, chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
this.pos += bytesWritten;
this.len -= bytesWritten; // 寫入的長度會減小
callback();
});
}
clearBuffer(){
let buf = this.buffer.shift();
if(buf){
this._write(buf.chunk, buf.encoding, () => this.clearBuffer());
}else{
this.writing = false;
this.needDrain = false; // 觸發一次drain 再置回false 方便下次繼續判斷
this.emit('drain');
}
}
destroy(){
if(typeof this.fd === 'number'){
fs.close(this.fd,()=>{
this.emit('close');
});
return
}
this.emit('close');
}
open(){
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
this.emit('error');
this.destroy();
return
}
this.fd = fd;
this.emit('open');
});
}
}
module.exports = WriteStream;
複製代碼
以上就是流的一些基礎知識,流的簡單應用以及本身實現的可讀流可寫流。固然有不少不足之處,但願朋友們提出指正。也但願和各位朋友一塊兒學習分享!spa