在Unix中流是一個標準的概念,有標準的輸入、輸出和標準的錯誤html
例如:npm
打印出全部的js文件交給grep 來過濾出包含http文件的內容,稱之爲Unix的管道後端
cat *.js | grep http瀏覽器
從上節得知Buffer是保存字節的數據,而流是用來暫存和移動數據的,它倆一般是結合起來來使用,咱們來拷貝文件,像讀取logo,是所有的讀取入到內存中,而後再寫入到文件中,對於體積比較大的的文件就不夠用了假設咱們的服務器須要不斷的去讀取文件,而後返回給客戶端,同時又有好多人都在請求這個文件,這樣每一個請求都去讀入一次內存,而後內存很快就爆掉了,最好的方式是邊讀邊寫緩存
這就須要藉助流來完成,那NodeJs中哪些模塊涉及到了流;好比http、文件系統、壓縮模塊、tcp socket而且流是以buffer的形式存在,這樣更高效。服務器
改造logo圖片讀取操做網絡
var fs = require('fs') var source = fs.readFileSync('logo.png') fs.writeFileSync('stream_copy_logo.png',source)
可是這樣的操做會不會太簡單了,而沒有辦法精細的控制數據在流裏面的傳輸,異步
以上這些都不用擔憂,Stream是基於事件機制進行工做的,socket
流在各個方面的變化均可以被咱們監聽到tcp
var fs = require('fs') //聲明一個可讀流 var readStream = fs.createReadStream('logo_stream.js') //Stream在傳輸的時候會觸發data事件 readStream .on('data',function(chunk){ console.log('data emits') console.log(Buffer.isBuffer(chunk)) console.log(chunk.toString('utf8')) }) //還有readable事件,可讀的 .on('readable',function(){ console.log('data readable') }) .on('end',function(){ console.log('data ends') }) .on('close',function(){ console.log('data close') }) .on('error',function(e){ console.log('data read error:'+e) })
運行結果以下:
藉助於Stream的事件機制,咱們就能實現更多個性化的定製,
從而對流裏面的流程進行更精細化的控制,改造上述代碼:
var fs = require('fs') //聲明一個可讀流 var readStream = fs.createReadStream('logo_stream.js') var n = 0 //Stream在傳輸的時候會觸發data事件 readStream .on('data',function(chunk){ n++ console.log('data emits') console.log(Buffer.isBuffer(chunk)) //console.log(chunk.toString('utf8')) //流暫停 readStream.pause() //設置定時器,模擬異步處理 console.log('data pause') setTimeout(function(){ console.log('data pause end') //再從新啓動 readStream.resume() },3000) }) //還有readable事件,可讀的 .on('readable',function(){ console.log('data readable') }) .on('end',function(){ console.log(n) console.log('data ends') }) .on('close',function(){ console.log('data close') }) .on('error',function(e){ console.log('data read error:'+e) })
運行效果以下:
換一個大一點的文件,3M左右的;打印結果以下:
大概每次是64kb
用事件的方式來重構複製圖片的操做
var fs = require('fs') //放入一個大文件 var readStream = fs.createReadStream('1.pdf') var writeStream = fs.createWriteStream('1_stream.pdf'); //必然觸發一個事件 readStream.on('data',function(chunk){ //寫入目標 if(writeStream.write(chunk)=== false){ //判斷是否已經寫入到目標,來解決爆倉 console.log('still cached') readStream.pause() } }) readStream.on('end',function(){ writeStream.end() }) //耗盡方法 writeStream.on('drain',function(){ console.log('data drains') readStream.resume() }) /* 這是個標準的文件的拷貝操做,可是會有問題; 若是讀的快,寫的慢;由於讀寫的速度並非恆定的,這個時候數據流內部的 緩存可能會被爆倉,那應該怎麼辦 */
運行結果以下:
邊讀邊寫效果.
Stream的種類
Readable:可讀流,用來提供數據;外部來源的數據會被存儲到buffer裏緩存起來,兩種模式:流動模式,暫停模式
Writable:可寫流,消費數據;
Duplex:雙通流,可讀可寫
Transform:轉換流,雙通
各自事件,屬性都大同小異.場景以下:請求一張圖片的數據,在瀏覽器中顯示出來
var http = require('http') var fs = require('fs') http .createServer(function(req,res){ /*fs.readFile('logo.png',function(err,data){ if(err){ res.end('file not exist') }else{ res.wirteHeader(200,{'Context-Type':'text/html'}) res.end(data) } })*/ //利用pipe就可以更簡約的實現這套邏輯 fs.createReadStream('logo.png').pipe(res) }) .listen(8090)
運行效果以下:
不止是本地圖片的讀取,也能夠是網絡環境下的
使用NodeJs中的request模塊
//使用以前先安裝,npm install request var request = require('request') request('url').pipe(res) //這樣就能夠實現邊下載邊顯示
運行結果同上。
在這裏pipe方法會自動幫咱們監聽data和end事件,還能夠自動控制後端壓力,經過對內存空間的調度就能自動控制流量、避免掉目標被快速讀取,只有末端真正須要數據的時候,數據纔會從源頭被取出來而後順着管道一路走下去
再次重構讀取pdf文件
//只須要2行代碼 var fs = require('fs') fs.createReadStream('1.pdf').pipe(fs.createWriteStream('1_pipe.pdf'))
pipe作通道鏈接時的例子:
var Readable = require('stream').Readable var Writable = require('stream').Writable //拿到兩個實例 var readStream = new Readable() var writStream = new Writable() //push一些數據 readStream.push('I ') readStream.push('Love ') readStream.push('NodeJs ') //讀取完畢 readStream.push(null) //重寫方法 writStream._write = function(chunk,encode,cb){ console.log(chunk.toString()) cb() } //最後,使用 pipe鏈接起來 readStream.pipe(writStream)
運行結果以下:
來實現一個定製的可讀流,可寫流、轉換流
var stream = require('stream') var util = require('util') //定製的可寫流 function ReadStream(){ //首先改變它的上下文,讓它能夠調用Stream裏面可讀類的方法 stream.Readable.call(this) } //來讓咱們聲明的可讀流繼承流裏面可讀的原型 util.inherits(ReadStream,stream.Readable) //而後就能夠爲可讀流添加原型鏈上的read方法 ReadStream.prototype._read = function(){ //只幹一件事,push數據 this.push('I ') this.push('Love ') this.push('NodeJs ') this.push(null) } //聲明可寫流 function WriteStream(){ stream.Writable.call(this) //聲明cache this._cached = new Buffer('') } util.inherits(WriteStream,stream.Writable) WriteStream.prototype._write = function(chunk,encode,cb){ console.log(chunk.toString()) cb() } //聲明轉換流 function TransformStream(){ stream.Transform.call(this) } util.inherits(TransformStream,stream.Transform) TransformStream.prototype._transform = function(chunk,encode,cb){ this.push(chunk) cb() } //flush TransformStream.prototype._flush = function(cb){ this.push('Oh Year!') cb() } //生成實例 var rs = new ReadStream() var ws = new WriteStream() var ts = new TransformStream() //讀到的數據pipe給轉換流 rs.pipe(ts).pipe(ws)
運行結果以下: