Node 深刻Stream(2)

1. Node.js 中有四種基本的流類型:

  • Readable - 可讀的流 (例如 fs.createReadStream()).
  • Writable - 可寫的流 (例如 fs.createWriteStream()).
  • Duplex - 可讀寫的流 (例如 net.Socket).
  • Transform - 在讀寫過程當中能夠修改和變換數據的 Duplex 流 (例如 zlib.createDeflate()).

2. 流中的數據有兩種模式,二進制模式和對象模式.

  • 二進制模式, 每一個分塊都是buffer或者string對象.
  • 對象模式, 流內部處理的是一系列普通對象.

    全部使用 Node.js API 建立的流對象都只能操做 strings 和 Buffer對象。可是,經過一些第三方流的實現,你依然可以處理其它類型的 JavaScript 值 (除了 null,它在流處理中有特殊意義)。 這些流被認爲是工做在 「對象模式」(object mode)。 在建立流的實例時,能夠經過 objectMode 選項使流的實例切換到對象模式。試圖將已經存在的流切換到對象模式是不安全的。緩存

3. 可讀流的兩種模式

  • 可讀流事實上工做在下面兩種模式之一:flowingpaused
  • 在 flowing 模式下, 可讀流自動從系統底層讀取數據,並經過 EventEmitter 接口的事件儘快將數據提供給應用。
  • 在 paused 模式下,必須顯式調用 stream.read() 方法來從流中讀取數據片斷。
  • 全部初始工做模式爲 paused 的 Readable 流,能夠經過下面三種途徑切換到 flowing 模式:
    • 監聽 'data' 事件
    • 調用 stream.resume() 方法
    • 調用 stream.pipe() 方法將數據發送到 Writable
  • 可讀流能夠經過下面途徑切換到 paused 模式:
    • 若是不存在管道目標(pipe destination),能夠經過調用 stream.pause() 方法實現。
    • 若是存在管道目標,能夠經過取消 'data' 事件監聽,並調用 stream.unpipe() 方法移除全部管道目標來實現。

若是 Readable 切換到 flowing 模式,且沒有消費者處理流中的數據,這些數據將會丟失。 好比, 調用了 readable.resume() 方法卻沒有監聽 'data' 事件,或是取消了 'data' 事件監聽,就有可能出現這種狀況。安全

4.緩存區

  • Writable 和 Readable 流都會將數據存儲到內部的緩衝器(buffer)中。這些緩衝器能夠 經過相應的 writable._writableState.getBuffer() 或 readable._readableState.buffer 來獲取。bash

  • 緩衝器的大小取決於傳遞給流構造函數的 highWaterMark 選項。 對於普通的流, highWaterMark 選項指定了總共的字節數。對於工做在對象模式的流, highWaterMark 指定了對象的總數。函數

  • 當可讀流的實現調用stream.push(chunk)方法時,數據被放到緩衝器中。若是流的消費者沒有調用stream.read()方法, 這些數據會始終存在於內部隊列中,直到被消費。ui

  • 當內部可讀緩衝器的大小達到 highWaterMark 指定的閾值時,流會暫停從底層資源讀取數據,直到當前 緩衝器的數據被消費 (也就是說, 流會在內部中止調用 readable._read() 來填充可讀緩衝器)。this

  • 可寫流經過反覆調用 writable.write(chunk) 方法將數據放到緩衝器。 當內部可寫緩衝器的總大小小於 highWaterMark 指定的閾值時, 調用 writable.write() 將返回true。 一旦內部緩衝器的大小達到或超過 highWaterMark ,調用 writable.write() 將返回 false 。spa

  • stream API 的關鍵目標, 尤爲對於 stream.pipe() 方法, 就是限制緩衝器數據大小,以達到可接受的程度。這樣,對於讀寫速度不匹配的源頭和目標,就不會超出可用的內存大小。code

  • Duplex 和 Transform 都是可讀寫的。 在內部,它們都維護了 兩個 相互獨立的緩衝器用於讀和寫。 在維持了合理高效的數據流的同時,也使得對於讀和寫能夠獨立進行而互不影響。orm

5. 可讀流的三種狀態

在任意時刻,任意可讀流應確切處於下面三種狀態之一:對象

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

  • 若 readable._readableState.flowing 爲 null,因爲不存在數據消費者,可讀流將不會產生數據。 在這個狀態下,監聽 'data' 事件,調用 readable.pipe() 方法,或者調用 readable.resume() 方法, readable._readableState.flowing 的值將會變爲 true 。這時,隨着數據生成,可讀流開始頻繁觸發事件。

  • 調用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 「背壓」(back pressure), 將致使 readable._readableState.flowing 值變爲 false。 這將暫停事件流,但 不會 暫停數據生成。 在這種狀況下,爲 'data' 事件設置監聽函數不會致使 readable._readableState.flowing 變爲 true。

  • 當 readable._readableState.flowing 值爲 false 時, 數據可能堆積到流的內部緩存中。

6.readable

'readable' 事件將在流中有數據可供讀取時觸發。在某些狀況下,爲 'readable' 事件添加回調將會致使一些數據被讀取到內部緩存中。

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 有一些數據可讀了
});
複製代碼
  • 當到達流數據尾部時, 'readable' 事件也會觸發。觸發順序在 'end' 事件以前。
  • 事實上, 'readable' 事件代表流有了新的動態:要麼是有了新的數據,要麼是到了流的尾部。 對於前者, stream.read() 將返回可用的數據。而對於後者, stream.read() 將返回 null。
let fs =require('fs');
let rs = fs.createReadStream('./1.txt',{
  start:3,
  end:8,
  encoding:'utf8',
  highWaterMark:3
});
rs.on('readable',function () {
  console.log('readable');
  console.log('rs._readableState.buffer.length',rs._readableState.length);
  let d = rs.read(1);
  console.log('rs._readableState.buffer.length',rs._readableState.length);
  console.log(d);
  setTimeout(()=>{
      console.log('rs._readableState.buffer.length',rs._readableState.length);
  },500)
});
複製代碼

7.流的經典應用

7.1 行讀取器

7.1.1 換行和回車

  • 之前的打印要每秒能夠打印10個字符,換行城要0.2秒,正要能夠打印2個字符。
  • 研製人員就是在每行後面加兩個表示結束的字符。一個叫作"回車",告訴打字機把打印頭定位在左邊界;另外一個叫作"換行",告訴打字機把紙向下移一行。
  • Unix系統裏,每行結尾只有換行"(line feed)",即"\n",
  • Windows系統裏面,每行結尾是"<回車><換行>",即"\r\n"
  • Mac系統裏,每行結尾是"回車"(carriage return),即"\r"
  • 在ASCII碼裏
    • 換行 \n 10 0A
    • 回車 \r 13 0D

ASCII

7.1.2 代碼

let fs = require('fs');
let EventEmitter = require('events');
let util = require('util');
util.inherits(LineReader, EventEmitter)
fs.readFile('./1.txt',function (err,data) {
    console.log(data);
})
function LineReader(path) {
    EventEmitter.call(this);
    this._rs = fs.createReadStream(path);
    this.RETURN = 0x0D;// \r 13
    this.NEW_LINE = 0x0A;// \n 10
    this.on('newListener', function (type, listener) {
        if (type == 'newLine') {
            let buffer = [];
            this._rs.on('readable', () => {
                let bytes;
                while (null != (bytes = this._rs.read(1))) {
                    let ch = bytes[0];
                    switch (ch) {
                        case this.RETURN:
                            this.emit('newLine', Buffer.from(buffer));
                            buffer.length = 0;
                            let nByte = this._rs.read(1);
                            if (nByte && nByte[0] != this.NEW_LINE) {
                                buffer.push(nByte[0]);
                            }
                            break;
                        case this.NEW_LINE:
                            this.emit('newLine', Buffer.from(buffer));
                            buffer.length = 0;
                            break;
                        default:
                            buffer.push(bytes[0]);
                            break;
                    }
                }
            });
            this._rs.on('end', () => {
                if (buffer.length > 0) {
                    this.emit('newLine', Buffer.from(buffer));
                    buffer.length = 0;
                    this.emit('end');
                }
            })
        }
    });
}

var lineReader = new LineReader('./1.txt');
lineReader.on('newLine', function (data) {
    console.log(data.toString());
}).on('end', function () {
    console.log("end");
})複製代碼
相關文章
相關標籤/搜索