Stream
是Node.js
中處理文件、HTTP請求等功能的重要依賴,接下來咱們就瞭解一下什麼是Stream
?以及Stream
的常見用法。緩存
開始
Stream
以前,咱們先了解一下流的相關概念,將有助於咱們理解Stream
。在服務端處理文件以及HTTP請求時,咱們常常會用流的方法,來實現文件的讀寫,數據的傳輸和處理。bash
0.1 流數據 來自wiki的定義 Wiki函數
流數據是由不一樣來源連續生成的數據。
流是一組有序的,有起點和終點的字節數據傳輸手段。
這些數據應該使用流處理技術逐步處理,而無需訪問全部數據。
另外,應該考慮到在數據中可能發生概念漂移,這意味着流的屬性可能隨時間而改變。
複製代碼
**0.2 數據流 **ui
數據流是一系列數字編碼的相干信號(數據包或數據包),用於傳輸或接收正在傳輸過程當中的信息。
通常咱們將數據轉換爲二進制或者其它base格式進行傳輸。
複製代碼
流是用於處理Node.js中的流數據的抽象接口。stream
模塊提供了不少API可供實現stream的接口。Node.js中提供了不少stream的實例,如HTTP server
和process.stdout
。在它們中能充分體驗到stream的應用。編碼
1.1 引用stream模塊:spa
const stream = require('stream');
操作系統
Node.js中有4中基本流的類型:code
Readable
:能夠讀的流fs.createReadStream()
Writable
: 能夠寫的流fs.createWriteStream()
Duplex
: 既可讀,又能夠寫net.Socket
Transform
: 在寫入和讀取數據時能夠修改或轉換數據的雙工數據流,zlib.createDeflate()
Buffer
Writable 和 Readable 流都會將數據存儲到內部的緩衝器(buffer)中。
這些緩衝器能夠 經過相應的 writable._writableState.getBuffer() 或
readable._readableState.buffer 來獲取。
當內部可讀緩衝器的大小達到highWaterMark指定的閾值時,
流會暫停從底層資源讀取數據,直到當前緩衝器的數據被消費 (也就是說,
流會在內部中止調用 readable._read()來填充可讀緩衝器)。
可寫流經過反覆調用 writable.write(chunk)
方法將數據放到緩衝器。當內部可寫緩衝器的總大小小於highWaterMark指定的閾
值時, 調用 writable.write() 將返回true。一旦內部緩衝器的大小達到或超過
highWaterMark ,調用writable.write() 將返回 false 。
複製代碼
flowing
和 paused
可讀流,實現了stream.Readable接口的對象,將對象數據讀取爲流數據,當監聽data事件後,開始發射數據orm
var rs = fs.createReadStream(path,[options]);
複製代碼
rs.on('data', function (data) {
console.log(data);
});
複製代碼
rs.on('end', function () {
console.log('讀取完成');
});
複製代碼
rs.on('error', function (err) {
console.log(err);
});
複製代碼
rs.on('open', function () {
console.log(err);
});
複製代碼
rs.on('close', function () {
console.log(err);
});
複製代碼
rs.setEncoding('utf8');
複製代碼
rs.on('data', function (data) {
rs.pause();
console.log(data);
});
setTimeout(function () {
rs.resume();
},2000);
複製代碼
實現了stream.Writable接口的對象來將流數據寫入到對象中server
fs.createWriteStream = function(path, options) {
return new WriteStream(path, options);
};
util.inherits(WriteStream, Writable);
複製代碼
var ws = fs.createWriteStream(path,[options]);
path寫入的文件路徑
options
flags打開文件要作的操做,默認爲'w'
encoding默認爲utf8
highWaterMark寫入緩存區的默認大小16kb
複製代碼
ws.write(chunk,[encoding],[callback]);
chunk寫入的數據buffer/string encoding編碼格式chunk爲字符串時有用,可選 callback 寫入成功後的回調 返回值爲布爾值,系統緩存區滿時爲false,未滿時爲true
ws.end(chunk,[encoding],[callback]);
複製代碼
代表接下來沒有數據要被寫入 Writable 經過傳入可選的 chunk 和 encoding 參數,能夠在關閉流以前再寫入一段數據 若是傳入了可選的 callback 函數,它將做爲 'finish' 事件的回調函數
當一個流不處在 drain 的狀態, 對 write() 的調用會緩存數據塊, 而且返回 false。 一旦全部當前全部緩存的數據塊都排空了(被操做系統接受來進行輸出), 那麼 'drain' 事件就會被觸發
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
let i = 10;
function write(){
let flag = true;
while(i&&flag){
flag = ws.write("1");
i--;
console.log(flag);
}
}
write();
ws.on('drain',()=>{
console.log("drain");
write();
});
複製代碼
在調用了 stream.end() 方法,且緩衝區數據都已經傳給底層系統以後, 'finish' 事件將被觸發。
var writer = fs.createWriteStream('./2.txt');
for (let i = 0; i < 100; i++) {
writer.write(`hello, ${i}!\n`);
}
writer.end('結束\n');
writer.on('finish', () => {
console.error('全部的寫入已經完成!');
});
複製代碼
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
var flag = ws.write(data);
if(!flag){
rs.pause();
}
});
ws.on('drain', function () {
rs.resume();
});
rs.on('end', function () {
ws.end();
});
複製代碼
readStream.pipe(writeStream);
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
複製代碼
將數據的滯留量限制到一個可接受的水平,以使得不一樣速度的來源和目標不會淹沒可用內存。