天天學點node系列-stream

在編寫代碼時,咱們應該有一些方法將程序像鏈接水管同樣鏈接起來 -- 當咱們須要獲取一些數據時,能夠去經過"擰"其餘的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964html

爲何應該使用stream?

在node中,I/O都是異步的,因此在和硬盤以及網絡的交互過程當中會涉及到傳遞迴調函數的過程。你以前可能會寫出這樣的代碼:node

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });});
server.listen(8000);

上面的這段代碼並無什麼問題,可是在每次請求時,咱們都會把整個data.txt文件讀入到內存中,而後再把結果返回給客戶端。想一想看,若是data.txt文件很是大,在響應大量用戶的併發請求時,程序可能會消耗大量的內存,這樣極可能會形成用戶鏈接緩慢的問題。其次,上面的代碼可能會形成很很差的用戶體驗,由於用戶在接收到任何的內容以前首先須要等待程序將文件內容徹底讀入到內存中。所幸的是,(req,res) 參數都是流對象,這意味着咱們可使用一種更好的方法來實現上面的需求:gulp

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

在這裏,.pipe()方法會自動幫助咱們監聽data和end事件。上面的這段代碼不只簡潔,並且data.txt文件中每一小段數據都將源源不斷的發送到客戶端。
除此以外,使用.pipe()方法還有別的好處,好比說它能夠自動控制後端壓力,以便在客戶端鏈接緩慢的時候node能夠將盡量少的緩存放到內存中。後端

認識NodeJS中的stream

流(stream)是 Node.js 中處理流式數據的抽象接口。·stream 模塊用於構建實現了流接口的對象。api

咱們用到的不少核心模塊都是stream的實例。 例如:http.clientRequest, process.stdout。緩存

流能夠是可讀的、可寫的、或者可讀可寫的。安全

全部的流都是 EventEmitter 的實例。網絡

雖然咱們平時開發過程當中日常不會直接用到stream模塊,可是也須要了解其運行機制。併發

對於想要實現自定義stream實例的開發者來講,就得好好研究stream的擴展API了,好比gulp的內部實現就大量用到了自定義的stream類型。dom

stream的類型

Node.js 中有四種基本的流類型:

  • Writable - 可寫入數據的流(例如 fs.createWriteStream())。
  • Readable - 可讀取數據的流(例如 fs.createReadStream())。
  • Duplex - 可讀又可寫的流(例如 net.Socket)。
  • Transform - 在讀寫過程當中能夠修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())。

使用Stream可實現數據的流式處理,如:

var fs = require('fs') 
// `fs.createReadStream`建立一個`Readable`對象以讀取`bigFile`的內容,並輸出到標準輸出 
// 若是使用`fs.readFile`則可能因爲文件過大而失敗 
fs.createReadStream(bigFile).pipe(process.stdout)

Readable

Readable流能夠產出數據,你能夠將這些數據傳送到一個writable,transform或者duplex流中,只須要調用pipe()方法:

建立個readable流

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);

下面運行代碼

$ node read.js
beep boop

在上面的代碼中rs.push(null)的做用是告訴rs輸出數據應該結束了。
須要注意的一點是咱們在將數據輸出到process.stdout以前已經將內容推送進readable流rs中,可是全部的數據依然是可寫的。這是由於在你使用.push()將數據推動一個readable流中時,一直要到另外一個東西來消耗數據以前,數據都會存在一個緩存中。然而,在更多的狀況下,咱們想要的是當須要數據時數據纔會產生,以此來避免大量的緩存數據。

流式消耗迭代器中的數據

咱們能夠經過定義一個._read函數來實現按需推送數據:

const Readable = require('stream').Readable
class ToReadable extends Readable {
    constructor(iterator) {
        super()
        this.iterator = iterator
    }
    // 子類須要實現該方法
    // 這是生產數據的邏輯
    _read() {
        const res = this.iterator.next()
        if (res.done) {
            // 數據源已枯竭,調用`push(null)`通知流
            return this.push(null)
        }
        setTimeout(() => {
        // 經過`push`方法將數據添加到流中
            this.push(res.value + '\n')
        }, 0)
    }
}
module.exports = ToReadable

使用時,new ToReadable(iterator)會返回一個可讀流,下游能夠流式的消耗迭代器中的數據。

const iterator = function (limit) {
    return {
        next: function () {
            if (limit--) {
                return { done: false, value: limit + Math.random() }
            }
            return { done: true }
        }
    }
}(1e10)
const readable = new ToReadable(iterator)
// 監聽`data`事件,一次獲取一個數據
readable.on('data', data => process.stdout.write(data))
// 全部數據均已讀完
readable.on('end', () => process.stdout.write('DONE'))

執行上述代碼,將會有100億個隨機數源源不斷地寫進標準輸出流。

建立可讀流時,須要繼承Readable,並實現_read方法。 * _read方法是從底層系統讀取具體數據的邏輯,即生產數據的邏輯。 * 在_read方法中,經過調用push(data)將數據放入可讀流中供下游消耗。 * 在_read方法中,能夠同步調用push(data),也能夠異步調用。 * 當所有數據都生產出來後,必須調用push(null)來結束可讀流。 * 流一旦結束,便不能再調用push(data)添加數據。

能夠經過監聽data事件的方式消耗可讀流。 * 在首次監聽其data事件後,readable便會持續不斷地調用_read(),經過觸發data事件將數據輸出。 * 第一次data事件會在下一個tick中觸發,因此,能夠安全地將數據輸出前的邏輯放在事件監聽後(同一個tick中)。 * 當數據所有被消耗時,會觸發end事件。

上面的例子中,process.stdout表明標準輸出流,實際是一個可寫流。

Writable

一個writable流指的是隻能流進不能流出的流:

src.pipe(writableStream)

建立一個writable流

只須要定義一個._write(chunk,enc,next)函數,你就能夠將一個readable流的數據釋放到其中:

const Writable = require('stream').Writable

const writable = Writable()
// 實現`_write`方法
// 這是將數據寫入底層的邏輯
writable._write = function (data, enc, next) {
  // 將流中的數據寫入底層
  process.stdout.write(data.toString().toUpperCase())
  // 寫入完成時,調用`next()`方法通知流傳入下一個數據
  process.nextTick(next)
}

// 全部數據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'))

// 將一個數據寫入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')

// 再無數據寫入流時,須要調用`end`方法
writable.end()

運行結果以下:

$ node 1.js
A
B
C
DONE
  • 上游經過調用writable.write(data)將數據寫入可寫流中。write()方法會調用_write()將data寫入底層。
  • _write中,當數據成功寫入底層後,必須調用next(err)告訴流開始處理下一個數據。
  • 在從一個readable流向一個writable流傳數據的過程當中,數據會自動被轉換爲Buffer對象,除非你在建立writable流的時候制定了decodeStrings參數爲false:Writable({decodeStrings: false})
  • 若是你須要傳遞對象,須要指定objectMode參數爲trueWritable({ objectMode: true })
  • 在end方法調用後,當全部底層的寫操做均完成時,會觸發finish事件。
  • 上游必須調用writable.end(data)來結束可寫流,data是可選的。此後,不能再調用write新增數據。
  • next的調用既能夠是同步的,也能夠是異步的.

_write的參數:

  • 第一個參數,chunk表寫進來的數據。
  • 第二個參數 enc 表明編碼的字符串,可是隻有在opts.decodeStringfalse的時候你才能夠寫一個字符串。
  • 第三個參數,next(err)是一個回調函數,使用這個回調函數你能夠告訴數據消耗者能夠寫更多的數據。你能夠有選擇性的傳遞一個錯誤對象error,這時會在流實體上觸發一個emit事件。

向一個writable流中寫東西

若是你須要向一個writable流中寫東西,只須要調用.write(data)便可。

process.stdout.write('beep boop\n');

爲了告訴一個writable流你已經寫完畢了,只須要調用.end()方法。你也可使用.end(data)在結束前再寫一些數據。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
},1000);

運行結果以下所示:

$ node writing.js 
$ cat message.txt
beep boop

若是你在建立writable流時指定了highWaterMark參數,那麼當沒有更多數據寫入時,調用.write()方法將會返回false。若是你想要等待緩存狀況,能夠監聽drain事件。

Duplex

Duplex流是一個可讀也可寫的流,就好像一個電話,能夠接收也能夠發送語音。一個rpc交換是一個duplex流的最好的例子。若是你看到過下面這樣的代碼:

a.pipe(b).pipe(a)

那麼你須要處理的就是一個duplex流對象。

實現一個Duplex

var Duplex = require('stream').Duplex
var duplex = Duplex()
// 可讀端底層讀取邏輯
duplex._read = function () {
    this._readNum = this._readNum || 0
    if (this._readNum > 1) {
        this.push(null)
    } else {
        this.push('' + (this._readNum++))
    }
}
// 可寫端底層寫邏輯
duplex._write = function (buf, enc, next) {
    // a, b
    process.stdout.write('_write ' + buf.toString() + '\n')
    next()
}
// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))
duplex.write('a')
duplex.write('b')
duplex.end()

上面的代碼中實現了_read方法,因此能夠監聽data事件來消耗Duplex產生的數據。 同時,又實現了_write方法,可做爲下游去消耗數據。
由於它既可讀又可寫,因此稱它有兩端:可寫端和可讀端。 可寫端的接口與Writable一致,做爲下游來使用;可讀端的接口與Readable一致,做爲上游來使用。

Transform

Transform stream是Duplex stream的特例,也就是說,Transform stream也同時可讀可寫。跟Duplex stream的區別點在於,Transform stream的輸出與輸入是存在相關性的。

const Transform = require('stream').Transform
class Rotate extends Transform {
    constructor(n) {
        super()
        // 將字母旋轉`n`個位置
        this.offset = (n || 13) % 26
    }
    // 將可寫端寫入的數據變換後添加到可讀端
    _transform (buf, enc, next) {
        var res = buf.toString().split('').map(c => {
            var code = c.charCodeAt(0)
            if (c >= 'a' && c <= 'z') {
                code += this.offset
                if (code > 'z'.charCodeAt(0)) {
                    code -= 26
                }
            } else if (c >= 'A' && c <= 'Z') {
                code += this.offset
                if (code > 'Z'.charCodeAt(0)) {
                    code -= 26
                }
            }
            return String.fromCharCode(code)
        }).join('')
        // 調用push方法將變換後的數據添加到可讀端
        this.push(res)
        // 調用next方法準備處理下一個
        next()
    }
}
var transform = new Rotate(3)
transform.on('data', data => process.stdout.write(data))
transform.write('hello, ')
transform.write('world!')
transform.end()

執行結果以下:

$ node 1.js
khoor, zruog!

Tranform繼承自Duplex,並已經實現了_read和_write方法,同時要求用戶實現一個_transform方法。

相關連接

https://nodejs.org/api/stream.html

相關文章
相關標籤/搜索