快速認識Node.js中的Stream

StreamNode.js中處理文件、HTTP請求等功能的重要依賴,接下來咱們就瞭解一下什麼是Stream?以及Stream的常見用法。緩存

開始Stream以前,咱們先了解一下流的相關概念,將有助於咱們理解Stream。在服務端處理文件以及HTTP請求時,咱們常常會用流的方法,來實現文件的讀寫,數據的傳輸和處理。bash


0.1 流數據 來自wiki的定義 Wiki函數

流數據是由不一樣來源連續生成的數據。
流是一組有序的,有起點和終點的字節數據傳輸手段。
這些數據應該使用流處理技術逐步處理,而無需訪問全部數據。
另外,應該考慮到在數據中可能發生概念漂移,這意味着流的屬性可能隨時間而改變。
複製代碼

**0.2 數據流 **ui

數據流是一系列數字編碼的相干信號(數據包或數據包),用於傳輸或接收正在傳輸過程當中的信息。
 通常咱們將數據轉換爲二進制或者其它base格式進行傳輸。
複製代碼

1. 流的定義

流是用於處理Node.js中的流數據的抽象接口。stream模塊提供了不少API可供實現stream的接口。Node.js中提供了不少stream的實例,如HTTP serverprocess.stdout。在它們中能充分體驗到stream的應用。編碼

1.1 引用stream模塊:spa

const stream = require('stream');操作系統

2. 流的分類

Node.js中有4中基本流的類型:code

  • 可讀流 Readable:能夠讀的流fs.createReadStream()
  • 可寫流 Writable: 能夠寫的流fs.createWriteStream()
  • 雙工流 Duplex: 既可讀,又能夠寫net.Socket
  • 轉換流 Transform: 在寫入和讀取數據時能夠修改或轉換數據的雙工數據流,zlib.createDeflate()

3. 流中的數據模式

  • 二進制模式,每一個分塊都是buffer或者string對象
  • 對象模式,流內部處理的是一系列普通對象

4. 緩存區Buffer

Writable 和 Readable 流都會將數據存儲到內部的緩衝器(buffer)中。
這些緩衝器能夠 經過相應的 writable._writableState.getBuffer() 或
readable._readableState.buffer 來獲取。

當內部可讀緩衝器的大小達到highWaterMark指定的閾值時,
流會暫停從底層資源讀取數據,直到當前緩衝器的數據被消費 (也就是說,
流會在內部中止調用 readable._read()來填充可讀緩衝器)。

可寫流經過反覆調用 writable.write(chunk)
方法將數據放到緩衝器。當內部可寫緩衝器的總大小小於highWaterMark指定的閾
值時, 調用 writable.write() 將返回true。一旦內部緩衝器的大小達到或超過
highWaterMark ,調用writable.write() 將返回 false複製代碼

5. 可讀流

5.1 可讀流的兩種模式

  • flowingpaused

5.2 可讀流的三種狀態

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

5.3 可讀流的經常使用方法

可讀流,實現了stream.Readable接口的對象,將對象數據讀取爲流數據,當監聽data事件後,開始發射數據orm

5.3.1 建立可讀流
var rs = fs.createReadStream(path,[options]);
複製代碼
  1. 監聽data事件,流切換到流動模式,數據會被儘量快的讀出
rs.on('data', function (data) {
    console.log(data);
});
複製代碼
  1. 監聽end事件,該事件會在讀完數據後被觸發
rs.on('end', function () {
    console.log('讀取完成');
});
複製代碼
  1. 監聽error事件
rs.on('error', function (err) {
    console.log(err);
});
複製代碼
  1. 監聽open事件
rs.on('open', function () {
    console.log(err);
});
複製代碼
  1. 監聽close事件
rs.on('close', function () {
    console.log(err);
});
複製代碼
  1. 設置編碼,與指定{encoding:'utf8'}效果相同,設置編碼
rs.setEncoding('utf8');
複製代碼
  1. 暫停和恢復觸發data,經過pause()方法和resume()方法
rs.on('data', function (data) {
    rs.pause();
    console.log(data);
});
setTimeout(function () {
    rs.resume();
},2000);
複製代碼

6. 可寫流

實現了stream.Writable接口的對象來將流數據寫入到對象中server

fs.createWriteStream = function(path, options) {
  return new WriteStream(path, options);
};

util.inherits(WriteStream, Writable);
複製代碼

6.1 建立可寫流

var ws = fs.createWriteStream(path,[options]);
path寫入的文件路徑
options
flags打開文件要作的操做,默認爲'w'
encoding默認爲utf8
highWaterMark寫入緩存區的默認大小16kb
複製代碼

6.2 write方法

ws.write(chunk,[encoding],[callback]);

chunk寫入的數據buffer/string encoding編碼格式chunk爲字符串時有用,可選 callback 寫入成功後的回調 返回值爲布爾值,系統緩存區滿時爲false,未滿時爲true

6.3 end方法

ws.end(chunk,[encoding],[callback]);
複製代碼

代表接下來沒有數據要被寫入 Writable 經過傳入可選的 chunk 和 encoding 參數,能夠在關閉流以前再寫入一段數據 若是傳入了可選的 callback 函數,它將做爲 'finish' 事件的回調函數

6.4 drain方法

當一個流不處在 drain 的狀態, 對 write() 的調用會緩存數據塊, 而且返回 false。 一旦全部當前全部緩存的數據塊都排空了(被操做系統接受來進行輸出), 那麼 'drain' 事件就會被觸發

let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
  flags:'w',
  encoding:'utf8',
  highWaterMark:3
});
let i = 10;
function write(){
 let  flag = true;
 while(i&&flag){
      flag = ws.write("1");
      i--;
     console.log(flag);
 }
}
write();
ws.on('drain',()=>{
  console.log("drain");
  write();
});
複製代碼

6.4 finish方法

在調用了 stream.end() 方法,且緩衝區數據都已經傳給底層系統以後, 'finish' 事件將被觸發。

var writer = fs.createWriteStream('./2.txt');
for (let i = 0; i < 100; i++) {
  writer.write(`hello, ${i}!\n`);
}
writer.end('結束\n');
writer.on('finish', () => {
  console.error('全部的寫入已經完成!');
});
複製代碼

7 Pipe 方法

7.1 pipe方法的原理

var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag){
      rs.pause();
    }
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});
複製代碼
readStream.pipe(writeStream);
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
複製代碼

將數據的滯留量限制到一個可接受的水平,以使得不一樣速度的來源和目標不會淹沒可用內存。

相關文章
相關標籤/搜索