經過Node.js Stream API 實現逐行讀取的實例

Node 給 streaming 帶來了簡潔和美。Streams 目前是一種很棒的用於創建模塊和應用的方式。原來的 streams API 存在一些問題,在 v0.10 版本中對這些問題進行了修復,而且擴展了一些 API 使得應用更簡單而且能夠歸納更多的應用場景。node

本篇文章將介紹並經過實例應用 v0.10 版本提供的新的 API。shell

逐行問題

具備良好組織的日誌數據對一個公司的開發團隊是很是寶貴的資源。爲了更好的使用並分析它們,而不僅是經過 shell 命令行去操做,咱們須要逐行掃描日誌數據。api

Stream 的優勢就是咱們不須要把整個日誌文件都一次性讀入內存,由於它們可能很龐大,而是在它們能夠準備好去讀取的時候再處理它們。Stream 能夠工做於任何 I/O 場景下,包括文件系統,網絡等等。數組

經過使用心得 stream API,咱們能夠建立可複用的實現上述逐行操做的 I/O 模塊。網絡

Transform Stream

Node 0.10 提供了很是優雅的 stream.Transform 類,用以處理輸入輸出是因果相關的。對於咱們的問題來講,輸入和輸出的數據是徹底同樣的,只是把輸入的數據逐行分離爲了更好的處理。異步

位於管道中間層的 Transform 是便可讀也可寫的:
Transform pipeline
如下是使用 Transform的初始化代碼:函數

var stream = require('stream')
var liner = new stream.Transform( { objectMode: true } )

打開 objectMode

吼吼,這個 {objectMode: true} 是個啥?若是沒有這個 objectMode,stream 默認把純數據塊送過來,不然會把數據快放到一個 object 中,固然這個 object 中還會包含其餘信息。oop

_transform 方法

這只是一個開始,咱們繼續。Transform 類在應用時須要咱們必須提供一個叫作 _transform 的方法,還有一個 _flush 方法能夠選擇提供。咱們先來看一下這個 _transform 方法究竟是什麼。測試

_transform 方法在每次 stream 中有數據來了以後都會被執行,先看代碼:ui

liner._transform = function (chunk, encoding, done) {
  var data = chunk.toString()
  if (this._lastLineData) data = this._lastLineData + data 

  var lines = data.split('\n') 
  this._lastLineData = lines.splice(lines.length-1,1)[0] 

  lines.forEach(this.push.bind(this)) 
  done()
}

數據一來,_transform 方法就會被執行。聯同數據一塊兒過來的還有數據的編碼和一個表示此數據已經接受完畢的信號函數。

在這個問題中,咱們並不關心編碼問題。經過 toString() 把數據轉爲須要的字符串,而後再經過 split('\n') 數據塊字符串按換行符打散爲一個數組。而後在把每一行 push 到對應的處理模塊中。

注:push 方法是 Readable stream 類的內置方法,同時在 Node 0.8 版本中和產生 data 時間的的方法是同類的:

stream.emit(‘data’, data) ➤ stream.push(data)

最後經過調用 done() 方法來發出接受完成的信號。因爲 done 方法是一個回調函數,咱們也能夠把它在 _transform 中進行異步調用。

代碼中的 _lastLineData 又是神馬?在 stream 中咱們並不想一塊數據的結尾是從一行的中間斷開的,爲了解決這個問題,咱們實際上並不會吧打散的數組中的最後一行送出去,而是留到下一次的數據塊來的時候放到下一次數據塊的開頭。

_flush 方法

而後咱們再來看看這個 _flulsh 方法,還記得在 _transform 方法中每次 _lastLineData 的值都不會被送出去嗎,是否是最後一次數據塊的 _lastLindeData 就無法收到了?沒錯, _flush方法就是用來處理這種狀況的。在全部的數據塊都被 _transform 方法處理事後,纔會調用 _flush 方法。因此它的做用就是處理殘留數據的:

liner._flush = function (done) {
  if (this._lastLineData) this.push(this._lastLineData)
  this._lastLineData = null
  done()
}

若是有 _lastLineData 則把它 push 出去而後清空它,最後調用 `done()` 方法標誌着完成處理殘留數據的工做,同時這也意味着 stream 的結束。須要注意的是, _flush 方法並非必須的有些場景下就不須要。

簡單代碼實現

如下代碼是一個簡單的逐行讀取的模塊的實現:

var stream = require('stream')
var liner = new stream.Transform( { objectMode: true } )

liner._transform = function (chunk, encoding, done) {
  var data = chunk.toString()
  if (this._lastLineData) data = this._lastLineData + data

  var lines = data.split('\n')
  this._lastLineData = lines.splice(lines.length-1,1)[0]

  lines.forEach(this.push.bind(this))
  done()
}

liner._flush = function (done) {
     if (this._lastLineData) this.push(this._lastLineData)
     this._lastLineData = null
     done()
}

module.exports = liner

測試實例

看起來已經差很少了,是否是能夠用起來了?

首先咱們須要一個數據源。任何由行組成的文件均可以,以日誌文件爲例:

var fs = require('fs')
var liner = require('./liner')
var source = fs.createReadStream('./access_log')
source.pipe(liner)
liner.on('readable', function () {
  var line
  while (line = liner.read()) {
    // do something with line
  }
})

原文連接

相關文章
相關標籤/搜索