當處理大文件讀取、壓縮、歸檔、媒體文件和巨大的日誌文件時,數據都會被讀入內存,內存很快就會被使用完,這將會給程序帶來很大的問題。javascript
若是在進行這些操做的時候,配合一個合適的緩衝區,一次讀取固定的長度,就會使用更少的內存,這就是流式的API。php
fs.createReadStream()
).fs.createWriteStream()
).net.Socket
).zlib.createDeflate()
).Node 的文件系統和網絡操做的核心模塊 fs 和 net 都提供了流接口。使用流來處理 I/O 問題會至關簡單。html
使用Node 核心模塊,實現簡單的靜態服務器:java
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
fs.readFile(__dirname + '/index.html', function(err,data){
if(err){
res.statusCode = 500;
res.end(String(err))
return;
}
res.end(data)
})
})
server.listen(3000)
複製代碼
雖然上述代碼是用來非阻塞的 readFile, 一旦讀取的文件很是大或很是多的文件訪問,將會很快耗完內存,所以須要使用fs.createReadStream 方法進行改進:node
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
// 數據經過流的方式,從html 文件輸出到 http 的請求響應
fs.createReadStream(__dirname + '/index.html').pipe(res);
})
server.listen(3000)
複製代碼
上述代碼提供一個緩衝器來發送到客戶端,若是客戶端鏈接較慢,網絡流將會發送信號暫停I/O資源直到客戶端準備好接受更多數據。web
使用流實現一個簡單的靜態文件服務器:api
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
let filename = req.url
if(filename === '/'){
filename = '/index.html'
}
fs.createReadStream(__dirname + filename ).pipe(res);
})
server.listen(3000)
複製代碼
使用gzip壓縮的靜態服務器bash
const http = require('http');
const fs = require('fs');
const zlib = require('zlib')
const server = http.createServer(function(req,res){
res.writeHead(200, { 'content-encoding': 'gzip' })
fs.createReadStream(__dirname + '/index.html' )
.pipe(zlib.createGzip())
.pipe(res);
})
server.listen(3000)
複製代碼
stream 繼承自 events, 所以有事件中的 on、emit 方法。服務器
一、事件網絡
二、方法
繼承可讀流的注意事項:
readable._read()
方法去得到底層的數據資源,並僅能由Readable對象內部方法調用,不該該被用戶程序直接調用。在 readable._read()
實現中,只有還有數據可讀取,就應該調用 readable.push(chunk)
方法把數據加入到內部的可讀隊列,由readable.read
方法讀取供應用程序使用。實例:實現一個可讀流
const { Readable } = require('stream');
const util = require('util');
util.inherits(MyReadStream, Readable)
function MyReadStream(arr){
this.source = arr;
Readable.call(this);
}
MyReadStream.prototype._read = function(){
if(this.source.length){
this.push(this.source[0])
this.source.splice(0,1)
}else{
this.push(null)
}
}
let myStream = new MyReadStream(['php','js','java'])
myStream.on('readable',function(){
let output_buf = myStream.read();
console.log(output_buf,'output') // null
})
myStream.on('data',function(res){
console.log(res.toString(),'data')
})
myStream.on('end',function(){
console.log('end')
})
複製代碼
在上述代碼中,在 readable
事件中調用 read
方法,來讀取一段字符串,並監聽 data
事件來輸出讀取的數據。
Writable 流接口是對寫入數據的目標的抽象。
write(chunk,[encoding],[callback]) --- 將數據寫入流。chunk(數據塊)中包含要寫入的數據,encoding指定字符串的編碼,callback指定當數據已經徹底刷新時執行的一個回調函數。若是成功寫入,write()返回true.
end([chunk],[encoding],[callback]) ---與write()相同,它把Writable對象設爲再也不接受數據的狀態,併發送finish事件。
drain -- 在write()調用返回false後,當準備好開始寫更多數據時,發出此事件通知監視器。
finish -- 當end()在Writable對象上調用,因此數據被刷新,並不會有更多的數據被接受時觸發
pipe -- 當pipe()方法在Readable流上調用,已添加此writable爲目的地時發出
unpipe -- 當unpipe()方法被調用,以刪除Writable爲目的地時發出。
繼承可寫流的注意事項:
writable.write()
方法向流中寫入數據,並在數據處理完成後調用 callback
。若是有錯誤發生, callback
不必定以這個錯誤做爲第一個參數並被調用。要確保可靠地檢測到寫入錯誤,應該監聽 'error'
事件。writable._write()
方法將數據發送到底層資源。實例:實現一個標準輸入到標準輸出的可寫流,並判斷若是輸入的字符包含a, 則報錯並退出
const { Writable } = require('stream');
const util = require('util');
util.inherits(MyWriteStream, Writable)
function MyWriteStream(options){
Writable.call(this, options);
}
MyWriteStream.prototype._write = function(chunk, encoding, callback){
if(chunk.toString().indexOf('a') > -1){
process.stdout.write("新寫入的:"+ chunk)
callback(null)
}else{
callback(new Error('no a'))
}
}
let myStream = new MyWriteStream();
myStream.write('abc\n')
process.stdin.pipe(myStream)複製代碼
注意:必須調用callback
方法來表示寫入成功或失敗。若是出現錯誤,callback
第一個參數必須是Error
對象,成功時參數爲null
。
繼承 stream.Duplex
便可實現一個雙工流
示例:實現一個改變標準輸入內容的顏色,再從標準輸出打印出來
const { Duplex } = require('stream');
const util = require('util');
util.inherits(MyDuplexStream, Duplex)
function MyDuplexStream(options){
Duplex.call(this, options);
this.wating = false;
}
MyDuplexStream.prototype._write = function(chunk, encoding, callback){
this.wating = false;
// 把數據推進到內部隊列
this.push('\u001b[32m' + chunk + '\u001b[39m');
callback()
}
MyDuplexStream.prototype._read = function(chunk, encoding, callback){
if(!this.wating){
// 在等待數據時展現一個提示
this.push('等待輸入> ')
this.wating = true;
}
}
let myStream = new MyDuplexStream();
// 獲取標準輸入,用管道傳給雙工流,單後返回給標準輸出
process.stdin.pipe(myStream).pipe(process.stdout)
複製代碼
轉換流很像雙工流,也實現了 Readable 和 Writable 的接口。不一樣的是,轉換流是轉換數據,仍是用 _transform 實現的。這個方法有三個參數,thunk數據塊、encoding編碼、callback回調(很像_write), 當數據轉換完成後執行回調,容許轉換流異步解析數據。
示例待補。