簡單瞭解node stream

Almost all Node.js applications, no matter how simple, use streams in some manner. javascript

開篇先嚇嚇本身。畫畫圖,分析分析代碼加深本身的理解。html

簡單瞭解node streamjava

  • stream基本概念
  • Readable - 可讀取數據的流
  • Writable - 可寫入數據的流
  • 總結

1. stream基本概念

1.1. 什麼是 stream

1. 在編寫代碼時,咱們應該有一些方法將程序像鏈接水管同樣鏈接起來 -- 當咱們須要獲取一些數據時,能夠去經過"擰"其餘的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964node

結合到node中git

stream 就像是一個抽象的模型(有點像水管),能有序的傳輸數據(有點像水),須要時就擰開水管取點用,還能夠控制大小。github

1.2 Node.js 中有四種基本的流類型:

  • Writable - 可寫入數據的流(例如 fs.createWriteStream())。
  • Readable - 可讀取數據的流(例如 fs.createReadStream())。
  • Duplex - 可讀又可寫的流(例如 net.Socket)。
  • Transform - 在讀寫過程當中能夠修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())。

2. Readable-可讀取數據的流

2.1. 簡單描述Readable 可讀取數據流

可讀流是對提供數據的來源的一種抽象。就像水管傳遞水資源供咱們消費使用同樣。api

可讀流有兩種模式:流動模式(flowing)或暫停模式(paused)緩存

  • 流動模式flowing,數據自動從底層系統讀取,並經過EventEmitter 接口的‘’data'事件儘量快地被提供給應用程序。
  • 暫停模式paused, 數據必須顯示經過調用stream.read()讀取數據。

Stream 實例的 _readableState.flow(readableState 是內部用來存儲狀態數據的對象) 有三個狀態:數據結構

  • _readableState.flow = null,暫時沒有消費者過來(初始狀態)
  • _readableState.flow = false,
  • _readableState.flow = true,

2.2. Readable 可讀取數據流 flowing 模式

舉個例子: flowing 模式,一旦綁定監聽器到 'data' 事件時,流會轉換到流動模式_readableState.flow = true併發

const { Readable } = require('stream');
class myReadable extends Readable {
  constructor(options,sources) {
    super(options)
    this.sources = sources
    this.pos = 0
  }
  // 繼承了Readable 的類必須實現 _read() 私有方法,被內部 Readable類的方法調用
  // 當_read() 被調用時,若是從資源讀取到數據,則須要開始使用 this.push(dataChunk) 推送數據到讀取隊列。 
  // _read() 應該持續從資源讀取數據並推送數據,直到push(null)
  _read() {
    if(this.pos < this.sources.length) {
      this.push(this.sources[this.pos])
      this.pos ++ 
    } else {
      this.push(null)
    }
  }
}
let rs = new myReadable({},"我是羅小布,我是某個地方來的水資源")
let waterCup = ''
// 綁定監聽器到 'data' 事件時,流會轉換到流動模式。
// 當流將數據塊傳送給消費者後觸發。 
rs.on('data',(chunk)=>{
  console.log(chunk); // chunk 是一個 buffer
  waterCup += chunk
})
rs.on('end',()=>{
  console.log('讀取消耗完畢');
  console.log(waterCup)
})複製代碼

從上述代碼開啓調試:

大概的畫了一下flowing模式的代碼執行圖:(這個圖真心很差看,建議看後面的那個。這個不是流程圖)

一旦開始監聽data方法,Readable內部就會調用read方法,來觸發讀流操做,

_read() 函數裏面push 是同步操做會先將數據存儲在this.buffer (this.buffe = new bufferList(),bufferList是內部實現的數據結構)變量中,而後再從this.buffer 變量中取出,emit('data',chunk) 消費掉。

_read() 函數裏面push 是異步,一旦異步操做中調用了push方法,且有數據,無緩存隊列,此時會直接emit('data',chunk) 消費掉。

可是若是在讀取數據的途中調用了stream.pause() 此時會中止消費數據,但不會中止生產數據,生產的數據會緩存起來,若是流的消費者沒有調用stream.read()方法, 這些數據會始終存在於內部緩存隊列中(this.buffe = new bufferList(),bufferList是內部實現的數據結構),直到被消費。

由上簡化圖形:

flowing 模式是自動獲取底層資源不斷流向消費者,是流動的。

數據自動從底層系統讀取,並經過EventEmitter 接口的‘’data'事件儘量快地被提供給應用。

2.3. flowing 模式在 node 其它模塊中的使用

已經封裝好的模塊更關注數據消費部分

http 模塊

let http = require('http')

let server = http.createServer((req,res)=>{
  var method = req.method;
  if(method === 'POST') {
    req.on('data',()=>{ // 接收數據
      console.log(chunk)
    })
    req.on('end',()=>{
      // 接收數據完成
      console.log(chunk)
      res.end('ok')
    })
  }
})
server.listen(8000)複製代碼

fs 模塊

let fs = require('fs')
let path = require('path')
let rs = fs.createReadStream(path.resolve(__dirname,'1.txt'),{
  flags: 'r+',
  highWaterMark: 3,
})
rs.on('data',(data)=>{ // 接收數據
  console.log(data.toString())
})
rs.on('end',()=>{ // 接收數據完成
  console.log('end')
})
rs.on('error',(error)=>{
  console.log(error)
})複製代碼

2.4. Readable 可讀取數據流 paused模式

舉個例子: paused模式,一旦綁定監聽器到 'readable' 事件時,流會轉換到暫停模式_readableState.flow = false

const { Readable } = require("stream");
class myReadable extends Readable {
  constructor(options, sources) {
    super(options);
    this.sources = Buffer.from(sources);
    console.log(this.sources)
    this.pos = 0;
  }
  // 繼承了Readable 的類必須實現 _read() 私有方法,被內部 Readable類的方法調用
  // 當_read() 被調用時,若是從資源讀取到數據,則須要開始使用 this.push(dataChunk) 推送數據到讀取隊列。
  // _read() 應該持續從資源讀取數據並推送數據,push(null)
  _read(size) {
    if (this.pos < this.sources.length) {
      if(this.pos + size >= this.sources.length ) {
        size = this.sources.length - this.pos
      }
      console.log('讀取了:', this.sources.slice(this.pos, this.pos + size))
      this.push(this.sources.slice(this.pos, this.pos + size));
      this.pos = this.pos + size;
    } else {
      this.push(null);
    }
  }
}
let rs = new myReadable(
  {
    highWaterMark: 8
  },
  '我是羅小布,我是某個地方來的水資源'
);
let waterCup;
// 綁定監聽器到 'readable' 事件時,流會轉換到暫停模式。
// 'readable' 事件將在流中有數據有變化的時候觸發
rs.on("readable", () => {
  console.log('觸發了readable')
  while (null !== (chunk = rs.read(7))) {
    console.log("消耗---",chunk.length);
    if(!waterCup) {
      waterCup = chunk
    } else {
      waterCup = Buffer.concat([waterCup, chunk]);
    }
  }
});

rs.on("end", () => {
  console.log("讀取消耗完畢");
  console.log(waterCup.toString());
});複製代碼

從上述代碼開啓調試:

大概的畫了一下paused模式的代碼執行流程:

一旦開始監聽readable事件,Readable內部就會調用read方法,獲取數據到緩存中,併發出「readable」事件。

消費者監聽了 readable 事件並不會消費數據,須要主動調用 .read(size) 函數獲取數據,數據纔會從緩存池取出。

若是獲取的數據大於緩存池數據, .read(size) 會返回null, 底層會自動讀取數據存儲進緩存池併發出「readable」事件,通知消費。

當消費者得到數據後,若是資源池緩存低於highWaterMark值,底層會讀取並往緩存池輸送數據,直到緩存高於highWaterMark值(數據足夠的狀況)

'readable' 事件觸發代表流有了新的動態:要麼是有了新的數據(獲取數據填充緩存),要麼是到了流的尾部。
對於前者, stream.read() 將返回可用的數據。而對於後者, stream.read() 將返回 null。
由上簡化圖形:
消費者監聽了 readable 事件並不會消費數據,須要主動調用 .read([size]) 函數獲取數據,數據纔會從緩存池取出。
不一樣於flowing 模式,數據是自動流出。
highWaterMark 的數據能夠根據讀取的數據修改:看以下源碼
// If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark) 
    state.highWaterMark = computeNewHighWaterMark(n);複製代碼

3. Writable-可寫入數據的流

可寫流是對數據要被寫入的目的地的一種抽象。

3.1. Writable的小例子

let { Writable } = require("stream");
class myWrite extends Writable {
  constructor(dest, options) {
    super(options);
  }
  // Writable 的類必須實現._write() 或._writev()私有方法,被內部 Writable類的方法調用
  // _write 被調用時,將數據發送到底層資源。
  // 不管是成功完成寫入仍是寫入失敗出現錯誤,都必須調用 callback
  _write(chunk, encoding, callback) {
    arr.push(chunk);
    setTimeout(() => {
      callback();
    });
  }
}
let arr = [];
let ws = new myWrite(arr, {
  highWaterMark: 4
});
let text = "數據源哈哈哈";
let n = 0;
function write() {
  let flag = true;
  while (flag && text.length > n) {
    console.log(text[n]);
    flag = ws.write(text[n]);
    n++;
  }
}
ws.on("drain", () => {
  console.log("排空了");
  write();
});
write();複製代碼

從上述代碼開啓調試:

大概的畫了一下writable代碼執行圖:

調用 writable.write(chunk) ,若是此時正在進行底層寫,此時的數據流就會進入隊列池緩存起來,若是此時沒有則會調用_write()將數據寫入目的地。

可寫流經過反覆調用 writable.write(chunk) 方法將數據放到緩衝器。 當內部緩衝數據的總數小於 highWaterMark 指定的閾值時, 調用 writable.write() 將返回true。 一旦內部緩衝器的大小達到或超過 highWaterMark ,調用 writable.write() 將返回 false 。

此時最好中止調用writable.write(chunk),等待內部將緩存區清空 emit('drain') 時,再接着寫入數據。

由上簡化圖形:

能夠關注一下finish 方法

調用 writable.end() 代表已沒有數據要被寫入可寫流,且緩衝數據都已傳給底層系統以後觸發

3.2. stream writable 在node 其它模塊中的使用

已經封裝好的模塊更關注數據生產部分

fs模塊:

let fs = require("fs");
let path = require("path");
let ws = fs.createWriteStream(path.resolve(__dirname, "./1.txt"), {
  flags: "w",
  encoding: "utf8",
  start: 0,
  highWaterMark: 3
});
let i = 9;
function write() {
  let flag = true; // 表示是否能寫入
  while (flag && i >= 0) {
    // 9 - 0
    flag = ws.write(i-- + "");
  }
}
ws.on("drain", () => {
  write();
});
write();複製代碼

4. 總結

文章是對stream的簡單瞭解,文中例子比較粗糙,理解不許確之處,還請教正。

node文檔寫的很詳細,瞭解更多細節能夠參考文檔,以及node源碼。



參考資料:

github.com/substack/st…

www.barretlee.com/blog/2017/0…

nodejs.org/dist/latest…

github.com/nodejs/node…

github.com/nodejs/node…

相關文章
相關標籤/搜索