詳解NodeJs流之一

若是你對NodeJs系列感興趣,歡迎關注微信公衆號:前端神盾局或 github NodeJs系列文章html

流從早先的unix初出茅廬,在過去的幾十年的時間裏,它被證實是一種可依賴的編程方式,它能夠將一個大型的系統拆成一些很小的部分,而且讓這些部分之間完美地進行合做。前端

在node中,流的身影幾乎無處不在,不管是操做文件、建立本地服務器仍是簡單的console,都極有可能涉及到流。node

Node.js 中有四種基本的流類型:git

  • Readable - 可讀取數據的流(例如 fs.createReadStream())。
  • Writable - 可寫入數據的流(例如 fs.createWriteStream())。
  • Duplex - 可讀又可寫的流(例如 net.Socket)。
  • Transform - 在讀寫過程當中能夠修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())

爲何使用流

假設咱們須要使用node來實現一個簡單的靜態文件服務器:github

const http = require('http');
const fs = require('fs');

http.createServer((req,res)=>{
    fs.readFile('./test.html',function(err,data){
        if(err){
            res.statusCode = 500;
            res.end();
        }else{
            res.end(data);
        }
    })
}).listen(3000)

上述代碼簡單實現了靜態文件的讀取和發送,邏輯上是徹底可行的。可是因爲readFile是一次性將讀取的文件存放在內存中的,假設test.html文件很是大或者訪問量增多的狀況下,服務器內存頗有可能耗盡。這時咱們就須要使用流的方式進行改進:編程

const http = require('http');
const fs = require('fs');

http.createServer((req,res)=>{
    fs.createReadStream('./test.html').pipe(res);
}).listen(3000);

fs.createReadStream建立一個可讀流,逐次讀取文件內容供給下游消費,這種逐步讀取和消費的方式,有效減緩了內存的消耗。api

可讀流(Readable Stream)

image

咱們能夠把 Readable Stream拆分紅兩個階段:push階段和pull階段,在push階段,經過實現_read方法將數據從底層數據資源池中推送到緩存池中,這是數據的生產階段,而pull階段,則是將緩存池的數據拉出,供下游使用,這是數據的消費階段。緩存

在開始進一步講解以前,咱們先來介紹幾個字段,這些字段來源於node源碼:服務器

  • state.buffer: Array 緩存池,每一個元素對應push(data)中的data
  • state.length: Number 緩存池中的數據量,在objectMode模式下,state.length === state.buffer.length,不然,其值是state.buffer中數據字節數的總和
  • state.ended: Boolean 表示底層數據池沒有可讀數據了(this.pull(null))
  • state.flowing: Null|Boolean 表示當前流的模式,其值有三種狀況:null(初始狀態)、true(流動模式)、false(暫停模式)
  • state.needReadable: Boolean 是否須要觸發readable事件
  • state.reading: Boolean 是否正在讀取底層數據
  • state.sync: Boolean 是否當即觸發data/readable事件,false爲當即觸發、true下一個tick再觸發(process.nextTick)

兩種模式

可讀流存在兩種模式:流動模式(flowing)和暫停模式(paused),在源碼中使用state.flowing來標識。微信

兩種模式其基本流程都遵循上圖中的push和pull階段,區別在於pull階段的自主性。對於流動模式而言,只要緩存池還有未消耗的數據,那麼數據便會不斷的被提取,咱們能夠把它想象成一個自動的水泵,只要通電了,不抽乾水池的水它是不會停下來的。而對於暫停模式,它更像是打水桶,須要的時候再從水池裏面打點水出來。

全部可讀流都開始於暫停模式,能夠經過如下方式切換到流動模式:

  • 添加data事件句柄(前提是state.flowing === null
  • 調用stream.resume()
  • 調用stream.pipe()

可讀流也能夠經過如下方式切換回暫停模式:

  • 添加readable事件句柄
  • 若是沒有管道目標,則調用 stream.pause()
  • 若是有管道目標,則移除全部管道目標。調用 stream.unpipe() 能夠移除多個管道目標。

一切從read開始

對於可讀流而言,消費驅動生產,只有經過調用pull階段的read函數,才能喚醒push階段的數據產生,從而帶動整個流的運動。因此對於可讀流而言read是一切的起點。

這是根據源碼整理的一個簡單的流程圖,後面將對一些環節加以說明。

image

howMuchToRead

調用read(n)過程當中,node會根據實際狀況調整讀取的數量,實際值由howMuchRead決定

function howMuchToRead(n,state){
  // 若是size <= 0或者不存在可讀數據 
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
    
  // objectMode模式下 每次制度一個單位長度的數據
  if (state.objectMode)
    return 1;
    
  // 若是size沒有指定
  if (Number.isNaN(n)) {
    // 執行read()時,因爲流動模式下數據會不斷輸出,
    // 因此每次只輸出緩存中第一個元素輸出,而非流動模式則會將緩存讀空
    if (state.flowing && state.length)
      return state.buffer.head.data.length;
    else
      return state.length;
  }
  
  if (n > state.highWaterMark)
    // 更新highWaterMark
    state.highWaterMark = computeNewHighWaterMark(n);

  // 若是緩存中的數據量夠用
  if (n <= state.length)
    return n;
    
  // 若是緩存中的數據不夠用,
  // 且資源池還有可讀取的數據,那麼這一次先不讀取緩存數據
  // 留着下一次數據量足夠的時候再讀取
  // 不然讀空緩存池
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}

end事件

read函數調用過程當中,node會擇機斷定是否觸發end事件,斷定標準主要是如下兩個條件:

if (state.length === 0 && state.ended) endReadable(this);
  1. 底層數據(資源)沒有可讀數據,此時state.endedtrue
經過調用 pull(null)表示底層數據當前已經沒有可讀數據了
  1. 緩存池中沒有可讀數據 state.length === 0

本事件在調用read([size])時觸發(知足上述條件時)

doRead

doRead用於判斷是否讀取底層數據

// 若是當前是暫停模式`state.needReadable`
  var doRead = state.needReadable;
  
  // 若是當前緩存池是空的或者沒有足夠的緩存
  if (state.length === 0 || state.length - n < state.highWaterMark){
    doRead = true;
  }

  if (state.ended || state.reading) {
    doRead = false;
  } else if (doRead) {
    // ...
    this._read(state.highWaterMark);
    // ...
  }

state.reading標誌上次從底層取數據的操做是否已完成,一旦push方法被調用,就會設置爲false,表示這次_read()結束

data事件

官方文檔中提到:添加data事件句柄,可使Readable Stream的模式切換到流動模式,但官方沒有提到的是這一結果成立的條件-state.flowing的值不爲null,即只有在初始狀態下,監聽data事件,會使流進入流動模式。舉個例子:

const { Readable } = require('stream');

class ExampleReadable extends Readable{
  constructor(opt){
    super(opt);
    this._time = 0;
  }
  _read(){
    this.push(String(++this._time));
  }
}

const exampleReadable = new ExampleReadable();
// 暫停 state.flowing === false
exampleReadable.pause();
exampleReadable.on('data',(chunk)=>{
  console.log(`Received ${chunk.length} bytes of data.`);
});

運行這個例子,咱們發現終端沒有任何輸出,爲何會這樣呢?緣由咱們能夠從源碼中看出端倪

if (state.flowing !== false)
      this.resume();

由此咱們能夠把官方表述再完善一些:在可讀流初始化狀態下(state.flowing === null),添加data事件句柄會使流進入流動模式。

push

只能被可讀流的實現調用,且只能在 readable._read() 方法中調用。

push是數據生產的核心,消費方經過調用read(n)促使流輸出數據,而流經過_read()使底層調用push方法將數據傳給流。

在這個過程當中,push方法有可能將數據存放在緩存池內,也有可能直接經過data事件輸出。下面咱們一一分析。

若是當前流是流動的(state.flowing === true),且緩存池內沒有可讀數據,
那麼數據將直接由事件data輸出

// node 源碼
if (state.flowing && state.length === 0 && !state.sync){
    state.awaitDrain = 0;
    stream.emit('data', chunk);
}

咱們舉個例子:

const { Readable } = require('stream');

class ExampleReadable extends Readable{
  constructor(opt){
    super(opt);
    this.max = 100;
    this.time = 0;
  }
  _read(){
    const seed = setTimeout(()=>{
      if(this.time > 100){
        this.push(null);
      }else{
        this.push(String(++this.time));
      }
      clearTimeout(seed);
    },0)
  }
}
const exampleReadable = new ExampleReadable({ });
exampleReadable.on('data',(data)=>{
  console.log('from data',data);
});

readable事件

exampleReadable.on('readable',()=>{
    ....
});

當咱們註冊一個readable事件後,node就會作如下處理:

  1. 將流切換到暫停模式
state.flowing = false; 
state.needReadable = true;
  1. 若是緩存池未消耗的數據,觸發readable
stream.emit('readable');
  1. 不然,判斷當前是否正在讀取底層數據,若是不是,開始(nextTick)讀取底層數據self.read(0);
觸發條件
  1. state.flow === false當前處於暫停模式
  2. 緩存池中還有數據或者本輪底層數據已經讀取完畢state.length || state.ended
return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);

參考

image

相關文章
相關標籤/搜索