理解Node中的Buffer與stream

最近參加公司組織的Node學習小組,每一個人認領不一樣的知識點,並和組內同窗分享。很喜歡這樣的學習形式,除了能夠系統學習外,還能倒逼本身輸出,收穫頗多,把本身準備的筆記分享出來。javascript

  1. 簡介
  2. Bufferhtml

    1. 簡介
    2. 編碼
    3. 內存分配機制
    4. API概覽
  3. streamjava

    1. 簡介
    2. 可讀流
    3. 可寫流
    4. 雙工流
    5. 實現與使用

簡介

image.png
Buffer是數據以二進制形式臨時存放在內存中的物理映射,stream爲搬運數據的傳送帶和加工器,有方向、狀態、緩衝大小。node

好比咱們實現一個將圖片和音頻讀取到內存而後加工爲的視頻程序,相似於將原料運輸到工廠而後加工爲月餅的流程。數據庫

Buffer

簡介

緩衝區api

數據的移動是爲了處理或讀取它,若是 數據到達的速度比進程消耗的速度快,那麼少數 早到達的數據會處於等待區等候被處理
《Node.js 中的緩衝區(Buffer)到底是什麼?》

image.png

咱們讀一個祕鑰文件進入內存,確定是等整個文件讀入內存後再處理,要提早劃分存放的空間。
就像擺渡車同樣,坐滿了20位才發車,乘客有早到有晚到,必須有一個地方等候,這就是緩衝區。安全

Buffer是數據以二進制形式臨時存放在內存中的物理映射。bash

早期js沒有讀取操做二進制的機制,js最初設計是爲了操做html。服務器

Node早期爲了處理圖像、視頻等文件,將字節編碼爲字符串來處理二進制數據,速度慢
ECMAScript 2015發佈 TypedArray,更高效的訪問和處理二進制,用於操做網絡協議、數據庫、圖片和文件 I/O 等一些須要大量二進制數據的場景。網絡

Buffer 對象用於表示固定長度的字節序列。
Buffer 類是 JavaScript 的 Uint8Array 類的子類,且繼承時帶上了涵蓋額外用例的方法。 只要支持 Buffer 的地方,Node.js API 均可以接受普通的 Uint8Array
-- 官方文檔
因爲歷史緣由,早期的JavaScript語言沒有用於讀取或操做二進制數據流的機制。由於JavaScript最初被設計用於處理HTML文檔,而文檔主要由字符串組成。
-- 《Node.js 企業級應用開發實踐》
總結起來一句話 Node.js 能夠 用來處理二進制流數據或者與之進行交互
-- 《Node.js 中的緩衝區(Buffer)到底是什麼?》

編解碼

將原始字符串與目標字符串進行互轉。

編碼:將消息轉換爲適合 傳輸的字節流
解碼:將傳輸的字節流轉換爲> 程序可用的消息格式 --《Node.js企業級應用開發實戰》>

Buffer與String傳輸對比

const http = require('http');
let s = '';
for (let i=0; i<1024*10; i++) {
    s+='a'
}

const str = s;
const bufStr = Buffer.from(s);
const server = http.createServer((req, res) => {
    console.log(req.url);

    if (req.url === '/buffer') {
        res.end(bufStr);
    } else if (req.url === '/string') {
        res.end(str);
    }
});

server.listen(3000);
# -c 200併發數  -t 等待響應最大時間 秒
$ ab -c 200 -t 60 http://localhost:3000/buffer
$ ab -c 200 -t 60 http://localhost:3000/string

image.png
相同的測試參數,Buffer完成請求13998次,string完成請求9237次,相差4761次,Buffer比字符串的的傳輸更快

支持格式

Buffer 和字符串之間轉換時,默認使用UTF-8,也能夠指定其餘字符編碼格式。
image.png

注意事項:

  1. Buffer => utf8:如遇到非UTF-8數據會轉換爲 �。
  2. Buffer => utf16le:每一個字符會使用2或4個字節進行編碼。
  3. Buffer => latin1:指定了Unicode編碼範圍,超出會截斷並映射爲範圍內的字符串。
Tip:buffer不支持的編碼類型,gbk、gb2312等能夠藉助js工具包iconv-lite實現。
--《深刻淺出Node.js》

內存分配機制

因爲 Buffer 須要處理的是大量的二進制數據,假如用一點就向系統去申請,則會形成頻繁的向系統申請內存調用,因此 Buffer 所佔用的內存不是由 V8 分配,而是在 Node.js 的 C++ 層面完成申請,在 JavaScript 中進行內存分配。這部份內存稱之爲堆外內存

Node.js 採用了 slab 預先申請、過後分配機制

  1. new Buffer1 => 建立slab對象1 => new Buffer2 => 判斷slab對象1剩餘空間是否夠用。
  2. 釋放Buffer一、Buffer2對象時,依然保留slab1對象。
  3. new Buffer3 => 將slab1對象空間劃分給 Buffer3。

slab對象的三種狀態:
image.png

小對象建立

Node覺得 8kb 區分大對象與小對象。當建立的小對象時,分配一個slab對象。
再建立一個小對象時,會判斷當前的slab對象剩餘空間是足夠,若是夠用則使用剩餘空間,若是不夠用則分配新的slab空間

const Buffer1 = new Buffer(1024)

image.png
const Buffer2 = new Buffer(4000)
image.png

slab分配

slab是Linux操做系統的一種內存分配機制。其工做是針對一些常常分配並釋放的對象,這些對象的大小通常比較小
若是直接採用夥伴系統來進行分配和釋放,不只會形成大量的內存碎片,並且處理速度也太慢

而slab分配器是基於對象進行管理的,相同類型的對象歸爲一類,每當要申請這樣一個對象,slab分配器就從一個slab列表中分配一個這樣大小的單元出去,而當要釋放時,將其從新保存在該列表中,而不是直接返回給夥伴系統,從而避免這些內碎片

slab分配器並不丟棄已分配的對象,而是釋放並把它們保存在內存中。當之後又要請求新的對象時,就能夠從內存直接獲取而不用重複初始化
--百度百科 slab

白話:一些小對象常常須要高頻次分配、釋放 ,致使了 內存碎片和處理速度慢,slab機制是:不丟棄釋放的slab對象,將舊slab對象直接分配給新buffer(舊slab對象可能包含舊數據),以此提升性能

老版本new Buffer、與新版本Buffer.allocUnsafe運行更快,可是內存未初始化,可能致使敏感數據泄露:
image.png
手動填充解決:
image.png
使用 --zero-fill-buffers 命令行選項解決:
image.png
Buffer.alloc 較慢,但更可靠:
image.png

API概覽

image.png

簡單使用:

// 指定長度初始化
Buffer.alloc(10)
// 指定填充 1
Buffer.alloc(10, 1)

// 未初始化的緩衝區 比alloc更快,有可能包含舊數據
Buffer.allocUnsafe(10)

//from建立緩衝區
Buffer.from([1,2,3])
Buffer.from('test')
Buffer.from('test','test2')

//相似數據組 能夠用 for..of
const buf = Buffer.from([1,2,3])
for(const item of buf){
    console.log(item)
}

// 輸出
// 1
// 2
// 3

Node 6~8 版本使用new Buffer建立:

// 建立實例
const buf1 = new Buffer()
const buf2 = new Buffer(10)
// 手動覆蓋
buf1.fill(0)

slice/concat/compare:

// 1. 切分
const buf = new Buffer.from('buffer')
console.log(buf.slice(0, 4).toString())
// buff


// 2. 鏈接
const buf = new Buffer.from('buffer')
const buf1 = new Buffer.from('11111')
const buf2 = new Buffer.from('22222')

const concatBuf = Buffer.concat([buf, buf1, buf2], buf.length + buf1.length + buf2.length)
console.log(concatBuf.toString())
// buffer1111122222


// 3. 比較
const buf1 = new Buffer.from('1234')
const buf2 = new Buffer.from('0123')
const arr = [buf1, buf2]
arr.sort(Buffer.compare)
console.log(arr.toString())
// 0123,1234

const buf3 = new Buffer.from('4567')
console.log(buf1.compare(buf1))
console.log(buf1.compare(buf2))
console.log(buf1.compare(buf3))
// 0 相同
// 1 以前
// -1 以後

stream

簡介

流(stream)是 Node.js 中處理流式數據的抽象接口。 stream 模塊用於構建實現了流接口的對象。
Node.js 提供了多種流對象。 例如, HTTP 服務器的請求process.stdout 都是流的實例。
流能夠是可讀的、可寫的、或者可讀可寫的
-- 官方文檔
什麼是 Stream?
流,英文 Stream 是對輸入輸出設備的抽象,這裏的設備能夠是文件、網絡、內存等。
流是有方向性的,當程序從某個數據源讀入數據,會開啓一個輸入流,這裏的數據源能夠是文件或者網絡等,例如咱們從 a.txt 文件讀入數據。相反的當咱們的程序須要寫出數據到指定數據源(文件、網絡等)時,則開啓一個輸出流。當有一些大文件操做時, 咱們就須要 Stream 像管道同樣,一點一點的將數據流出。
--《Node.js 中的緩衝區(Buffer)到底是什麼?》

流是輸入輸出設備的抽象,數據從設備流入內存爲可讀流,從內存流入設備爲可寫,就向水流管道同樣,有方向,也有狀態(流動、暫停)。

stream 模塊主要用於建立新類型的流實例。 對於以消費流對象爲主的開發者,極少須要直接使用 stream 模塊

stream有4種類型,全部流都是EventEmitter對象

  • 可讀流:Writable
  • 可寫流:Readabale
  • 雙工流(可讀可寫):Duplex
  • 轉換流:Transform

簡單用法:

const { Writable } = require('stream');
const fs = require('fs');
// 可讀流實例
const rr = fs.createReadStream('foo.txt');
// 可寫流實例
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

// EventEmitter用法
myWritable.on('pipe',function(){
    // do some thing
})

myWritable.on('finish',function(){
    // do some thing
})

// 可讀流推送到可寫流
myWritable.pipe(rr)

對象模式

Node.js 建立的流都是運做在字符串和 Buffer(或 Uint8Array)上。 固然,流的實現也可使用其它類型的 JavaScript 值(除了 null)。 這些流會以「對象模式」進行操做。
當建立流時,可使用 objectMode 選項把流實例切換到對象模式。 將已存在的流切換到對象模式是不安全的。
-- Node.js v14.16.0

緩衝

highWaterMark選項指定了可緩衝數據大小,即字節總數,對象模式的流爲對象總數。

可讀流緩衝到達highWaterMark指定的值時,會中止從底層資源讀取數據,直到數據被消費。
可寫流緩衝到達highWaterMark值時writable.write()返回false。

stream.pipe()會限制緩衝,避免讀寫不一致致使內存崩潰

可讀流

2種模式

  • 暫停Paused模式
  • 流動Flowing模式

這兩種模式是基於readable.readableFlowing的3種內部狀態的一種簡化抽象。

  • readable.readableFlowing = null 沒有提供消費流數據的機制,此時指定data、指定pipe、執行resume 會使值變爲true
  • readable.readableFlowing = true 調用pause、unpipe會使值變爲false
  • readable.readableFlowing = false

暫停模式對應null 和false。

選擇一種接口風格

Node提供了多種方法來消費流數據。 開發者一般應該選擇其中一種方法來消費數據,不要在單個流使用多種方法來消費數據。 混合使用 on('data')、 on('readable')、 pipe() 或異步迭代器,會致使不明確的行爲。

const fs = require('fs');
const rr = fs.createReadStream('api.xmind');
const file = fs.createWriteStream('api.xmind.file');

// 1. 可讀流綁定可寫流
rr.pipe(file)
rr.unpipe(file)


// 2. data end
rr.on('data', (chunk) => {
  file.write(chunk)
});
rr.on('end', () => {
  file.end()
});


// 3. readable read
rr.on('readable', () => {
  const chunk = rr.read()
  if(null !== chunk){
      file.write(chunk)
  }else{
      file.end()
  }
  // 結束時 read()返回null
});

image.png

可寫流

image.png

例子:

const Writable = require('stream').Writable

const writable = Writable()
// 實現`_write`方法
// 這是將數據寫入底層的邏輯
writable._write = function (data, enc, next) {
  // 將流中的數據寫入底層
  process.stdout.write(data.toString().toUpperCase())
  // 寫入完成時,調用`next()`方法通知流傳入下一個數據
  process.nextTick(next)
}

// 全部數據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'))

// 將一個數據寫入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')

// 再無數據寫入流時,須要調用`end`方法
writable.end()

// 輸出
// A
// B
// C
// DONE%

cork/uncork方法

writable.cork() 方法強制把全部寫入的數據都緩衝到內存中。 當調用 stream.uncork() 或 stream.end() 方法時,緩衝的數據纔會被輸出。

stream.cork();
stream.write('一些 ');
stream.write('數據 ');
process.nextTick(() => stream.uncork());

若是一個流上屢次調用 writable.cork(),則必須調用一樣次數的 writable.uncork() 才能輸出緩衝的數據。

stream.cork();
stream.write('一些 ');
stream.cork();
stream.write('數據 ');
process.nextTick(() => {
  stream.uncork();
  // 數據不會被輸出,直到第二次調用 uncork()。
  stream.uncork();
});

雙工流

雙工流(Duplex)是同時實現了可讀、可寫的流,包括TCP socket、zlib、crypto。
轉換流(Transform)是雙工流的一種,例zlib、crypto。

區別:Duplex 雖然同時具有可讀流和可寫流,但二者是獨立的;Transform 的可讀流的數據會通過必定的處理過程自動進入可寫流

例子,實現_read、_write方法,將寫入數據轉爲一、2 :

var Duplex = require('stream').Duplex
var duplex = Duplex()

// 可讀端底層讀取邏輯
duplex._read = function () {
  this._readNum = this._readNum || 0
  if (this._readNum > 1) {
    this.push(null)
  } else {
    this.push('' + (this._readNum++))
  }
}

// 可寫端底層寫邏輯
duplex._write = function (buf, enc, next) {
  // a, b
  process.stdout.write('_write ' + buf.toString() + '\n')
  next()
}

// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))
duplex.write('a')
duplex.write('b')
duplex.end()

// 輸出
// _write a
// _write b
// ondata 0
// ondata 1

轉換流是一種特殊雙工流,對輸入計算後再輸入,如加解密、zlib流、crypto流。輸入、輸入的數據流大小、數據塊數量不必定一致。若是可讀端的數據沒有被消費,可寫流的數據可能會被暫停。

例子,經過transform方法實現大小寫轉換:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

upperCaseTr.on('data', data => process.stdout.write(data))
upperCaseTr.write('hello, ')
upperCaseTr.write('world!')
upperCaseTr.end()

// 輸出 HELLO, WORLD!%

內置轉換流

// 使用pipe 建立.gz壓縮文件
const fs = require('fs');
const zlib = require('zlib');
const fileName = 'api.xmind'
fs.createReadStream(fileName)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(fileName + '.gz'));
// 使用pipe + transform + on 實現進度打印
const fs = require('fs');
const zlib = require('zlib');
const fileName = 'api.xmind'
const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(fileName)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(fileName + '.zz'))
  .on('finish', () => console.log('Done'));

// 輸出
// ........Done
// 使用pipeline方法 實現管道
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
const fileName = 'api'
// 使用 pipeline API 輕鬆地將一系列的流經過管道一塊兒傳送,並在管道徹底地完成時得到通知。

// 使用 pipeline 能夠有效地壓縮一個可能很大的 tar 文件:

pipeline(
  fs.createReadStream(fileName + '.xmind'),
  zlib.createGzip(),
  fs.createWriteStream( fileName + '.tar.gz'),
  (err) => {
    if (err) {
      console.error('管道傳送失敗', err);
    } else {
      console.log('管道傳送成功');
    }
  }
);

// 輸出 
// 管道傳送成功

實現與使用

實現

  1. 若是實現一個新的流,應繼承了四個基本流類之一(stream.Writeable、 stream.Readable、 stream.Duplex 或 stream.Transform),並確保調用了相應的父類構造函數:

    // 1. 繼承
    const { Readable } = require('stream');
    class Counter extends Readable {
      constructor(opt) {
     // do some thing
      }
      _read() {
     // do some thing
      }
    }
  2. 新的流類必須實現一個或多個特定的方法,具體取決於要建立的流的類型,以下圖所示:

    用例 須要實現的方法
    只讀 Readable _read()
    只寫 Writable _write()_writev()_final()
    可讀可寫 Duplex _read()_write()_writev()_final()
    對寫入的數據進行操做,而後讀取結果 Transform _transform()_flush()_final()

避免重寫諸如 write()、 end()、 cork()、 uncork()、 read() 和 destroy() 之類的公共方法,或經過 .emit() 觸發諸如 'error'、 'data'、 'end'、 'finish' 和 'close' 之類的內部事件。 這樣作會破壞當前和將來的流的不變量,從而致使與其餘流、流的實用工具、以及用戶指望的行爲和/或兼容性問題。

使用

// 1. 使用自定義構造函數
const { Readable } = require('stream');
class Counter extends Readable {
  constructor(opt) {
    // do some thing
  }
  _read() {
    // do some thing
  }
}
const myReadable = new Counter()

// 2. 使用原生構造函數
const { Readable } = require('stream');
const myReadable = new Readable({
  read(size) {
    // do some thing
  }
});

// 3. 重寫實例方法
const { Readable } = require('stream');
const myReadable = Readable()
myReadable._write = function (buf, enc, next) {
  // do some thing
}

回顧

  1. Buffer與stream的類比。
  2. Buffer爲數據緩衝區,Buffer類主要處理二進制。
  3. Buffer比String更是適合傳輸。
  4. slab分配機制:重複使用。
  5. Buffer的API概覽。
  6. stream是I/O數據流的抽象,有方向、狀態、緩衝大小。
  7. 3種流:可讀、可寫、可讀可寫(雙工)。
  8. 雙工流中Duplex與Transform區別:讀寫是否獨立。
  9. stream中Readable、Writable、Duplex、Transform、pipeline的使用。
  10. 經過繼承實現不一樣類型的流
  11. 自定義類、構造函數、實例重寫3種使用方式

東拼西湊的知識點,若有問題懇請斧正,以防誤導他人

參考資料:

相關文章
相關標籤/搜索