面試高級前端工程師必問之流-stream

流(stream)是一種在 Node.js 中處理流式數據的抽象接口。 stream 模塊提供了一些基礎的 API,用於構建實現了流接口的對象。緩存

Node.js 提供了多種流對象。 例如,發送到 HTTP 服務器的請求和 process.stdout 都是流的實例。bash

流能夠是可讀的、可寫的、或是可讀寫的。 全部的流都是 EventEmitter 的實例。服務器

流的類型

Node.js 中有四種基本的流類型(本篇主要說前兩種):異步

  • Writable - 可寫入數據的流(例如 fs.createWriteStream())
  • Readable - 可讀取數據的流(例如 fs.createReadStream())
  • Duplex - 可讀又可寫的流(例如 net.Socket)
  • Transform - 在讀寫過程當中能夠修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())

緩衝

流中一個至關重要的概念,不管讀寫流都是經過緩衝來實現的。 可寫流和可讀流都會在一個內部的緩衝器中存儲數據,能夠分別使用的 writable.writableBuffer 或 readable.readableBuffer 來獲取,可緩衝的數據的數量取決於傳入流構造函數的 highWaterMark 選項,默認狀況下highWaterMark 64*1024個字節 讀寫的過程都是將數據讀取寫入緩衝,而後在將數據讀出或者寫入文件。函數

幾個重要的底層方法

  1. writable.write(chunk[, encoding][, callback]) writable.write() 方法向流中寫入數據,並在數據處理完成後調用 callback 。若是有錯誤發生, callback 不必定 以這個錯誤做爲第一個參數並被調用。要確保可靠地檢測到寫入錯誤,應該監聽 'error' 事件。 在確認了 chunk 後,若是內部緩衝區的大小小於建立流時設定的 highWaterMark 閾值,函數將返回 true 。 若是返回值爲 false ,應該中止向流中寫入數據,直到 'drain' 事件被觸發。 當一個流不處在 drain 的狀態, 對 write() 的調用會緩存數據塊, 而且返回 false。 一旦全部當前全部緩存的數據塊都排空了(被操做系統接受來進行輸出), 那麼 'drain' 事件就會被觸發。
  2. readable.read([size])

來一個小例子,有助於理解學習

// pipe
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:1
})
let ws = fs.createWriteStream('./5.txt',{
    highWaterMark:2
})
let index = 1;
rs.on('data', (data) => {
    console.log(index++)
    let flag = ws.write(data);    // 當內部的可寫緩衝的總大小小於 highWaterMark 設置的閾值時,
    //調用 writable.write() 會返回 true。 一旦內部緩衝的大小達到或超過 highWaterMark 時,則會返回 falseif (!flag) {     //內部緩衝超過highWaterMark
        rs.pause()
    }
})
let wsIndex = 1;
ws.on('drain', () => {
    console.log('ws'+wsIndex++)
    rs.resume()
})
// 1 2 ws1 3 4 ws2 5 6 ws3
複製代碼

幾個重要的事件監聽

前面已經說了全部的流都是 EventEmitter 的實例,那麼就能夠on,能夠emit等等ui

  1. rs.on('data',()) //讀入緩衝
  2. ws.on('drain',()) //寫的緩衝被清空
    上面的例子中 當寫緩衝大於highWaterMark時 咱們就要暫停讀取,等待監聽到drain事件,而後從新啓動rs.resume()讀取

其實啊,在工做中也是不多直接這用到的,咱們能夠直接用pipe rs.pipe(ws)便可 這樣就給一個可讀流寫入到一個可寫流當中this

本身實現的可讀流

let EventEmitter = require('events');   //全部的流都是 EventEmitter 的實例,流繼承EventEmitter
let fs = require('fs');
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.autoClose = options.autoClose || true;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding || null;
    this.start = options.start || 0;
    this.end = options.end || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    // 應該有一個讀取文件的位置 可變的(可變的位置)
    this.pos = this.start;
    // 控制當前是不是流動模式
    this.flowing = null;
    // 構建讀取到的內容的buffer
    this.buffer = Buffer.alloc(this.highWaterMark);
    // 當建立可讀流 要將文件打開
    this.open(); // 異步執行
    this.on('newListener', (type) => {
      if(type === 'data'){ // 用戶監聽了data事件,就開始讀取吧
        this.flowing = true;
        this.read();// 開始讀取文件
      }
    });
  }
  read(){
    // 這時候文件尚未打開呢,等待着文件打開後再去讀取
    if(typeof this.fd !== 'number'){
      // 等待着文件打開,再次調用read方法
      return this.once('open',()=>this.read());
    }
    // 開始讀取了
    // 文件可能有10個字符串
    // start 0 end 4
    // 每次讀三個 3
    // 0-2
    // 34
    let howMuchToRead = this.end ? Math.min(this.highWaterMark,this.end - this.pos+1) :this.highWaterMark
    // 文件描述符 讀到哪一個buffer裏 讀取到buffer的哪一個位置
    // 往buffer裏讀取幾個,讀取的位置
    fs.read(this.fd, this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
      if (bytesRead>0){ // 讀到內容了
        this.pos += bytesRead;
        // 保留有用的
        let r = this.buffer.slice(0, bytesRead);
        r = this.encoding ? r.toString(this.encoding) : r;
        // 第一次讀取
        this.emit('data', r);
        if (this.flowing) {
          this.read();
        }
      }else{
        this.end = true;
        this.emit('end');
        this.destroy();
      }
    });
  }
  destroy() { // 判斷文件是否打開 (將文件關閉掉)
    if (typeof this.fd === 'number') {
      fs.close(this.fd, () => {
        this.emit('close');
      });
      return;
    }
    this.emit('close');
  }
  open() { // 打開文件的邏輯
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        this.emit('error', err);
        if (this.autoClose) {
          this.destroy(); // 銷燬 關閉文件(觸發close事件)
        } return;
      }
      this.fd = fd;
      this.emit('open'); // 觸發文件開啓事件
    });
  }
  pause(){
    this.flowing = false;
  }
  resume(){
    this.flowing = true;
    this.read(); // 繼續讀取
  }
}
module.exports = ReadStream;
複製代碼

本身實現的可寫流

let fs = require('fs');
let EventEmitter = require('events');

class WriteStream extends EventEmitter{
  constructor(path,options ={}){
    super();
    this.path = path;
    this.flags = options.flags || 'w';
    this.mode = options.mode || 0o666;
    this.highWaterMark = options.highWaterMark || 16*1024;
    this.start = options.start || 0;
    this.autoClose = options.autoClose|| true;
    this.encoding = options.encoding || 'utf8';

    // 是否須要觸發drain事件
    this.needDrain = false;
    // 是否正在寫入
    this.writing = false;
    // 緩存 正在寫入就放到緩存中
    this.buffer = [];
    // 算一個當前緩存的個數
    this.len = 0;
    // 寫入的時候也有位置關係
    this.pos = this.start;
    this.open();
  }
  // 0 [1 2] 
  write(chunk, encoding = this.encoding,callback){
    chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk);
    this.len += chunk.length;// 每次調用write就統計一下長度
    this.needDrain = this.highWaterMark <= this.len; 
    // this.fd
    if(this.writing){
      this.buffer.push({chunk,encoding,callback});
    }else{
      // 當文件寫入後 清空緩存區的內容
      this.writing = true;  // 走緩存
      this._write(chunk,encoding,()=>this.clearBuffer());
    }
    return !this.needDrain; // write 的返回值必須是true / false   
    
    //這時候能夠回頭看一下上面的例子,在this.len >= this.higWaterMark的時候,返回了一個fasle,例子中就暫停讀取了。等待寫入完成
  }
  _write(chunk,encoding,callback){
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this._write(chunk, encoding, callback));
    }
    // fd是文件描述符 chunk是數據 0 寫入的位置和 長度 , this.pos偏移量
    fs.write(this.fd, chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
      this.pos += bytesWritten;
      this.len -= bytesWritten; // 寫入的長度會減小
      callback();
    });
  }
  clearBuffer(){
    let buf = this.buffer.shift();
    if(buf){
      this._write(buf.chunk, buf.encoding, () => this.clearBuffer());
    }else{
      this.writing = false;
      this.needDrain = false; // 觸發一次drain  再置回false 方便下次繼續判斷
      this.emit('drain');
    }
  }
  destroy(){
    if(typeof this.fd === 'number'){
      fs.close(this.fd,()=>{
        this.emit('close');
      });
      return 
    }
    this.emit('close');
  }
  open(){
    fs.open(this.path,this.flags,this.mode,(err,fd)=>{
      if(err){
        this.emit('error');
        this.destroy();
        return 
      }
      this.fd = fd;
      this.emit('open');
    });
  }
}
module.exports = WriteStream;
複製代碼

以上就是流的一些基礎知識,流的簡單應用以及本身實現的可讀流可寫流。固然有不少不足之處,但願朋友們提出指正。也但願和各位朋友一塊兒學習分享!spa

相關文章
相關標籤/搜索