Almost all Node.js applications, no matter how simple, use streams in some manner. javascript
開篇先嚇嚇本身。畫畫圖,分析分析代碼加深本身的理解。html
簡單瞭解node streamjava
1. 在編寫代碼時,咱們應該有一些方法將程序像鏈接水管同樣鏈接起來 -- 當咱們須要獲取一些數據時,能夠去經過"擰"其餘的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964node
結合到node中git
stream 就像是一個抽象的模型(有點像水管),能有序的傳輸數據(有點像水),須要時就擰開水管取點用,還能夠控制大小。github
可讀流是對提供數據的來源的一種抽象。就像水管傳遞水資源供咱們消費使用同樣。api
可讀流有兩種模式:流動模式(flowing)或暫停模式(paused)緩存
Stream 實例的 _readableState.flow(readableState 是內部用來存儲狀態數據的對象) 有三個狀態:數據結構
舉個例子: flowing 模式,一旦綁定監聽器到 'data' 事件時,流會轉換到流動模式_readableState.flow = true併發
const { Readable } = require('stream');
class myReadable extends Readable {
constructor(options,sources) {
super(options)
this.sources = sources
this.pos = 0
}
// 繼承了Readable 的類必須實現 _read() 私有方法,被內部 Readable類的方法調用
// 當_read() 被調用時,若是從資源讀取到數據,則須要開始使用 this.push(dataChunk) 推送數據到讀取隊列。
// _read() 應該持續從資源讀取數據並推送數據,直到push(null)
_read() {
if(this.pos < this.sources.length) {
this.push(this.sources[this.pos])
this.pos ++
} else {
this.push(null)
}
}
}
let rs = new myReadable({},"我是羅小布,我是某個地方來的水資源")
let waterCup = ''
// 綁定監聽器到 'data' 事件時,流會轉換到流動模式。
// 當流將數據塊傳送給消費者後觸發。
rs.on('data',(chunk)=>{
console.log(chunk); // chunk 是一個 buffer
waterCup += chunk
})
rs.on('end',()=>{
console.log('讀取消耗完畢');
console.log(waterCup)
})複製代碼
從上述代碼開啓調試:
大概的畫了一下flowing模式的代碼執行圖:(這個圖真心很差看,建議看後面的那個。這個不是流程圖)
_read() 函數裏面push 是同步操做會先將數據存儲在this.buffer (this.buffe = new bufferList(),bufferList是內部實現的數據結構)變量中,而後再從this.buffer 變量中取出,emit('data',chunk) 消費掉。
_read() 函數裏面push 是異步,一旦異步操做中調用了push方法,且有數據,無緩存隊列,此時會直接emit('data',chunk) 消費掉。
可是若是在讀取數據的途中調用了stream.pause() 此時會中止消費數據,但不會中止生產數據,生產的數據會緩存起來,若是流的消費者沒有調用stream.read()方法, 這些數據會始終存在於內部緩存隊列中(this.buffe = new bufferList(),bufferList是內部實現的數據結構),直到被消費。
由上簡化圖形:
flowing 模式是自動獲取底層資源不斷流向消費者,是流動的。
已經封裝好的模塊更關注數據消費部分
http 模塊
let http = require('http')
let server = http.createServer((req,res)=>{
var method = req.method;
if(method === 'POST') {
req.on('data',()=>{ // 接收數據
console.log(chunk)
})
req.on('end',()=>{
// 接收數據完成
console.log(chunk)
res.end('ok')
})
}
})
server.listen(8000)複製代碼
fs 模塊
let fs = require('fs')
let path = require('path')
let rs = fs.createReadStream(path.resolve(__dirname,'1.txt'),{
flags: 'r+',
highWaterMark: 3,
})
rs.on('data',(data)=>{ // 接收數據
console.log(data.toString())
})
rs.on('end',()=>{ // 接收數據完成
console.log('end')
})
rs.on('error',(error)=>{
console.log(error)
})複製代碼
舉個例子: paused模式,一旦綁定監聽器到 'readable' 事件時,流會轉換到暫停模式_readableState.flow = false
const { Readable } = require("stream");
class myReadable extends Readable {
constructor(options, sources) {
super(options);
this.sources = Buffer.from(sources);
console.log(this.sources)
this.pos = 0;
}
// 繼承了Readable 的類必須實現 _read() 私有方法,被內部 Readable類的方法調用
// 當_read() 被調用時,若是從資源讀取到數據,則須要開始使用 this.push(dataChunk) 推送數據到讀取隊列。
// _read() 應該持續從資源讀取數據並推送數據,push(null)
_read(size) {
if (this.pos < this.sources.length) {
if(this.pos + size >= this.sources.length ) {
size = this.sources.length - this.pos
}
console.log('讀取了:', this.sources.slice(this.pos, this.pos + size))
this.push(this.sources.slice(this.pos, this.pos + size));
this.pos = this.pos + size;
} else {
this.push(null);
}
}
}
let rs = new myReadable(
{
highWaterMark: 8
},
'我是羅小布,我是某個地方來的水資源'
);
let waterCup;
// 綁定監聽器到 'readable' 事件時,流會轉換到暫停模式。
// 'readable' 事件將在流中有數據有變化的時候觸發
rs.on("readable", () => {
console.log('觸發了readable')
while (null !== (chunk = rs.read(7))) {
console.log("消耗---",chunk.length);
if(!waterCup) {
waterCup = chunk
} else {
waterCup = Buffer.concat([waterCup, chunk]);
}
}
});
rs.on("end", () => {
console.log("讀取消耗完畢");
console.log(waterCup.toString());
});複製代碼
從上述代碼開啓調試:
大概的畫了一下paused模式的代碼執行流程:
一旦開始監聽readable事件,Readable內部就會調用read方法,獲取數據到緩存中,併發出「readable」事件。
消費者監聽了 readable 事件並不會消費數據,須要主動調用 .read(size) 函數獲取數據,數據纔會從緩存池取出。
若是獲取的數據大於緩存池數據, .read(size) 會返回null, 底層會自動讀取數據存儲進緩存池併發出「readable」事件,通知消費。
當消費者得到數據後,若是資源池緩存低於highWaterMark值,底層會讀取並往緩存池輸送數據,直到緩存高於highWaterMark值(數據足夠的狀況)
// If we're asking for more than the current hwm, then raise the hwm.
if (n > state.highWaterMark)
state.highWaterMark = computeNewHighWaterMark(n);複製代碼
let { Writable } = require("stream");
class myWrite extends Writable {
constructor(dest, options) {
super(options);
}
// Writable 的類必須實現._write() 或._writev()私有方法,被內部 Writable類的方法調用
// _write 被調用時,將數據發送到底層資源。
// 不管是成功完成寫入仍是寫入失敗出現錯誤,都必須調用 callback
_write(chunk, encoding, callback) {
arr.push(chunk);
setTimeout(() => {
callback();
});
}
}
let arr = [];
let ws = new myWrite(arr, {
highWaterMark: 4
});
let text = "數據源哈哈哈";
let n = 0;
function write() {
let flag = true;
while (flag && text.length > n) {
console.log(text[n]);
flag = ws.write(text[n]);
n++;
}
}
ws.on("drain", () => {
console.log("排空了");
write();
});
write();複製代碼
從上述代碼開啓調試:
大概的畫了一下writable代碼執行圖:
調用 writable.write(chunk) ,若是此時正在進行底層寫,此時的數據流就會進入隊列池緩存起來,若是此時沒有則會調用_write()將數據寫入目的地。
可寫流經過反覆調用 writable.write(chunk) 方法將數據放到緩衝器。 當內部緩衝數據的總數小於 highWaterMark 指定的閾值時, 調用 writable.write() 將返回true。 一旦內部緩衝器的大小達到或超過 highWaterMark ,調用 writable.write() 將返回 false 。
此時最好中止調用writable.write(chunk),等待內部將緩存區清空 emit('drain') 時,再接着寫入數據。
由上簡化圖形:
能夠關注一下finish 方法
已經封裝好的模塊更關注數據生產部分
fs模塊:
let fs = require("fs");
let path = require("path");
let ws = fs.createWriteStream(path.resolve(__dirname, "./1.txt"), {
flags: "w",
encoding: "utf8",
start: 0,
highWaterMark: 3
});
let i = 9;
function write() {
let flag = true; // 表示是否能寫入
while (flag && i >= 0) {
// 9 - 0
flag = ws.write(i-- + "");
}
}
ws.on("drain", () => {
write();
});
write();複製代碼
文章是對stream的簡單瞭解,文中例子比較粗糙,理解不許確之處,還請教正。
node文檔寫的很詳細,瞭解更多細節能夠參考文檔,以及node源碼。
參考資料: