深刻node之Transform

Transform流特性

在開發中直接接觸Transform流的狀況不是不少,每每是使用相對成熟的模塊或者封裝的API來完成流的處理,最爲特殊的莫過於through2模塊和gulp流操做。那麼,Transform流到底有什麼特色呢?html

從名稱上說,Transform意爲處理,相似於生產流水線上的每一道工序,每道工序針對到來的產品做相應的處理;從結構上看,Transform是一個雙工流,通俗的解釋它既能夠做爲可讀流,也可做爲可寫流。可是,node卻對Transform流針對其特性作了更爲特殊的定製,使Transform不是單純的Duplex流。node

Transform流因爲包含了Readable和Writeable特性,所以Transform在實際使用中有着多種方式:它既能夠只做爲消費者消費數據,也可同時做爲生產者和消費者完成數據中間處理。下面將逐漸深刻內部闡述Transform的運行機理及使用技巧。c++

Transform內部架構

Transform架構圖

上圖表示一個Transform實例的組成部分:Readable部分緩衝(數組)、內部_read函數、Writeable部分緩衝(鏈表)、內部_write函數、Transform實例必須實現的內部_transform函數以及系統提供的回調函數afterTransform。因爲Transform實例同時擁有兩部分緩衝,所以2個緩衝的存儲、消耗的順序也就須要瞭解,這對於後面使用原生Transform編寫代碼有很大的指導意義。gulp

傳統意義的流(即Readable和Writeable)的實現者都須要實現對應的內部函數_read()和_write(),對於Readable實例而言,_read函數用於準備從源文件中獲取數據並添加到讀緩衝中;對於Writeable實例_write函數則從寫緩衝鏈表中一次刷入到磁盤中。它們分別對應了讀寫流程的首尾步驟,具體能夠關注node中的Stream一文。數組

而Transform中的_read和_write函數的實現大有不一樣,因爲須要兼顧流的處理,所以着重分析Transform的內部函數執行流程。緩存

執行流程

示例demo:
readable.pipe(transform);

以上段示例代碼爲例,transform做爲消費者消費readable。
Transform的實例transform擁有transormState和readableState屬性,保存了相關屬性,如tranform狀態信息、回調函數存儲和編碼等。transform做爲消費者,會在其write函數中消費數據,在node中的Stream文中介紹了write函數的實現細節,經過內部調用_write函數實現數據的寫入。而在Transform中_write函數已經重寫:架構

  1. 保存transform收到的chunk數據、編碼和函數(執行刷新寫緩衝)
  2. 在必定條件下執行_read函數(當狀態爲非轉換下,只要讀緩衝大小未超過設定的大小,則執行_read)

若是一切順利,readable的數據會順利執行transform的**write->_write->_read**,那麼本來負責填充讀緩衝的_read在Transform中發生了哪些改變呢?函數

Transform.prototype._read = function(n) {
  var ts = this._transformState;

  if (ts.writechunk !== null && ts.writecb && !ts.transforming) {
    ts.transforming = true;
    this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
  } else {
    // mark that we need a transform, so that any data that comes in
    // will get processed, now that we've asked for it.
    ts.needTransform = true;
  }
};

可見,_read的實現很是簡單,根據條件選擇執行_transform函數。須要注意的是_read的參數n並未有使用,由於是否插入數據至讀緩衝是由開發者在_transform中來決定。相信你們對_transform函數並不陌生,node規定Transform實例必須提供_transform函數,而該函數正是在_read中調用。oop

_transform有三個參數,第一個爲待處理的chunk數據,第二個爲編碼,第三個爲回調函數。前兩個參數很好理解,咱們能夠在_transform中盡情的處理數據,最後調用回調函數完成處理。那麼,這個回調函數到底是什麼? 它就是Transform架構圖中的afterTransform函數,它有幾個功能:ui

  1. 清空各類狀態信息,如transformState對象的一些屬性,用於下次處理數據使用
  2. 可選的保存處理結果至讀緩衝區
  3. 刷新寫緩衝區,執行下一階段的數據流處理

可見,在afterTransform函數執行後,才基本宣告transform第一階段的結束。爲什麼是第一階段呢?由於transform才完成了做爲消費者(即Writeable)的做用,若是用戶在_transform中傳入了數據到讀緩衝區,那麼此時transform也同時是一個生產者,提供數據讓後面的消費者消費數據,這就涉及到了Transform使用上的問題。

Transform的生產消費實例

const stream = require('stream')
var c = 0;
const readable = stream.Readable({
  highWaterMark: 2,
  read: function () {
    var data = c < 26 ? String.fromCharCode(c++ + 97) : null;
    console.log('push', data);
    this.push(data);
}
})

const transform = stream.Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    console.log('transform', buf.toString());
    next(null, buf);
  }
})

readable.pipe(transform);

示例代碼很簡單,建立了一個可讀流,向消費者提供a-z的小寫字母;建立了一個轉換流,在_transform函數中針對數據並不作處理僅做打點輸出,並向回調函數傳遞數據至讀緩衝區。咱們的目的是經過transform輸出26個小寫字母,可是當前程序執行的結果並不讓人滿意:

執行結果:
push a
push b
transform a
push c
transform b
push d
push e
push f

tranform僅僅處理到字母b,readable也僅僅提供了a-f的數據便戛然而止,這是爲什麼?

這一切都歸結於transform對象。認真讀過上文後咱們知道,全部的Transform實例同時有兩個緩衝區,其中寫緩衝區用來接收生產者的數據進行轉換操做,讀緩衝區則緩存數據給消費者使用。而在當前的實現中,transform._transform函數輸出了待處理數據,同時執行next(null, buf);。該函數上文已有分析,即afterTransform函數,第一個參數爲Error實例,第二個則爲存入讀緩衝區的數據。在本例中,執行完_transform後將處理後的數據存入讀緩衝區,等待後面的消費者消費讀緩衝區的數據。但是,transform後面沒有消費者了,所以transform在處理完字母b存入讀緩衝區後,讀緩衝區已經滿了(設定highWaterMark爲2,即讀寫緩衝區的最大值均爲2字節)。當字母c、d也執行到tranform._write後,因爲不知足執行transform._read的條件沒法執行transform._transform函數,更沒法執行afterTransform函數,致使沒法刷新寫緩衝區的數據,形成字母c、d貯存在寫緩衝區。而字母e、f則因爲transform的寫緩衝區滿(transform.write()返回false),只有存儲在readable的讀緩衝區中,等待消費。這就形成了死循環,readable和transform全部的緩衝區都滿了,流也就中止了。

解決這個問題的方法很簡單,有兩種不一樣方案:

  1. transform的讀緩衝區保持爲空
  2. 增長消費者消費transform的讀緩衝區

其實本質上都是讓transform的讀緩衝區獲得消耗。

第一種方案:

保證transform的讀緩衝區爲空:
const transform = stream.Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    console.log('transform', buf.toString())
    next(null, null)
  }
})

只需向next函數傳入null便可,這樣transform消費完數據後即宣告數據處理結束,讀緩衝區始終爲空。

第二種方案:

添加消費者:
const transform = stream.Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    console.log('transform', buf.toString())
    next(null, buf)
  }
})

readable.pipe(transform).pipe(process.stdout);

transform實現不變,只是添加了消費者process.stdout。這樣也同時保證了transform的讀緩衝區處於可添加狀態,也給了afterTransform函數刷新寫緩衝區的機會,開啓新的數據處理流程。

through2的實現

through2的重頭戲在於Transform流,使用through2的API可方便的建立一個Transform實例,完成數據流的處理。

function through2 (construct) {
  return function (options, transform, flush) {
    if (typeof options == 'function') {
      flush     = transform
      transform = options
      options   = {}
    }

    if (typeof transform != 'function')
      transform = noop

    if (typeof flush != 'function')
      flush = null

    return construct(options, transform, flush)
  }
}

module.exports = through2(function (options, transform, flush) {
  var t2 = new DestroyableTransform(options)

  t2._transform = transform

  if (flush)
    t2._flush = flush

  return t2
})

可見,through2模塊僅僅是封裝了Transform的構造函數,並封裝了更爲易用的objectMode模式。之因此建議使用through2建立Transform對象,不只僅是由於其提供了方便的API,更主要的是爲了兼容性。Transform對象是屬於Stream2.0的特性,早先版本的node並無實現,而經過through2建立的Transform實例在以前版本的node下仍可正常使用,這是因爲through2並未引用node默認提供的stream模塊,而是使用社區中較爲流行的「readable-stream」模塊。

總結

本文旨在深刻through2中的使用的Transform流進行探究,並做爲上一篇文章node中的stream的回顧和應用。經過文末簡單的示例瞭解Transform在開發中可能出現的問題,學會隨意切換Transform的生產者和消費者的身份,更好的指導實際開發。

相關文章
相關標籤/搜索