學習本無底,前進莫徬徨node
今天跟你們分享的是node.js中的流(stream
)。它的做用你們應該都在日常使用node的時候看到過,好比:express
gulp
中的pipe就是流的一種方法,經過可寫流和可讀流的配合,達到不佔用多餘緩存的一種讀寫方式。可寫流
,req是可讀流
,他們都是經過封裝node中的net模塊的socket(雙工流
,便可寫、可讀流)而來的。可能不少時候你們都知道怎麼用,但不瞭解它的原理,很尷尬,就像這樣gulp
Readable
(可讀流)、Writable
(可寫流)、Duplex
(雙工流)、Transform
(轉換流)二進制模式
:每一個分塊都是buffer、string對象。對象模式
:流內部處理的是一系列普通對象。可讀流分爲
flowing
和paused
兩種模式windows
path
:讀取的文件的路徑option
:
highWaterMark
:水位線,一次可讀的字節,通常默認是64k
flags
:標識,打開文件要作的操做,默認是r
encoding
:編碼,默認爲bufferstart
:開始讀取的索引位置end
:結束讀取的索引位置(包括結束位置)autoClose
:讀取完畢是否關閉,默認爲truelet ReadStream = require('./ReadStream')
//讀取的時候默認讀64k
let rs = new ReadStream('./a.txt',{
highWaterMark: 2,//一次讀的字節 默認64k
flags: 'r', //標示 r爲讀 w爲寫
autoClose: true, //默認讀取完畢後自動關閉
start: 0,
end: 5, //流是閉合區間包start,也包end 默認是讀完
encoding: 'utf8' //默認編碼是buffer
})
複製代碼
data
:切換到流動模式,能夠流出數據rs.on('data', function (data) {
console.log(data);
});
複製代碼
open
:流打開文件的時候會觸發此監聽rs.on('open', function () {
console.log('文件被打開');
});
複製代碼
error
:流出錯的時候,監聽錯誤信息rs.on('error', function (err) {
console.log(err);
});
複製代碼
end
:流讀取完成,觸發endrs.on('end', function (err) {
console.log('讀取完成');
});
複製代碼
close
:關閉流,觸發rs.on('close', function (err) {
console.log('關閉');
});
複製代碼
pause
:暫停流(改變流的flowing,不讀取數據了);resume
:恢復流(改變流的flowing,繼續讀取數據)//流經過一次後,中止流動,過了2s後再動
rs.on('data', function (data) {
rs.pause();
console.log(data);
});
setTimeout(function () {
rs.resume();
},2000);
複製代碼
fs.read()
:可讀流底層調用的就是這個方法,最原生的讀方法//fd文件描述符,通常經過fs.open中獲取
//buffer是讀取後的數據放入的緩存目標
//0,從buffer的0位置開始放入
//BUFFER_SIZE,每次放BUFFER_SIZE這麼長的長度
//index,每次從文件的index的位置開始讀
//bytesRead,真實讀到的個數
fs.read(fd,buffer,0,BUFFER_SIZE,index,function(err,bytesRead){
})
複製代碼
可愛
的讀流吧!let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
constructor(path,options = {}){
super()
this.path = path
this.highWaterMark = options.highWaterMark || 64*1024
this.flags = options.flags || 'r'
this.start = options.start || 0
this.pos = this.start //會隨着讀取的位置改變
this.autoClose = options.autoClose || true
this.end = options.end || null
//默認null就是buffer
this.encoding = options.encoding || null
//參數的問題
this.flowing = null //非流動模式
//建立個buffer用來存儲每次讀出來的數據
this.buffer = Buffer.alloc(this.highWaterMark)
//打開這個文件
this.open()
//此方法默認同步調用 每次設置on監聽事件時都會調用以前全部的newListener事件
this.on('newListener',(type)=>{// 等待着他監聽data事件
if(type === 'data'){
this.flowing = true
//開始讀取 客戶已經監聽的data事件
this.read()
}
})
}
//默認第一次調用read方法時fd還沒獲取 因此不能直接讀
read(){
if(typeof this.fd != 'number'){
//等待着觸發open事件後fd確定拿到了 再去執行read方法
return this.once('open',()=>{this.read()})
}
//每次讀的時候都要判斷一下下次讀幾個 若是沒有end就根據highWaterMark來(讀全部的) 若是有且大於highWaterMark就根據highWaterMark來 若是小於highWaterMark就根據end來
let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark
fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,byteRead)=>{
this.pos += byteRead
let b = this.encoding?this.buffer.slice(0,byteRead).toString(this.encoding):this.buffer.slice(0,byteRead)
this.emit('data',b)
//若是讀取到的數量和highWaterMark同樣 說明還得繼續讀
if((byteRead === this.highWaterMark)&&this.flowing){
this.read()
}
if(byteRead < this.highWaterMark){
this.emit('end')
this.destory()
}
})
}
destory(){
if(typeof this.fd != 'number'){
return this.emit('close')
}
//若是文件被打開過 就關閉文件而且觸發close事件
fs.close(this.fd,()=>{
this.emit('close')
})
}
pause(){
this.flowing = false
}
resume(){
this.flowing = true
this.read()
}
open(){
//fd表示的就是當前this.path的這個文件,從3開始(number類型)
fs.open(this.path,this.flags,(err,fd)=>{
//有可能fd這個文件不存在 須要作處理
if(err){
//若是有自動關閉 則幫他銷燬
if(this.autoClose){
//銷燬(關閉文件,觸發關閉文件事件)
this.destory()
}
//若是有錯誤 就會觸發error事件
this.emit('error',err)
return
}
//保存文件描述符
this.fd = fd
//當文件打開成功時觸發open事件
this.emit('open',this.fd)
})
}
}
複製代碼
這個方法是可讀流的一種
暫停模式
,他的模式能夠參考爲讀流是往水杯倒水的人,Readable是喝水的人,他們之間存在着一種聯繫,只要Readable喝掉一點水,讀流就會繼續往裏倒
。數組
流會監聽,若是有人讀過流(喝過水),而且減小,就會再去讀一次(倒點水)
行讀取器(LineReader)
let fs = require('fs')
let read = require('./ReadableStream')
let rs = fs.createReadStream('./a.txt', {
//每次讀7個
highWaterMark: 7
})
//若是讀流第一次所有讀下來而且小於highWaterMark,就會再讀一次(再觸發一次readable事件)
//若是rs.read()不加參數,一次性讀完,會從緩存區再讀一次,爲null
//若是readable每次都恰好讀完(即rs.read()的參數恰好和highWaterMark相等),就會一直觸發readable事件,若是最後不足他想喝的數,他就會先觸發一次null,最後把剩下的喝完
//一開始緩存區爲0的時候也會默認調一次readable事件
rs.on('readable', () => {
let result = rs.read(2)
console.log(result)
})
複製代碼
實戰:行讀取器(日常咱們的文件可能有回車、換行,此時若是要每次想讀一行的數據,就得用到readable
)緩存
let EventEmitter = require('events')
//若是要將內容所有讀出就用on('data'),精確讀取就用on('readable')
class LineReader extends EventEmitter {
constructor(path) {
super()
this.rs = fs.createReadStream(path)
//回車符的十六進制
let RETURN = 0x0d
//換行符的十六進制
let LINE = 0x0a
let arr = []
this.on('newListener', (type) => {
if (type === 'newLine') {
this.rs.on('readable', () => {
let char
//每次讀一個,當讀完的時候會返回null,終止循環
while (char = this.rs.read(1)) {
switch (char[0]) {
case RETURN:
break;
//Mac下只有換行符,windows下是回車符和換行符,須要根據不一樣的轉換。由於我這裏是Mac
case LINE:
//若是是換行符就把數組轉換爲字符串
let r = Buffer.from(arr).toString('utf8')
//把數組清空
arr.length = 0
//觸發newLine事件,把獲得的一行數據輸出
this.emit('newLine', r)
break;
default:
//若是不是換行符,就放入數組中
arr.push(char[0])
}
}
})
}
})
//以上只能取出以前的換行符前的代碼,最後一行的後面沒有換行符,因此須要特殊處理。當讀流讀完須要觸發end事件時
this.rs.on('end', () => {
//取出最後一行數據,轉成字符串
let r = Buffer.from(arr).toString('utf8')
arr.length = 0
this.emit('newLine', r)
})
}
}
let lineReader = new LineReader('./a.txt')
lineReader.on('newLine', function (data) {
console.log(data)
})
複製代碼
那麼Readable究竟是怎樣的存在呢?咱們接下來實現他的源碼,看看內部到底怎麼回事服務器
let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
constructor(path,options = {}){
super()
this.path = path
this.highWaterMark = options.highWaterMark || 64*1024
this.flags = options.flags || 'r'
this.start = options.start || 0
this.pos = this.start //會隨着讀取的位置改變
this.autoClose = options.autoClose || true
this.end = options.end || null
//默認null就是buffer
this.encoding = options.encoding || null
//參數的問題
this.reading = false //非流動模式
//建立個buffer用來存儲每次讀出來的數據
this.buffers = []
//緩存區長度
this.len = 0
//是否要觸發readable事件
this.emittedReadable = false
//觸發open獲取文件的fd標識符
this.open()
//此方法默認同步調用 每次設置on監聽事件時都會調用以前全部的newListener事件
this.on('newListener',(type)=>{// 等待着他監聽data事件
if(type === 'readable'){
//開始讀取 客戶已經監聽的data事件
this.read()
}
})
}
//readable真正的源碼中的方法,計算出和n最接近的2的冪次數
computeNewHighWaterMark(n) {
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
return n;
}
read(n){
//當讀的數量大於水平線,會經過取2的冪次取比他大和最接近的數
if(this.len < n){
this.highWaterMark = this.computeNewHighWaterMark(n)
//從新觸發readbale的callback,因此第一次會觸發null
this.emittedReadable = true
//從新讀新的水位線
this._read()
}
//真正讀取到的
let buffer = null
//說明緩存裏有這麼多,取出來
if(n>0 && n<=this.len){
//定義一個buffer
buffer = Buffer.alloc(n)
let buf
let flag = true
let index = 0
//[buffer<1,2,3,4>,buffer<1,2,3,4>,buffer<1,2,3,4>]
//每次取出緩存前的第一個buffer
while(flag && (buf = this.buffers.shift())){
for(let i=0;i<buf.length;i++){
//把取出的一個buffer中的數據放入新定義的buffer中
buffer[index++] = buf[i]
//當buffer的長度和n(參數)長度同樣時,中止循環
if(index === n){
flag = false
//維護緩存,由於可能緩存中的buffer長度大於n,當取出n的長度時,還會剩下其他的buffer,咱們須要切割buf而且放到緩存數組以前
this.len -= n
let r = buf.slice(i+1)
if(r.length){
this.buffers.unshift(r)
}
break
}
}
}
}
//若是緩存區沒有東西,等會讀完須要觸發readable事件
//這裏會有一種情況,就是若是每次Readable讀取的數量正好等於highWaterMark(流讀取到緩存的長度),就會每次都等於0,每次都觸發Readable事件,就會每次讀,讀到沒有爲止,最後還會觸發一下null
if(this.len === 0){
this.emittedReadable = true
}
if(this.len < this.highWaterMark){
//默認,一開始的時候開始讀取
if(!this.reading){
this.reading = true
//真正多讀取操做
this._read()
}
}
return buffer&&buffer.toString()
}
_read(){
if(typeof this.fd != 'number'){
//等待着觸發open事件後fd確定拿到了 再去執行read方法
return this.once('open',()=>{this._read()})
}
//先讀這麼多buffer
let buffer = Buffer.alloc(this.highWaterMark)
fs.read(this.fd,buffer,0,buffer.length,this.pos,(err,byteRead)=>{
if(byteRead > 0){
//當第一次讀到數據後,改變reading的狀態,若是觸發read事件,可能還會在觸發第二次_read
this.reading = false
//每次讀到數據增長緩存取得長度
this.len += byteRead
//每次讀取以後,會增長讀取的文件的讀取開始位置
this.pos += byteRead
//將讀到的buffer放入緩存區buffers中
this.buffers.push(buffer.slice(0,byteRead))
//觸發readable
if(this.emittedReadable){
this.emittedReadable = false
//能夠讀取了,默認開始的時候杯子填滿了
this.emit('readable')
}
}else{
//沒讀到就出發end事件
this.emit('end')
}
})
}
destory(){
if(typeof this.fd != 'number'){
return this.emit('close')
}
//若是文件被打開過 就關閉文件而且觸發close事件
fs.close(this.fd,()=>{
this.emit('close')
})
}
open(){
//fd表示的就是當前this.path的這個文件,從3開始(number類型)
fs.open(this.path,this.flags,(err,fd)=>{
//有可能fd這個文件不存在 須要作處理
if(err){
//若是有自動關閉 則幫他銷燬
if(this.autoClose){
//銷燬(關閉文件,觸發關閉文件事件)
this.destory()
}
//若是有錯誤 就會觸發error事件
this.emit('error',err)
return
}
//保存文件描述符
this.fd = fd
//當文件打開成功時觸發open事件
this.emit('open',this.fd)
})
}
}
複製代碼
let rs = fs.createReadStream('./a.txt')
rs.on('data',(data)=>{
console.log(data)
})
//由於上面的data事件把數據讀了,清空緩存區。因此致使下面的readable讀出爲null
rs.on('readable',()=>{
let result = r.read(1)
console.log(result)
})
複製代碼
由於createReadStream
內部調用了ReadStream
類,ReadStream
又實現了Readable
接口,ReadStream
實現了_read()
方法,因此咱們經過自定義一個類繼承stream
模塊的Readable
,並在原型
上自定義一個_read()
就能夠自定義本身的可讀流koa
let { Readable } = require('stream')
class MyRead extends Readable{
//流須要一個_read方法,方法中push什麼,外面就接收什麼
_read(){
//push方法就是上面_read方法中的push同樣,把數據放入緩存區中
this.push('100')
//若是push了null就表示沒有東西可讀了,中止(若是不寫,就會一直push上面的值,死循環)
this.push(null)
}
}
複製代碼
path
:寫入的文件的路徑option
:
highWaterMark
:水位線,一次可寫入緩存中的字節,通常默認是64k
flags
:標識,寫入文件要作的操做,默認是w
encoding
:編碼,默認爲bufferstart
:開始寫入的索引位置end
:結束寫入的索引位置(包括結束位置)autoClose
:寫入完畢是否關閉,默認爲truelet ReadStream = require('./ReadStream')
//讀取的時候默認讀64k
let rs = new ReadStream('./a.txt',{
highWaterMark: 2,//一次讀的字節 默認64k
flags: 'r', //標示 r爲讀 w爲寫
autoClose: true, //默認讀取完畢後自動關閉
start: 0,
end: 5, //流是閉合區間包start,也包end 默認是讀完
encoding: 'utf8' //默認編碼是buffer
})
複製代碼
write
let fs = require('fs')
let ws = fs.createWriteStream('./d.txt',{
flags: 'w',
encoding: 'utf8',
start: 0,
//write的highWaterMark只是用來觸發是否是幹了
highWaterMark: 3 //寫是默認16k
})
//返回boolean 每當write一次都會在ws中吃下一個饅頭 當吃下的饅頭數量達到highWaterMark時 就會返回false 吃不下了會把其他放入緩存 其他狀態返回true
//write只能放string或者buffer
flag = ws.write('1','utf8',()=>{
console.log(i)
})
複製代碼
drain
//drain只有嘴塞滿了 吃完(包括內存中的,就是地下的)纔會觸發 這裏是兩個條件 一個是必須是吃下highWaterMark個饅頭 而且在吃完的時候纔會callback
ws.on('drain',()=>{
console.log('幹了')
})
複製代碼
fs.write()
:可讀流底層調用的就是這個方法,最原生的讀方法//wfd文件描述符,通常經過fs.open中獲取
//buffer,要取數據的緩存源
//0,從buffer的0位置開始取
//BUFFER_SIZE,每次取BUFFER_SIZE這麼長的長度
//index,每次寫入文件的index的位置
//bytesRead,真實寫入的個數
fs.write(wfd,buffer,0,bytesRead,index,function(err,bytesWrite){
})
複製代碼
let fs = require('fs')
let EventEmitter = require('events')
//只有第一次write的時候直接用_write寫入文件 其他都是放到cache中 可是len超過了highWaterMark就會返回false告知須要drain 很佔緩存
//從第一次的_write開始 回去一直經過clearBuffer遞歸_write寫入文件 若是cache中沒有了要寫入的東西 會根據needDrain來判斷是否觸發乾點
class WriteStream extends EventEmitter{
constructor(path,options = {}){
super()
this.path = path
this.highWaterMark = options.highWaterMark || 64*1024
this.flags = options.flags || 'r'
this.start = options.start || 0
this.pos = this.start
this.autoClose = options.autoClose || true
this.mode = options.mode || 0o666
//默認null就是buffer
this.encoding = options.encoding || null
//打開這個文件
this.open()
//寫文件的時候須要哪些參數
//第一次寫入的時候 是給highWaterMark個饅頭 他會硬着頭皮寫到文件中 以後纔會把多餘吃不下的放到緩存中
this.writing = false
//緩存數組
this.cache = []
this.callbackList = []
//數組長度
this.len = 0
//是否觸發drain事件
this.needDrain = false
}
clearBuffer(){
//取緩存中最上面的一個
let buffer = this.cache.shift()
if(buffer){
//有buffer的狀況下
this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer(),buffer.callback)
}else{
//沒有的話 先看看需不須要drain
if(this.needDrain){
//觸發drain 並初始化全部狀態
this.writing = false
this.needDrain = false
this.callbackList.shift()()
this.emit('drain')
}
this.callbackList.map(v=>{
v()
})
this.callbackList.length = 0
}
}
_write(chunk,encoding,clearBuffer,callback){
//由於write方法是同步調用的 因此可能還沒獲取到fd
if(typeof this.fd != 'number'){
//直接在open的時間對象上註冊一個一次性事件 當open被emit的時候會被調用
return this.once('open',()=>this._write(chunk,encoding,clearBuffer,callback))
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWrite)=>{
this.pos += byteWrite
//每次寫完 相應減小內存中的數量
this.len -= byteWrite
if(callback) this.callbackList.push(callback)
//第一次寫完
clearBuffer()
})
}
//寫入方法
write(chunk,encoding=this.encoding,callback){
//判斷chunk必須是字符串或者buffer 爲了統一都變成buffer
chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding)
//維護緩存的長度 3
this.len += chunk.length
let ret = this.len < this.highWaterMark
if(!ret){
//表示要觸發drain事件
this.needDrain = true
}
//正在寫入的應該放到內存中
if(this.writing){
this.cache.push({
chunk,
encoding,
callback
})
}else{
//這裏是第一次寫的時候
this.writing = true
//專門實現寫的方法
this._write(chunk,encoding,()=>this.clearBuffer(),callback)
}
// console.log(ret)
//能不能繼續寫了 false表明下次寫的時候更佔內存
return ret
}
destory(){
if(typeof this.fd != 'number'){
return this.emit('close')
}
//若是文件被打開過 就關閉文件而且觸發close事件
fs.close(this.fd,()=>{
this.emit('close')
})
}
open(){
//fd表示的就是當前this.path的這個文件,從3開始(number類型)
fs.open(this.path,this.flags,(err,fd)=>{
//有可能fd這個文件不存在 須要作處理
if(err){
//若是有自動關閉 則幫他銷燬
if(this.autoClose){
//銷燬(關閉文件,出發關閉文件事件)
this.destory()
}
//若是有錯誤 就會觸發error事件
this.emit('error',err)
return
}
//保存文件描述符
this.fd = fd
//當文件打開成功時觸發open事件
this.emit('open',this.fd)
})
}
}
複製代碼
由於createWriteStream
內部調用了WriteStream
類,WriteStream
又實現了Writable
接口,WriteStream
實現了_write()
方法,因此咱們經過自定義一個類繼承stream
模塊的Writable
,並在原型
上自定義一個_write()
就能夠自定義本身的可寫流socket
let { Writable } = require('stream')
class MyWrite extends Writable{
_write(chunk,encoding,callback){
//write()的第一個參數,寫入的數據
console.log(chunk)
//這個callback,就至關於咱們上面的clearBuffer方法,若是不執行callback就不會繼續從緩存中取出寫
callback()
}
}
let write = new MyWrite()
write.write('1','utf8',()=>{
console.log('ok')
})
複製代碼
管道流,是可讀流上的方法,至於爲何放到這裏,主要是由於須要2個流的基礎知識,是可讀流配合可寫流的一種
傳輸方式
。若是用原來的讀寫,由於寫比較耗時
,因此會多讀少寫
,耗內存
,但用了pipe
就不會了,始終用規定
的內存。學習
let fs = require('fs')
//pipe方法叫管道 能夠控制速率
let rs = fs.createReadStream('./d.txt',{
highWaterMark: 4
})
let ws = fs.createWriteStream('./e,txt',{
highWaterMark: 1
})
//會監聽rs的on('data')將讀取到的數據,經過ws.write的方法寫入文件
//調用寫的一個方法 返回boolean類型
//若是返回false就調用rs的pause方法 暫停讀取
//等待可寫流 寫入完畢在監聽drain resume rs
rs.pipe(ws) //會控制速率 防止淹沒可用內存
複製代碼
let fs = require('fs')
//這兩個是上面本身寫的ReadStream和WriteStream
let ReadStream = require('./ReadStream')
let WriteStream = require('./WriteStream')
//若是用原來的讀寫,由於寫比較耗時,因此會多讀少寫,耗內存
ReadStream.prototype.pipe = function(dest){
this.on('data',(data)=>{
let flag = dest.write(data)
//若是寫入的時候嘴巴吃滿了就不繼續讀了,暫停
if(!flag){
this.pause()
}
})
//若是寫的時候嘴巴里的吃完了,就會繼續讀
dest.on('drain',()=>{
this.resume()
})
this.on('end',()=>{
this.destory()
//清空緩存中的數據
fs.fsync(dest.fd,()=>{
dest.destory()
})
})
}
複製代碼
有了雙工流,咱們能夠在同一個對象上
同時實現可讀和可寫
,就好像同時繼承這兩個接口。 重要的是雙工流的可讀性和可寫性操做徹底獨立
於彼此。這僅僅是將兩個特性組合成一個對象。
let { Duplex } = require('stream')
//雙工流,可讀可寫
class MyDuplex extends Duplex{
_read(){
this.push('hello')
this.push(null)
}
_write(chunk,encoding,clearBuffer){
console.log(chunk)
clearBuffer()
}
}
let myDuplex = new MyDuplex()
//process.stdin是node自帶的process進程中的可讀流,會監聽命令行的輸入
//process.stdout是node自帶的process進程中的可寫流,會監聽並輸出在命令行中
//因此這裏的意思就是在命令行先輸出hello,而後咱們輸入什麼他就出來對應的buffer(先做爲可讀流出來)
process.stdin.pipe(myDuplex).pipe(process.stdout)
複製代碼
轉換流的輸出是從輸入中
計算
出來的。對於轉換流,咱們沒必要實現read
或write
的方法,咱們只須要實現一個transform
方法,將二者結合起來。它有write
方法的意思,咱們也能夠用它來push
數據。
let { Transform } = require('stream')
class MyTransform extends Transform{
_transform(chunk,encoding,callback){
console.log(chunk.toString().toUpperCase())
callback()
}
}
let myTransform = new MyTransform()
class MyTransform2 extends Transform{
_transform(chunk,encoding,callback){
console.log(chunk.toString().toUpperCase())
this.push('1')
// this.push(null)
callback()
}
}
let myTransform2 = new MyTransform2()
//此時myTransform2被做爲可寫流觸發_transform,輸出輸入的大寫字符後,會經過可讀流push字符到下一個轉換流中
//當寫入的時候纔會觸發transform的值,此時纔會push,因此後面的pipe拿到的chunk是前面的push的值
process.stdin.pipe(myTransform2).pipe(myTransform)
複製代碼
嘴
真正的吃滿了,而且等到把嘴裏的和地上的饅頭(緩存中的)都吃下了纔會觸發drain
事件隔離
的_transform
方法中。跟雙工流的區別就是,他的可讀可寫是在一塊兒
的。OK,講完收工,今後你就是流
魔王