nodejs-流(stream)操做基礎

什麼是流?
  • 流是一種用來處理文件的字節傳輸手段
  • 它不關心文件的總體內容,只關注是否從文件中讀取到了數據,以及讀到數據以後的處理
流的類型?
  • Readable--> 可讀流(例如 fs.createReadStream())
  • Writable--> 可寫流(例如 fs.createWriteStream())
  • Duplex--> 可讀可寫流(雙工流 例如 net.Socket)
  • Transform--> 在讀寫過程當中能夠修改和變化數據的Duplex流(例如 zlib.createDeflate())
Reanable(可讀流)

建立可讀流html

var rs = fs.createReadStream(path,{
    flags: 'r', // 打開文件要作的操做,默認爲‘r’
    encoding: 'utf8', // 指定解析的字符編碼格式,默認爲null; 同理:rs.setEncoding('utf8')
    start: '3', // 開始讀取的索引位置
    end: '9', // 結束讀取的索引位置(包括結束位置)
    highWaterMark: '3', // 從底層資源讀取數據並存儲在內部緩衝區中的最大字節數,默認16k;
                        //注意:若是指定utf8編碼highWaterMark要大於3個字節
});
複製代碼

open事件node

// 打開要讀取的文件後觸發
rs.on('open', () => {
    console.log('打開文件')
})
複製代碼

data事件segmentfault

// 流自動從底層讀取數據
rs.on('data', chunk => {
    console.log(chunk);
    // 調用pause()方法暫停數據的讀取
    rs.pause(); // 此時切換爲暫停模式
})
setTimeout(() => {
    // 恢復數據的讀取,切換回流動模式
    rs.resume();
}, 2000)
複製代碼

error事件緩存

// 讀取數據過程當中出現錯誤觸發
rs.on('error', err => {
    console.log(err);
})
複製代碼

end事件bash

// 數據讀取完畢的時候觸發
rs.on('end', () => {
    console.log('讀取結束');
})
複製代碼

close事件函數

rs.on('close', () => {
    console.log('讀取完畢後關閉文件');
})
複製代碼

readable事件ui

rs.on('readable', () => {
    // 監聽readable事件切換爲暫停模式,調用read()方法讀取流中緩存的數據
    // 當緩存中的數據不夠highWaterMark,從新向底層讀取highWaterMark字節的數據填充緩存區;
    // 緩存區的字節數可能會大於highWaterMark
    var chunk = rs.read(1)
    console.log(chunk);
})
複製代碼

可讀流分爲兩種模式:this

  • flowing(流動模式): 當可讀流監聽"data"事件的時候,當前流爲流動模式;可讀流自動從系統底層讀取數據,並經過EventEmitter發送事件來將數據提供給應用。編碼

  • paused(暫停模式): 當可讀流監聽"readable"事件的時候,當前流爲暫停模式;必需要調用stream.read()方法從流中讀取數據片斷。spa

readable._readableState.flowing 字段的值來標識當前可讀流是什麼模式

  • null:值爲null狀況下,可讀流將不會產生數據,由於數據不會被消費;在當前狀態下,監聽流的"data"事件便可變爲流動模式
  • true:值爲true的狀況下,可讀流爲流動模式,流會自動讀取數據返回
  • false:值爲false的狀況下,可讀流爲暫停模式,只有調用stream.read()方法才能夠從流中讀取數據
Writable(可寫流)

建立可寫流

var ws = fs.createWriteStream(path, {
    flags: 'w', // 打開文件要作的操做,默認是‘w’
    encoding: 'utf8', // 指定寫入的字符編碼格式
    highWaterMark: '3', // 緩存區大小(默認爲16kb), 
});
複製代碼

write方法

* chunk 要寫入的數據,類型爲 buffer/string
* encoding 可選,chunk爲字符串時,指定字符編碼
* callback 寫入完畢後的回調
var flag = ws.write(chunk, encoding, callback);
// flag 爲布爾值,緩存區滿時爲false,不然爲true
複製代碼

end方法

ws.end(chunk, encoding, callback);
// 結束寫入的方法,在結束的時候還能夠寫一部分數據進去,
// callback 若是傳入,它將做爲finish事件的回調函數
複製代碼

drain事件

var flag = ws.write(chunk, encoding, callback);
// 當flag爲false時,表示緩存區已滿;當緩存區數據用完,緩存區清空的時候會觸發drain事件
// 必須是在緩存區滿了清空後纔會觸發drain事件
ws.on('drain', () => {
    console.log('緩存區已清空')
})
複製代碼

finish事件

ws.end('結束');
ws.on('finish', () => {
    console.log('全部寫入完成');
})
// 在調用了 stream.end() 方法,且緩衝區數據都已經傳給底層系統以後, 'finish' 事件將被觸發。
複製代碼
pipe

Readable和Writable分別實現了對文件的讀和寫的操做;可是一般狀況下會出現邊讀邊寫的場景,讀取一個文件的內容,寫入到另外一個文件中;在這種場景下,可能會出現讀寫不均衡的問題,寫入比較慢,讀取比較快(來不及寫入的文件數據可能會丟失);因此咱們期待能夠達到讀寫均衡的狀態,因而出現了pipe(導流)。 它能夠控制讀取的速率,當寫入較慢的時候暫停對文件的讀取;當可寫流緩存區數據寫入完畢後恢復文件的讀取。

pipe的用法

readStream.pipe(writeStream);
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
// pipe能夠綁定多個可寫流
var to2 = fs.createWriteStream('./3.txt');
from.pipe(to2);
// 分離from綁定的可寫流;
// 不傳參數的話會分離全部綁定的可寫流
from.unpipe(to)
複製代碼

pipe原理

// 當可寫流調用write()方法返回false時,表示緩存區已滿,這時將可讀流切換爲暫停模式;
// 暫停讀取數據,同時監聽可寫流的drain事件,當緩存區數據寫入完畢,觸發drain事件;
// 在drain事件的回調函數中切換可讀流爲流動模式繼續讀取數據
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', data => {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', () => {
    rs.resume();
});
rs.on('end', () => {
    ws.end();
});
複製代碼
Duplex(雙工流)

Duplex 流是同時實現了 Readable 和 Writable 接口的流 其中Readable和Writable分別是兩個不相關的流

// 實現一個簡單自定義的duplex須要定義好兩個方法read和write

let {Duplex} = require('stream');
let index = 0;
let s = Duplex({
    read(){
        console.log(index)
        if(index++<3){
            this.push('b');
        } else {
            this.push(null); 
        }
    },
    write(chunk,encoding,cb){
        var a = chunk.toString().toUpperCase()
        console.log(1);
        cb();
    }
});
//process.stdin 標準輸入流
//proces.stdout標準輸出流
process.stdin.pipe(s)
s.pipe(process.stdout)
process.stdin.pipe(s).pipe(process.stdout);
複製代碼
Transform(轉換)

transform流也是一個雙工流,用以處理輸入輸出是因果相關,位於管道中間層的 Transform 是便可讀也可寫的; Transform類最初繼承自stream.Duplex,而且實現了它本身版本的writable._write()和readable._read()方法。 自定義一個transform流必須實現transform() 方法;

let {Transform}  = require('stream');
//轉換流是實現數據轉換的
let t = Transform({
![](https://user-gold-cdn.xitu.io/2018/2/22/161bc32442b55f8c?w=640&h=180&f=png&s=23446)
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
複製代碼

Transform 一樣是雙工流,看起來和 Duplex 重複了,但二者有一個重要的區別:Duplex 雖然同時具有可讀流和可寫流,但二者是相對獨立的;Transform 的一種流的數據會通過必定的處理過程自動進入另一個流。

參考:

深刻理解 Node Stream 內部機制

nodejs中流(stream)的理解

基礎 進階 實戰

相關文章
相關標籤/搜索