Nodejs 實踐 -- Stream 流

何時使用流

當處理大文件讀取、壓縮、歸檔、媒體文件和巨大的日誌文件時,數據都會被讀入內存,內存很快就會被使用完,這將會給程序帶來很大的問題。javascript

若是在進行這些操做的時候,配合一個合適的緩衝區,一次讀取固定的長度,就會使用更少的內存,這就是流式的API。php

Stream 可用的API 類


1、使用內置的流來實現靜態web服務器

Node 的文件系統和網絡操做的核心模塊 fs 和 net  都提供了流接口。使用流來處理 I/O 問題會至關簡單。html

使用Node 核心模塊,實現簡單的靜態服務器:java

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
	fs.readFile(__dirname + '/index.html', function(err,data){
		if(err){
			res.statusCode = 500;
			res.end(String(err))
			return;
		}
		res.end(data)
	})
})

server.listen(3000)
複製代碼

雖然上述代碼是用來非阻塞的 readFile, 一旦讀取的文件很是大或很是多的文件訪問,將會很快耗完內存,所以須要使用fs.createReadStream 方法進行改進node

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
        // 數據經過流的方式,從html 文件輸出到 http 的請求響應
	fs.createReadStream(__dirname + '/index.html').pipe(res);
})

server.listen(3000)
複製代碼

上述代碼提供一個緩衝器來發送到客戶端,若是客戶端鏈接較慢,網絡流將會發送信號暫停I/O資源直到客戶端準備好接受更多數據。web

使用流實現一個簡單的靜態文件服務器:api

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
	let filename = req.url
	if(filename === '/'){
	   filename = '/index.html'	
	}
	fs.createReadStream(__dirname + filename ).pipe(res);	
})

server.listen(3000)
複製代碼


使用gzip壓縮的靜態服務器bash

const http = require('http');
const fs = require('fs');
const zlib = require('zlib')

const server = http.createServer(function(req,res){
	res.writeHead(200, { 'content-encoding': 'gzip' })
	fs.createReadStream(__dirname + '/index.html' )
            .pipe(zlib.createGzip())
            .pipe(res);	
})

server.listen(3000)
複製代碼



2、Readable  可讀流

stream 繼承自 events, 所以有事件中的 on、emit 方法。服務器

一、事件網絡

  • readable --- 在能夠從流中讀取數據塊的時候發出。
  • data ---  數據正在傳遞時,觸發該事件(以chunk數據塊爲對象)
  • end --- 當數據讀取結束時觸發
  • close --- 當底層資源(如文件) 關閉時觸發。
  • error --- 在接收數據中出錯時觸發。

二、方法

  • read([size]) --- 從流中讀數據.數據能夠是String、Buffer、null(下面代碼會有),當指定size,那麼只讀僅限於那個字節數
  • setEncoding(encoding) --- 設置read()請求讀取返回String時使用的編碼
  • pause() --- 暫停從該對象發出的data事件
  • resume() --- 恢復從該對象發出的data事件
  • pipe(destination,[options]) --- 把讀取的數據塊傳遞給一個 Writable 的目的地。當數據傳送完畢,觸發'end'事件時,會同時觸發目標(可寫流)的'end'事件,致使目標再也不可寫
  • unpipe([destination]) ---- 從Writale目的地斷開這一對象。

繼承可讀流的注意事項:

  • readable.read 方法會返回的數據塊,都是由 readable.push 方法加入到內部可讀隊列中的。
  • 全部繼承可讀流的子類,必須實現readable._read() 方法去得到底層的數據資源,並僅能由Readable對象內部方法調用,不該該被用戶程序直接調用。在 readable._read()實現中,只有還有數據可讀取,就應該調用 readable.push(chunk) 方法把數據加入到內部的可讀隊列,由readable.read方法讀取供應用程序使用。
  • 一旦 實例監聽了 data 事件,則 readable._read() 的返回值將丟失。

實例:實現一個可讀流

const { Readable } = require('stream');
const util = require('util');

util.inherits(MyReadStream, Readable)

function MyReadStream(arr){
	this.source = arr;
	Readable.call(this);
}

MyReadStream.prototype._read = function(){
	if(this.source.length){
		this.push(this.source[0])
		this.source.splice(0,1)
	}else{
		this.push(null)
	}
}

let myStream = new MyReadStream(['php','js','java'])

myStream.on('readable',function(){
	let output_buf  = myStream.read();
	console.log(output_buf,'output')  // null
})

myStream.on('data',function(res){
	console.log(res.toString(),'data')
})
myStream.on('end',function(){
	console.log('end')
})
複製代碼

在上述代碼中,在 readable 事件中調用 read 方法,來讀取一段字符串,並監聽 data 事件來輸出讀取的數據。


3、Writable 可寫流

Writable 流接口是對寫入數據的目標的抽象。

一、方法

write(chunk,[encoding],[callback]) --- 將數據寫入流。chunk(數據塊)中包含要寫入的數據,encoding指定字符串的編碼,callback指定當數據已經徹底刷新時執行的一個回調函數。若是成功寫入,write()返回true.

end([chunk],[encoding],[callback]) ---與write()相同,它把Writable對象設爲再也不接受數據的狀態,併發送finish事件。

二、事件

drain -- 在write()調用返回false後,當準備好開始寫更多數據時,發出此事件通知監視器。

finish -- 當end()在Writable對象上調用,因此數據被刷新,並不會有更多的數據被接受時觸發

pipe -- 當pipe()方法在Readable流上調用,已添加此writable爲目的地時發出

unpipe -- 當unpipe()方法被調用,以刪除Writable爲目的地時發出。


繼承可寫流的注意事項:

  • writable.write() 方法向流中寫入數據,並在數據處理完成後調用 callback 。若是有錯誤發生, callback不必定以這個錯誤做爲第一個參數並被調用。要確保可靠地檢測到寫入錯誤,應該監聽 'error' 事件。
  • 全部可寫流實現必須提供一個 writable._write() 方法將數據發送到底層資源。


實例:實現一個標準輸入到標準輸出的可寫流,並判斷若是輸入的字符包含a, 則報錯並退出

const { Writable } = require('stream');
const util = require('util');

util.inherits(MyWriteStream, Writable)


function MyWriteStream(options){
	Writable.call(this, options);
}

MyWriteStream.prototype._write = function(chunk, encoding, callback){
	if(chunk.toString().indexOf('a') > -1){
		process.stdout.write("新寫入的:"+ chunk)
		callback(null)
	}else{
		callback(new Error('no a'))
	}
}

let myStream = new MyWriteStream();
myStream.write('abc\n')
process.stdin.pipe(myStream)複製代碼

注意:必須調用callback方法來表示寫入成功或失敗。若是出現錯誤,callback第一個參數必須是Error對象,成功時參數爲null


4、雙工流 -- 可讀可寫的流

繼承 stream.Duplex 便可實現一個雙工流

示例:實現一個改變標準輸入內容的顏色,再從標準輸出打印出來

const { Duplex } = require('stream');
const util = require('util');

util.inherits(MyDuplexStream, Duplex)


function MyDuplexStream(options){
	Duplex.call(this, options);
	this.wating = false;
}

MyDuplexStream.prototype._write = function(chunk, encoding, callback){
	this.wating = false;
        // 把數據推進到內部隊列
	this.push('\u001b[32m' +  chunk + '\u001b[39m');
	callback()
}

MyDuplexStream.prototype._read = function(chunk, encoding, callback){
	if(!this.wating){
                // 在等待數據時展現一個提示
		this.push('等待輸入> ')
		this.wating = true;
	}
}

let myStream = new MyDuplexStream();

// 獲取標準輸入,用管道傳給雙工流,單後返回給標準輸出
process.stdin.pipe(myStream).pipe(process.stdout)
複製代碼



5、轉換流 

轉換流很像雙工流,也實現了 Readable 和 Writable 的接口。不一樣的是,轉換流是轉換數據,仍是用 _transform 實現的。這個方法有三個參數,thunk數據塊、encoding編碼、callback回調(很像_write), 當數據轉換完成後執行回調,容許轉換流異步解析數據。

示例待補。

相關文章
相關標籤/搜索