流的剖析和實現

流的定義

流是抽象化的概念,形象生動的描述了數據的流動、變化。
具體來講,在node中流是處理數據的抽象接口,繼承了EventEmitter,經過這個接口咱們可以控制流的開關,流動的方向等等。
比較形象直觀一點相似咱們在linux上使用shell,經過管道,連接處理各個部分,下面是我寫的一個命令,篩選出version並導出到文件中。
stream-linux-jsdtnode

流的分類

  • Readable(可讀流)
  • Writable(可寫流)
  • Duplex(可讀可寫的流)
  • Transform(在讀寫過程當中能夠修改和變化的Duplex流)

流按照功能大體劃分爲以上四類,具體應用的話有不少場景,以下圖所示(來源:參考連接2)
stream-apply-jsdt
下面我根據流的分類,列舉一些demo應用實例linux

Readable

可讀流能接受各類數據源,例如控制檯的輸入,文件,字符串等等,就如介紹中所說是抽象接口,能夠面向各類形式的輸入,下面舉幾個例子。git

文件流

require('fs').createReadStream('./1.txt',{
    encoding: 'utf8'
}).on('data',(data) => {
    console.log(data)
})
// 輸出  hello jsdt

說明 爲何要用流來讀取,直接用fs.readFile豈不是更方便嗎,由於readFile是總體操做,會將文件所有讀到內存中在作處理,這樣的話文件若是很大,程序就會很卡,甚至報錯。github

標準輸入流

process.stdin.setEncoding('utf8');
process.stdin.on('data',(data) => {
    console.log('輸出: '+ data)
})
node 運行code,而後輸入 hello jsdt
輸出: hello jsdt

說明 這個作acm的時候會用到,或者平時本身寫一些交互式應用的時候shell

普通數據流

let {Readable} = require('stream')
let util = require('util')
class Test extends  Readable{
    constructor(){
        super()
        this.dataSource = 5
    }
    _read(){
        if(this.dataSource-->0){
            this.push(this.dataSource+'');
        }else{
            this.push(null);
        }
    }
}
let counter = new Test();
counter.on('data',function(data){
    console.log(data.toString())
});
輸出:
4
3
2
1

說明 重寫_read方法,自定義輸入的邏輯,上面示例中是本身邏輯中產生的一個數據源。api

Writable

文件流

let dataSource = 'hello jsdt',i = 0;
(function(){
    let ws = require('fs').createWriteStream('./1.txt',{
        encoding: 'utf8'
    })
    let flag = true;
    while(flag && i<dataSource.length){
        flag = ws.write(dataSource[i++]+'');
    }
})()

write-test-jsdt
說明 閉包自執行,經過流將數據寫入到文件中,上面是輸出結果。緩存

自定義輸出

let {Writable} = require('stream')
let arr = []
let ws = Writable({
    write(chunk,encoding,cb){
        arr.push(chunk)
        cb()
    }
})
for(let i = 1; i<= 3;i++){
  ws.write(''+i,'utf8',()=>{})
}
process.nextTick(function () {
    console.log(arr.toString())
})
//  輸出 1,2,3

說明 上面重寫了流的write方法,能夠自定義寫邏輯閉包

Duplex

require('net').createServer(socket => {
    socket.on('data',data => {
        console.log('client message ' + data);
        socket.write("server message " + 'hello client ');
    })
}).listen(8080,() =>{})

duplex-jsdt
說明 做爲可寫流一面socket能夠向客戶端發送信息,作爲可讀流一面能夠監聽data事件,收到客戶端發送過信息app

Transform

let t = require('stream').Transform({
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
// 輸入abc
// 輸出ABC

說明 上面使用轉換流,實現了terminal上小寫輸入,對應大寫輸出的功能socket

流中數據分類

  • 二進制模式
  • 對象模式

在建立流的時候能夠指定配置,objectMode默認爲false,設爲true切換到對象模式。二進制即buffer模式,可讀或可寫流都會將數據會緩存數據在buffer中。

流的剖析

經過上面的介紹咱們明確了流的定義,並按照功能對流進行了分類,下面我進行下剖析,總的來講流的各類形態間轉化傳輸底層都是二進制,具體到使用形態上有buffer,string等等。
首先詳細說下可讀流,可讀流有兩種模式,默認爲paused模式。

  • flowing 按照初始化配置,自動讀取數據,並經過觀察者模式,直接將數據提供給訂閱者
  • paused 顯式調用流的read方法讀取數據

其中若是咱們想切換到流動模式能夠經過監聽data事件的方式、或者調用stream.resume()、stream.pipe() 這些方法。

可讀流源碼分析

// 可讀流入口,根據配置返回一個可讀流
fs.createReadStream = function(path, options) {
  return new ReadStream(path, options);
};

// 實現原理是ReadStream.prototype.__proto__ = Readable.prototype,能夠繼承Readable上的一些方法
util.inherits(ReadStream, Readable);
fs.ReadStream = ReadStream;

function ReadStream(path, options) {
  // 非new方式調用,直接返回一個實例
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  options = copyObject(getOptions(options, {}));
  if (options.highWaterMark === undefined)
  // highWaterMark默認值爲64k,設置了flow模式下緩衝區的大小
    options.highWaterMark = 64 * 1024;  

  Readable.call(this, options);

  handleError((this.path = getPathFromURL(path)));
  // 文件描述符,根據這個句柄找到文件
  this.fd = options.fd === undefined ? null : options.fd;
  // flags打開文件要作的操做,默認爲'r'
  this.flags = options.flags === undefined ? 'r' : options.flags;
  // 用於設置文件模式(權限和粘結位),僅限建立文件時。
  this.mode = options.mode === undefined ? 0o666 : options.mode;
  // 開始讀取位置
  this.start = options.start;
  // 結束讀取位置(!!!包括結束位置)
  this.end = options.end;
  /**
   * 若是 autoClose 爲 false,則文件描述符不會被關閉,即便有錯誤。 
   * 須要程序負責關閉它,而且確保沒有文件描述符泄漏。 
   * 若是 autoClose 被設置爲 true(默認),則在 error 或 end 時,文件描述符會被自動關閉
   */
  this.autoClose = options.autoClose === undefined ? true : options.autoClose;
   this.pos = this.start;
   
  }
// 適合傳入句柄的狀況,例如fd: 0,這樣就不是文件,而是控制檯輸入的數據了
  if (typeof this.fd !== 'number')
    this.open();
  this.on('end', function() {
    if (this.autoClose) {
      this.destroy();
    }
  });
}

// 打開文件,並觸發open事件,只有打開了才能讀取,因此在回調中觸發open事件,看下步操做
ReadStream.prototype.open = function() {
  var self = this;
  fs.open(this.path, this.flags, this.mode, function(er, fd) {
    self.fd = fd;
    self.emit('open', fd);
    //  start the flow of data.
    self.read();
  });
};
Readable.prototype.read = function(n) {
    // 當read(0)時,若是緩存中已有數據,則觸發readable事件,至關於刷新下緩存。不然觸發end事件
if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  //  若可讀流已經被傳入了終止符(null),且緩衝中沒有遺留數據,則結束這個可讀流
  if (n === 0 && state.ended) {
      if (state.length === 0)
        endReadable(this);
      return null;
    }
    //  若目前緩衝中的數據大小爲空,或未超過設置的警惕線,則進行一次數據讀取。
      if (state.length === 0 || state.length - n < state.highWaterMark) {
        doRead = true;
      }
        if (state.ended || state.reading) {
          doRead = false;
        } else if (doRead) {
          state.reading = true;
          state.sync = true;
          this._read(state.highWaterMark);
     }


}
ReadStream.prototype._read = function(n) {
  if (typeof this.fd !== 'number') {
    // 防止重複綁定open事件,當文件打開且emit open事件,此時纔會進行真正的讀操做
    return this.once('open', function() {
      this._read(n);
    });
  }
 // 而後讀數據的時候會計算實際讀的數量
 function howMuchToRead(n, state) {
    //  若是讀的數量超過highWaterMark,則從新計算highWaterMark
    if (n > state.highWaterMark)
      state.highWaterMark = computeNewHighWaterMark(n);
    if (n <= state.length)
      return n;
 }
  // 通過上面一系列的準備工做,下面開始真正的讀操做咯
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
      if (bytesRead > 0) {
        this.bytesRead += bytesRead;
      }
      this.push(b);
  });
};

// 上面整個過程是paused的流程,其中flow模式又有所不一樣,以下所示
// 若是監聽了data事件,則會調用this.resume(),開始流動模式
Readable.prototype.on = function(ev, fn) {
  const res = Stream.prototype.on.call(this, ev, fn);
  if (ev === 'data') {
    //  Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  }
  }
// flow模式下 流內部自動觸發data事件,循環讀取數據
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}
// 而後觸發 data事件,循環發射數據
stream.emit('data', chunk);

總結 上面是可讀流的源碼分析,摘要了關鍵部分,下面在梳理一下,當經過ReadStream建立一個流的時候,默認會觸發readable事件,進入暫停模式,此時內部維護的有一個緩衝區,在readable事件回調邏輯中進行read操做,首先會經過howMuchToRead方法計算實際讀取的數量,若是現有數據小於highWaterMark,內部會進行this._read(state.highWaterMark)操做,其回調中會進行push操做,push在調用readableAddChunk將數據放到內部維護的緩存中,反之則從fromList中讀取緩存中的數據,而後返回。而若是監聽了data事件,代碼中所示會調用this.resume(),將流狀態設置爲flowing模式,而後resume()->resume_()->flow()的調用順序執行flow方法循環讀取數據,觸發data事件,完成數據的自動讀取,而後發射給調用者,會不停的循環整個過程。上面比較值的注意一點的就是flow模式和paused模式區別,若是是flow模式在addChunk的時候,以下所示

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    stream.emit('data', chunk);
    stream.read(0);
  } 
}

會自動發射數據,不會走緩存,而paused模式會走一遍內部的緩存機制。
根據上面node源碼的分析過程,下面圖形化描述下整個流程。
read-stream-jsdt

本身實現的一個可讀流

可寫流源碼分析

// 1:首先第一步根據createWriteStream傳入參數進行初始化
// 2:調用寫操做
Writable.prototype.write = function(chunk, encoding, cb) {
  if (state.ended)
   //在end繼續寫入會emit一個error事件
    writeAfterEnd(this, cb);
  else if (validChunk(this, state, chunk, cb)) {
  //在校驗數據chunk合法的狀況下才會進行後續的寫邏輯
    state.pendingcb++;
    ret = writeOrBuffer(this, state, chunk, encoding, cb);
  }
return ret;
};

function writeOrBuffer(stream, state, chunk, encoding, cb) {
  chunk = decodeChunk(state, chunk, encoding);

  if (chunk instanceof Buffer)
    encoding = 'buffer';
  var len = state.objectMode ? 1 : chunk.length;

  state.length += len;//實時更新緩衝區長度

  var ret = state.length < state.highWaterMark;//判斷緩存區是否超過水位線(highWaterMark,不傳默認16k,源碼_stream_writeable.js--40行)設置
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked) {
 //若是此時處於寫狀態,將新添加的數據放到緩衝池鏈表尾部
    var last = state.lastBufferedRequest;
    state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
    if (last) {
      last.next = state.lastBufferedRequest;
    } else {
      state.bufferedRequest = state.lastBufferedRequest;
    }
    state.bufferedRequestCount += 1;
  } else {
    //寫入數據
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }
return ret;
}
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  if (writev)
    //一次寫入多個數據塊
    stream._writev(chunk, state.onwrite);
  else
  //一次寫入一個數據塊
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}
function onwrite(stream, er) {
    if (!finished &&
        !state.corked &&
        !state.bufferProcessing &&
        state.bufferedRequest) {
        //清空緩衝池 ,不爲空,則循環執行 _write() 寫入單個數據塊
      clearBuffer(stream, state);
    }
  }
}
function clearBuffer(stream, state) {
    // 單個數據寫入
    while (entry) {
      var chunk = entry.chunk;
      var encoding = entry.encoding;
      var cb = entry.callback;
      var len = state.objectMode ? 1 : chunk.length;
        //開啓數據寫操做
      doWrite(stream, state, false, len, chunk, encoding, cb);
      entry = entry.next;
    }
}

總結 上面是可寫流源碼分析,摘要了關鍵流程,首先根據傳入參數進行初始化配置,而後用戶調用write方法進行寫入,寫入前會判斷一下是否超過水位線,超過觸發drain事件,返回false,注意一點此時仍能夠進行寫入,返回false只是告訴你,已經滿了,後須要不要寫入仍是靠用戶根據這個返回值來控制。若是沒超過,在寫以前會先判斷是否處於寫狀態,是的話將數據放到緩存中,反之會進行doWrite <-->clearBuffer這樣的循環操做,一直到數據緩存中數據消耗完爲止。清理完了以後,後續調用write的返回值ret爲false,從而繼續寫,一直循環前面描述的整個過程,直到數據源寫完爲止。總的來講,由於可寫流內部只有一個狀態,複雜度低於可讀流,整個過程仍是比較清晰的,不在圖形化流程。

本身實現的一個可寫流

說明
node源碼分析版本基於v8.9.4
參考資料
http://nodejs.cn/api/
https://medium.freecodecamp.o...

相關文章
相關標籤/搜索