Node.js Streams 基礎總結

前段時間遇到項目上須要請求資源方獲取opus編碼的音頻文件,而後置入ogg容器中傳輸給前端標準化播放器進行播放的需求。流程模式是,經過服務上創建的socket鏈接不斷接收資源方傳送的文件塊。而前端請求中層服務是HTTP請求。html

一個簡單的需求,在Node.js服務中,比較適合處理方式是使用Stream,經過pipe不一樣的加解密流以及最後的HTTP responses傳輸給前端標準格式的文件。因爲用到不少流處理方式,因此在此總結一下Node.js Streams的模塊使用基礎。前端

1. Node.js Stream 使用場景

Stream就是數據信息的一個傳輸集合。適合進行大文件和連續的傳輸文件塊的處理,但不只限於此。好比咱們要將一個文件進行加密而後再壓縮再傳輸的,經過讓數據在不一樣的Stream中傳輸處理,不管在書寫仍是處理效率上都頗有優點。node

2. Stream究竟是什麼

Node.js中,Buffer是很是重要的一個模塊。在不少數據處理中會有所應用。它進行內存分配不是經過V8實現的,而是分配的外部對外內存,主要緣由多是V8的垃圾回收機制過於影響性能。Buffer,性能部分是C++實現,而於Node.js進行了非性能方面的實現以及開放調用。ios

簡單理解,Stream其實就是Buffer的一個更高級封裝加另外實現。但Stream和Buffer在使用中,仍是具備不一樣。好比,Buffer在分配完成讀取全部數據後,才能進行使用,而Stream只要創建,當消費者須要使用時即可以使用。同時,數據能夠進入緩衝區,而這個緩衝區,其實就是Buffer。api

不難看出,在這樣的實現下,在處理過程當中,特別是大文件處理,Stream的佔用內存會更低,而處理效率會更高。緩存

咱們經過兩段代碼來查看二者的區別bash

// 使用Buffer拷貝test.file大文件  大小 556641 KB

const fs = require('fs')
const time = Date.now()

fs.readFile('./test.file', (err, buffer) => {
    fs.writeFile('./test.buffer.file', buffer, err => {
        console.log('memory use: ', process.memoryUsage())
        console.log('buffer', Date.now() - time)
        console.log('finish...')
    })
})
複製代碼
// 使用Stream拷貝test.file大文件  大小 556641 KB

const fs = require('fs')
const time = Date.now()

fs.createReadStream('./test.file')
    .pipe(fs.createWriteStream('./test.stream.file'))
    .on('finish', () => {
        console.log('memory use: ', process.memoryUsage())
        console.log('stream', Date.now() - time)
        console.log('finish...')
    })
複製代碼

如下是輸出結果: (只是一個簡單的測試,可是區別仍是比較明顯的)服務器

若是須要,二者之間也能夠進行轉換socket

// stream to buffer
function streamToBuffer(stream, cb) { 
    let buffers = [];
    stream.on('error', function(err) {
        console.log(err)
    })
    stream.on('data', function(data) {
        buffers.push(data)
    })
    stream.on('end', function() { 
        cb(buffers)
    })
}   
複製代碼
// buffer to stream
var stream = require('stream')
function bufferToStream(buffer) {  
  var stream = new stream.Duplex()
  stream.push(buffer)   // 讀入
  stream.push(null)     // null 表明讀入結束
  return stream
}
複製代碼

3. Stream 分類和基礎用法

Node.js中,Stream有4種類型性能

  • Readable: 可讀流

    • HTTP responses, on the client
    • HTTP requests, on the server
    • fs read streams
    • zlib streams
    • crypto streams
    • TCP sockets
    • child process stdout and stderr
    • process.stdin
  • Writable: 可寫流

    • HTTP requests, on the client
    • HTTP responses, on the server
    • fs write streams
    • zlib streams
    • crypto streams
    • TCP sockets
    • child process stdin
    • process.stdout
    • process.stderr
  • Duplex: 可讀可寫

    • TCP sockets
    • zlib streams
    • crypto streams
  • Tranform: 讀寫過程當中處理

    • zlib streams
    • crypto streams

雖然,上述Duplex和Tranform分類中包含了一樣的Node.js實現的庫,可是其實兩種Stream存在區別。Duplex,可讀可寫,而在讀寫上的操做是相互獨立存在的,互相不影響。而Tranform,讀寫是統一的,也就是說,Tranform中讀入的數據通過處理,會直接寫入。兩者都是繼承了Readable和Writable後進行實現的。

基本使用

// 文件系統下
const fs = require('fs')
var rStream = fs.createReadStream(file)
var wStream = fs.createWriteStream(file)

// 直接使用stream模塊
const stream = require('stream')
var rStream = new stream.Readable()
var wStream = new stream.Writable()
var dStream = new stream.Duplex()
var tStream = new stream.Transform()

// 讀寫流的事件監聽
rOrWStream.on('open', function() {
    // 監聽打開
})
rOrWStream.on('data', function(data) {
    // 監聽數據
})
rOrWStream.on('error', function(error) {
    // 監聽讀寫錯誤
})
rOrWStream.on('end', function(end) {
    // 監聽讀取或寫入結束
})
rOrWStream.on('close', function() {
    // 監聽關閉流
})
複製代碼

stream模塊中,不一樣類型stream提供了多種的方法能夠調用,不在此過多贅述,能夠查看 官方文檔

其中比較特別的是,在stream中,參數highWaterMark設置值,是stream存儲的最大值,觸發了stream存儲設置的highWaterMark後,Writable和Readable二者表現有些許不一樣。

const fs = require('fs')
var rStream = fs.createReadStream(file, {
    flags: 'r',	       // 指定用什麼模式打開文件,’w’表明寫,’r’表明讀,相似的還有’r+’、’w+’、’a’等
    encoding: 'utf8',  // 編碼格式
    autoClose: true,   // 是否發生錯誤或結束時自動關閉
    highWaterMark: 9,  // 單位KB,不設置默認爲16KB
    start:0,           // 開始讀取範圍
    end:0              // 結束讀取範圍,從文件中讀取一個字節範圍,而不是整個文件
})
複製代碼

Writable觸發後,不能繼續寫入,調用Writable write方法返回false,而當緩存區能夠繼續寫入數據的時候,是會觸發'drain'事件。

Readable,存在三種狀態

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

null表示沒有提供消費流數據的機制,因此流不會產生數據。監聽 'data' 事件、調用pipe、resume方法都會使狀態切換到 true,可讀流開始主動地產生數據並觸發事件。 調用pause、unpipe,或接收到背壓(也就是緩存區達到highWaterMark值,如上writable一樣),則狀態會被設爲 false,暫時中止事件流動但不會中止數據的生成。 在這個狀態下,爲 'data' 事件綁定監聽器不會使狀態切換到 true。

4. 定製化Stream

Stream中的pipe方法猶如管道同樣,讓數據能夠連續經過不一樣的流處理,好比

const fs = require('fs')
var rStream = fs.createReadStream(file)
var wStream = fs.createWriteStream(renamefile)
rStream.pipe(wStream)
複製代碼

在具體的開發中,經常須要對數據進行處理,咱們須要重寫模塊中的方式

用例 方法
只讀流 Readable _read
只寫流 Writable _write _writev _final
可讀可寫 Duplex _read _write _writev _final
讀入處理,寫入 Transform _transform _flush _final

寫法有不少種,以Writable爲例

// prototype繼承
var stream = require('stream')
var util = require('util')

function MyStream () {
  stream.Writable.call(this)
}
util.inherits(MyStream, stream.Writable)
MyStream.prototype._write = function (chunk, encoding, callback) {
  console.log(chunk.toString())
  callback()
}
var myStream = new MyStream()
process.stdin.pipe(myStream)
複製代碼
// 使用實例
var stream = require('stream')
var myStream = new stream.Writable()
myStream._write = function (chunk, encoding, callback) {
  console.log(chunk.toString())
  callback()
}
process.stdin.pipe(myStream)
複製代碼
// 使用Constructor API
var myStream = new stream.Writable({
    _write: function(chunk, encoding, callback) {
      console.log(chunk.toString())
      callback()
    }
})
複製代碼
// ES6類寫法, Node 4+
class MyStream extends stream.Writable {
    _write(chunk, enc, callback) {
      console.log(chunk.toString())
      callback()
    }
}
var myStream = new MyStream()
複製代碼

如下以ES6的寫法爲主

Writable

const { Writable } = require('stream')

class OutStream extends Writable {
    constructor(option) {
        super()
        this.encode = option.encode
    }
    _write(chunk, enc = this.encode, next) {
        console.log(chunk.toString())
        next && next()
    }
}

const outStream = new OutStream({
    encode: 'utf-8',
})
process.stdin.pipe(outStream)
複製代碼

Readable

const { Readable } = require('stream')

class InStream extends Readable {
    constructor() {
        super()
    }
    _read(size) { 
        // size就是highWaterMark值
    }
}

const inStream = new InStream()
inStream.push('ABCDEFG')  // push讀入
inStream.push('HIJKLMN')
inStream.push(null)       // null表示已經無數據
inStream.pipe(process.stdout)
複製代碼

能夠重寫 _read

class InStream extends Readable {
    constructor() {
        super()
    }
    // _read會持續觸發
    _read(size) { 
        this.push(this.num++)
        if(this.num > 20) {
            this.push(null) // 當num > 20,push(null)結束讀入
        }
    }
}
const inStream = new InStream()
inStream.num = 0
inStream.pipe(process.stdout)
複製代碼

Duplex

const { Duplex } = require('stream')
class IoStream extends Duplex {
    constructor(option) {
        super()
        let op = option || {}
        this.encode = option.encode
    }
    // 同時重寫 _write _read
    _write(chunk, enc, next) {
        console.log(chunk.toString())
        next && next()
    }
    _read(size) {
        this.push(this.num++)
        if(this.num > 20) {
            this.push(null)    
        }
    }
}

const iostream = new IoStream({
    encode: 'utf-8',
})
iostream.num = 0
process.stdin.pipe(iostream).pipe(process.stdout)
複製代碼

Transform

const { Transform } = require('stream')
class MyTransform extends Transform {
    constructor() {
        super()
    }
    // 重寫 _transform 讀入處理後寫入
    _transform(chunk, enc, next) {
        this.push(chunk.toString().toUpperCase())
        next && next()
    }
}

const mytransfrom = new MyTransform()
process.stdin.pipe(mytransfrom).pipe(process.stdout)
複製代碼

5. objectMode

有時咱們須要處理的不只僅是字符串,還包括特殊數據。

好比,若是有一個須要和C++服務器通訊或互通訊息的Node.js服務器,經過一個cache存儲二進制數據,要求存入的是C++ struct結構數據。這時候,須要經過Node.js的Object處理實現一樣結構數據。而同時又須要流處理的話,則須要使用到Stream的objectMode。

經過Transform進行簡單介紹

const { Transform } = require('stream')
class CreateArray extends Transform {
    constructor() {
        // 開啓objectMode模式
        super({
            readableObjectMode: true,
            writableObjectMode: true
        })
    }
    
    _transform(chunk, enc, next) {
        // 能夠直接push Object數據
        this.push(chunk.toString().trim().split(','))
        next && next()
    }
}

class ObjToString extends Transform {
    constructor() {
        super({
            readableObjectMode: true,
            writableObjectMode: true
        })
    }
    
    _transform(chunk, enc, next) {
        // chunk能夠是Object數據
        this.push(JSON.stringify(chunk))
        next && next()
    }
}

const createArray = new CreateArray()
const objtostring = new ObjToString()
process.stdin.pipe(createArray).pipe(objtostring).pipe(process.stdout)
複製代碼

6. 小結

Stream是Node.js中很重要的模塊,處理數據高效,在項目中須要更靈活的使用。

相關文章
相關標籤/搜索