在編寫代碼時,咱們應該有一些方法將程序像鏈接水管同樣鏈接起來 -- 當咱們須要獲取一些數據時,能夠去經過"擰"其餘的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964html
在node中,I/O都是異步的,因此在和硬盤以及網絡的交互過程當中會涉及到傳遞迴調函數的過程。你以前可能會寫出這樣的代碼:node
var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); });}); server.listen(8000);
上面的這段代碼並無什麼問題,可是在每次請求時,咱們都會把整個data.txt文件讀入到內存中,而後再把結果返回給客戶端。想一想看,若是data.txt文件很是大,在響應大量用戶的併發請求時,程序可能會消耗大量的內存,這樣極可能會形成用戶鏈接緩慢的問題。其次,上面的代碼可能會形成很很差的用戶體驗,由於用戶在接收到任何的內容以前首先須要等待程序將文件內容徹底讀入到內存中。所幸的是,(req,res)
參數都是流對象,這意味着咱們可使用一種更好的方法來實現上面的需求:gulp
var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server.listen(8000);
在這裏,.pipe()方法會自動幫助咱們監聽data和end事件。上面的這段代碼不只簡潔,並且data.txt文件中每一小段數據都將源源不斷的發送到客戶端。
除此以外,使用.pipe()方法還有別的好處,好比說它能夠自動控制後端壓力,以便在客戶端鏈接緩慢的時候node能夠將盡量少的緩存放到內存中。後端
流(stream)是 Node.js 中處理流式數據的抽象接口。·stream
模塊用於構建實現了流接口的對象。api
咱們用到的不少核心模塊都是stream
的實例。 例如:http.clientRequest, process.stdout。緩存
流能夠是可讀的、可寫的、或者可讀可寫的。安全
全部的流都是 EventEmitter 的實例。網絡
雖然咱們平時開發過程當中日常不會直接用到stream
模塊,可是也須要了解其運行機制。併發
對於想要實現自定義stream實例的開發者來講,就得好好研究stream的擴展API了,好比gulp的內部實現就大量用到了自定義的stream類型。dom
Node.js 中有四種基本的流類型:
使用Stream可實現數據的流式處理,如:
var fs = require('fs') // `fs.createReadStream`建立一個`Readable`對象以讀取`bigFile`的內容,並輸出到標準輸出 // 若是使用`fs.readFile`則可能因爲文件過大而失敗 fs.createReadStream(bigFile).pipe(process.stdout)
Readable流能夠產出數據,你能夠將這些數據傳送到一個writable,transform或者duplex流中,只須要調用pipe()方法:
var Readable = require('stream').Readable; var rs = new Readable; rs.push('beep '); rs.push('boop\n'); rs.push(null); rs.pipe(process.stdout);
下面運行代碼
$ node read.js beep boop
在上面的代碼中rs.push(null)的做用是告訴rs輸出數據應該結束了。
須要注意的一點是咱們在將數據輸出到process.stdout以前已經將內容推送進readable流rs中,可是全部的數據依然是可寫的。這是由於在你使用.push()將數據推動一個readable流中時,一直要到另外一個東西來消耗數據以前,數據都會存在一個緩存中。然而,在更多的狀況下,咱們想要的是當須要數據時數據纔會產生,以此來避免大量的緩存數據。
咱們能夠經過定義一個._read函數來實現按需推送數據:
const Readable = require('stream').Readable class ToReadable extends Readable { constructor(iterator) { super() this.iterator = iterator } // 子類須要實現該方法 // 這是生產數據的邏輯 _read() { const res = this.iterator.next() if (res.done) { // 數據源已枯竭,調用`push(null)`通知流 return this.push(null) } setTimeout(() => { // 經過`push`方法將數據添加到流中 this.push(res.value + '\n') }, 0) } } module.exports = ToReadable
使用時,new ToReadable(iterator)會返回一個可讀流,下游能夠流式的消耗迭代器中的數據。
const iterator = function (limit) { return { next: function () { if (limit--) { return { done: false, value: limit + Math.random() } } return { done: true } } } }(1e10) const readable = new ToReadable(iterator) // 監聽`data`事件,一次獲取一個數據 readable.on('data', data => process.stdout.write(data)) // 全部數據均已讀完 readable.on('end', () => process.stdout.write('DONE'))
執行上述代碼,將會有100億個隨機數源源不斷地寫進標準輸出流。
建立可讀流時,須要繼承Readable,並實現_read方法。 * _read方法是從底層系統讀取具體數據的邏輯,即生產數據的邏輯。 * 在_read方法中,經過調用push(data)將數據放入可讀流中供下游消耗。 * 在_read方法中,能夠同步調用push(data),也能夠異步調用。 * 當所有數據都生產出來後,必須調用push(null)來結束可讀流。 * 流一旦結束,便不能再調用push(data)添加數據。
能夠經過監聽data事件的方式消耗可讀流。 * 在首次監聽其data事件後,readable便會持續不斷地調用_read(),經過觸發data事件將數據輸出。 * 第一次data事件會在下一個tick中觸發,因此,能夠安全地將數據輸出前的邏輯放在事件監聽後(同一個tick中)。 * 當數據所有被消耗時,會觸發end事件。
上面的例子中,process.stdout表明標準輸出流,實際是一個可寫流。
一個writable流指的是隻能流進不能流出的流:
src.pipe(writableStream)
只須要定義一個._write(chunk,enc,next)函數,你就能夠將一個readable流的數據釋放到其中:
const Writable = require('stream').Writable const writable = Writable() // 實現`_write`方法 // 這是將數據寫入底層的邏輯 writable._write = function (data, enc, next) { // 將流中的數據寫入底層 process.stdout.write(data.toString().toUpperCase()) // 寫入完成時,調用`next()`方法通知流傳入下一個數據 process.nextTick(next) } // 全部數據均已寫入底層 writable.on('finish', () => process.stdout.write('DONE')) // 將一個數據寫入流中 writable.write('a' + '\n') writable.write('b' + '\n') writable.write('c' + '\n') // 再無數據寫入流時,須要調用`end`方法 writable.end()
運行結果以下:
$ node 1.js A B C DONE
Buffer
對象,除非你在建立writable流的時候制定了decodeStrings
參數爲false
:Writable({decodeStrings: false})
。objectMode
參數爲true
,Writable({ objectMode: true })
。_write的參數:
chunk
表寫進來的數據。enc
表明編碼的字符串,可是隻有在opts.decodeString
爲false
的時候你才能夠寫一個字符串。next(err)
是一個回調函數,使用這個回調函數你能夠告訴數據消耗者能夠寫更多的數據。你能夠有選擇性的傳遞一個錯誤對象error
,這時會在流實體上觸發一個emit
事件。若是你須要向一個writable流中寫東西,只須要調用.write(data)便可。
process.stdout.write('beep boop\n');
爲了告訴一個writable流你已經寫完畢了,只須要調用.end()方法。你也可使用.end(data)在結束前再寫一些數據。
var fs = require('fs'); var ws = fs.createWriteStream('message.txt'); ws.write('beep '); setTimeout(function () { ws.end('boop\n'); },1000);
運行結果以下所示:
$ node writing.js $ cat message.txt beep boop
若是你在建立writable流時指定了highWaterMark參數,那麼當沒有更多數據寫入時,調用.write()方法將會返回false。若是你想要等待緩存狀況,能夠監聽drain事件。
Duplex流是一個可讀也可寫的流,就好像一個電話,能夠接收也能夠發送語音。一個rpc交換是一個duplex流的最好的例子。若是你看到過下面這樣的代碼:
a.pipe(b).pipe(a)
那麼你須要處理的就是一個duplex流對象。
var Duplex = require('stream').Duplex var duplex = Duplex() // 可讀端底層讀取邏輯 duplex._read = function () { this._readNum = this._readNum || 0 if (this._readNum > 1) { this.push(null) } else { this.push('' + (this._readNum++)) } } // 可寫端底層寫邏輯 duplex._write = function (buf, enc, next) { // a, b process.stdout.write('_write ' + buf.toString() + '\n') next() } // 0, 1 duplex.on('data', data => console.log('ondata', data.toString())) duplex.write('a') duplex.write('b') duplex.end()
上面的代碼中實現了_read方法,因此能夠監聽data事件來消耗Duplex產生的數據。 同時,又實現了_write方法,可做爲下游去消耗數據。
由於它既可讀又可寫,因此稱它有兩端:可寫端和可讀端。 可寫端的接口與Writable一致,做爲下游來使用;可讀端的接口與Readable一致,做爲上游來使用。
Transform stream是Duplex stream的特例,也就是說,Transform stream也同時可讀可寫。跟Duplex stream的區別點在於,Transform stream的輸出與輸入是存在相關性的。
const Transform = require('stream').Transform class Rotate extends Transform { constructor(n) { super() // 將字母旋轉`n`個位置 this.offset = (n || 13) % 26 } // 將可寫端寫入的數據變換後添加到可讀端 _transform (buf, enc, next) { var res = buf.toString().split('').map(c => { var code = c.charCodeAt(0) if (c >= 'a' && c <= 'z') { code += this.offset if (code > 'z'.charCodeAt(0)) { code -= 26 } } else if (c >= 'A' && c <= 'Z') { code += this.offset if (code > 'Z'.charCodeAt(0)) { code -= 26 } } return String.fromCharCode(code) }).join('') // 調用push方法將變換後的數據添加到可讀端 this.push(res) // 調用next方法準備處理下一個 next() } } var transform = new Rotate(3) transform.on('data', data => process.stdout.write(data)) transform.write('hello, ') transform.write('world!') transform.end()
執行結果以下:
$ node 1.js khoor, zruog!
Tranform繼承自Duplex,並已經實現了_read和_write方法,同時要求用戶實現一個_transform方法。