深刻nodejs中流(stream)的理解

流的基本概念及理解

流是一種數據傳輸手段,是有順序的,有起點和終點,好比你要把數據從一個地方傳到另一個地方
流很是重要,gulp,webpack,HTTP裏的請求和響應,http裏的socket都是流,包括後面壓縮,加密等

流爲何這麼好用還這麼重要呢?javascript

  • 由於有時候咱們不關心文件的主體內容,只關心能不能取到數據,取到數據以後怎麼進行處理
  • 對於小型的文本文件,咱們能夠把文件內容所有讀入內存,而後再寫入文件,好比grunt-file-copy
  • 對於體積較大的二進制文件,好比音頻、視頻文件,動輒幾個GB大小,若是使用這種方法,很容易使內存「爆倉」。
  • 理想的方法應該是讀一部分,寫一部分,無論文件有多大,只要時間容許,總會處理完成,這裏就須要用到流的概念
流是一個抽象接口,被Node中不少對象所實現,好比HTTP服務器request和response對象都是流

Node.js 中有四種基本的流類型:java

  • Readable - 可讀的流 (例如 fs.createReadStream()).
  • Writable - 可寫的流 (例如 fs.createWriteStream()).
  • Duplex - 可讀寫的流 (例如 net.Socket).
  • Transform - 在讀寫過程當中能夠修改和變換數據的 Duplex 流 (例如 zlib.createDeflate()).
能夠經過 require('stream') 加載 Stream 基類。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基類

Readable streams可讀流

可讀流(Readable streams)是對提供數據的 源頭(source)的抽象
可讀流的例子包括:
  • HTTP responses, on the client :客戶端請求
  • HTTP requests, on the server :服務端請求
  • fs read streams :讀文件
  • zlib streams :壓縮
  • crypto streams :加密
  • TCP sockets :TCP協議
  • child process stdout and stderr :子進程標準輸出和錯誤輸出
  • process.stdin :標準輸入
全部的 Readable 都實現了 stream.Readable 類定義的接口

經過流讀取數據linux

  • 用Readable建立對象readable後,便獲得了一個可讀流
  • 若是實現_read方法,就將流鏈接到一個底層數據源
  • 流經過調用_read向底層請求數據,底層再調用流的push方法將須要的數據傳遞過來
  • 當readable鏈接了數據源後,下游即可以調用readable.read(n)向流請求數據,同時監聽readable的data事件來接收取到的數據
下面簡單舉個可讀流的例子:
  • 監聽可讀流的data事件,當你一旦開始監聽data事件的時候,流就能夠讀文件的內容而且發射data,讀一點發射一點讀一點發射一點
  • 默認狀況下,當你監聽data事件以後,會不停的讀數據,而後觸發data事件,觸發完data事件後再次讀數據
  • 讀的時候不是把文件總體內容讀出來再發射出來的,並且設置一個緩衝區,大小默認是64K,好比文件是128K,先讀64K發射出來,再讀64K在發射出來,會發射兩次
  • 緩衝區的大小能夠經過highWaterMark來設置
let fs = require('fs');
//經過建立一個可讀流
let rs = fs.createReadStream('./1.txt',{
    flags:'r',//咱們要對文件進行何種操做
    mode:0o666,//權限位
    encoding:'utf8',//不傳默認爲buffer,顯示爲字符串
    start:3,//從索引爲3的位置開始讀
    //這是個人見過惟一一個包括結束索引的
    end:8,//讀到索引爲8結束
    highWaterMark:3//緩衝區大小
});
rs.on('open',function () {
    console.log('文件打開');
});
rs.setEncoding('utf8');//顯示爲字符串
//但願流有一個暫停和恢復觸發的機制
rs.on('data',function (data) {
    console.log(data);
    rs.pause();//暫停讀取和發射data事件
    setTimeout(function(){
        rs.resume();//恢復讀取並觸發data事件
    },2000);
});
//若是讀取文件出錯了,會觸發error事件
rs.on('error',function () {
    console.log("error");
});
//若是文件的內容讀完了,會觸發end事件
rs.on('end',function () {
    console.log('讀完了');
});
rs.on('close',function () {
    console.log('文件關閉');
});

/**
文件打開
334
455
讀完了
文件關閉
**/

可讀流的簡單實現

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on('open', function () {
    console.log("open");
});
rs.on('data', function (data) {
    console.log(data);
});
rs.on('end', function () {
    console.log("end");
});
rs.on('close', function () {
    console.log("close");
});
/**
 open
 456
 789
 end
 close
 **/
let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on('end', function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    read(n) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }

                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }

    destroy() {
        fs.close(this.fd, (err) => {
            this.emit('close');
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
}
module.exports = ReadStream;

自定義可讀流

爲了實現可讀流,引用Readable接口並用它構造新對象
  • 咱們能夠直接把供使用的數據push出去。
  • 當push一個null對象就意味着咱們想發出信號——這個流沒有更多數據了
var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+'');
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on('data', function(data){
    console.log("讀到數據: " + data.toString());//no maybe
});
counter.on('end', function(data){
    console.log("讀完了");
});

可讀流的兩種模式

Readable Stream 存在兩種模式(flowing mode 與 paused mode),這兩種模式決定了chunk數據流動的方式---自動流動仍是手工流動。那如何觸發這兩種模式呢:
  • flowing mode: 註冊事件data、調用resume方法、調用pipe方法
  • paused mode: 調用pause方法(沒有pipe方法)、移除data事件 && unpipe全部pipe
若是 Readable 切換到 flowing 模式,且沒有消費者處理流中的數據,這些數據將會丟失。 好比, 調用了 readable.resume() 方法卻沒有監聽 'data' 事件,或是取消了 'data' 事件監聽,就有可能出現這種狀況

可讀流的三種狀態webpack

在任意時刻,任意可讀流應確切處於下面三種狀態之一:
  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true
兩種模式取決於可讀流flowing狀態:
  • 若爲true : flowing mode;
  • 若爲false : paused mode

flowing modeweb

經過註冊data、pipe、resume能夠自動獲取所須要的數據,咱們來看下源碼的實現
// data事件觸發flowing mode
 if (ev === 'data') {
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }

// resume觸發flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
        debug('resume');
        state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法觸發flowing模式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}
flowing mode的三種方法最後均是經過resume方法,將狀態變爲true:state.flowing = true

paused modejson

在paused mode下,須要手動地讀取數據,而且能夠直接指定讀取數據的長度
能夠經過監聽事件readable,觸發時手工讀取chunk數據:
  • 當你監聽 readable事件的時候,會進入暫停模式
  • 當監聽readable事件的時候,可讀流會立刻去向底層讀取文件,而後把讀到文件的文件放在緩存區裏const state = this._readableState;
  • self.read(0); 只填充緩存,可是並不會發射data事件,可是會發射stream.emit('readable');事件
  • this._read(state.highWaterMark); 每次調用底層的方法讀取的時候是讀取3個字節
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
rs.on('readable',function(){
    console.log(rs._readableState.length);
    //read若是不加參數表示讀取整個緩存區數據
    //讀取一個字段,若是可讀流發現你要讀的字節小於等於緩存字節大小,則直接返回
    let chunk = rs.read(1);
    console.log(chunk);
    console.log(rs._readableState.length);
    //當你讀完指定的字節後,若是可讀流發現剩下的字節已經比最高水位線小了。則會立馬再次讀取填滿 最高水位線
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});
注意:一旦註冊了readable事件,必須手工讀取read數據,不然數據就會流失,咱們來看下源碼的實現
function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  var state = stream._readableState;
  debug('emit readable');
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit('readable');
  }
  state.needReadable = !state.flowing && !state.ended;
  flow(stream);
}

function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

function endReadable(stream) {
  var state = stream._readableState;
  debug('endReadable', state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

Readable.prototype.read = function(n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;
  if (n !== 0)
    state.emittedReadable = false;
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  n = howMuchToRead(n, state);
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }
flow方法直接read數據,將獲得的數據經過事件data交付出去,然而此處沒有註冊data事件監控,所以,獲得的chunk數據並無交付給任何對象,這樣數據就白白流失了,因此在觸發emit('readable')時,須要提早read數據

Writable streams可寫流

可寫流是對數據寫入'目的地'的一種抽象
Writable:可寫流的例子包括了:
  • HTTP requests, on the client 客戶端請求
  • HTTP responses, on the server 服務器響應
  • fs write streams 文件
  • zlib streams 壓縮
  • crypto streams 加密
  • TCP sockets TCP服務器
  • child process stdin 子進程標準輸入
  • process.stdout, process.stderr 標準輸出,錯誤輸出
下面舉個可寫流的簡單例子
  • 當你往可寫流裏寫數據的時候,不是會馬上寫入文件的,而是會很寫入緩存區,緩存區的大小就是highWaterMark,默認值是16K。而後等緩存區滿了以後再次真正的寫入文件裏
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
   flags:'w',
   mode:0o666,
   start:3,
   highWaterMark:3//默認是16K
});
  • 若是緩存區已滿 ,返回false,若是緩存區未滿,返回true
  • 若是能接着寫,返回true,若是不能接着寫,返回false
  • 按理說若是返回了false,就不能再往裏面寫了,可是若是你真寫了,若是也不會丟失,會緩存在內存裏。等緩存區清空以後再從內存裏讀出來
let flag = ws.write('1');
console.log(flag);//true
flag =ws.write('2');
console.log(flag);//true
flag =ws.write('3');
console.log(flag);//false
flag =ws.write('4');
console.log(flag);//false

'drain' 事件gulp

若是調用 stream.write(chunk) 方法返回 false,流將在適當的時機觸發 'drain' 事件,這時才能夠繼續向流中寫入數據

當一個流不處在 drain 的狀態, 對 write() 的調用會緩存數據塊, 而且返回 false。 一旦全部當前全部緩存的數據塊都排空了(被操做系統接受來進行輸出), 那麼 'drain' 事件就會被觸發緩存

建議, 一旦 write() 返回 false, 在 'drain' 事件觸發前, 不能寫入任何數據塊服務器

舉個簡單的例子說明一下:異步

let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
    flags:'w',
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//緩存區未滿
    //寫入方法是同步的,可是寫入文件的過程是異步的。在真正寫入文件後還會執行咱們的回調函數
 while(flag && count>0){
     console.log('before',count);
     flag = ws.write((count)+'','utf8',(function (i) {
         return ()=>console.log('after',i);
     })(count));
     count--;
 }
}
write();//987
//監聽緩存區清空事件
ws.on('drain',function () {
    console.log('drain');
    write();//654 321
});
ws.on('error',function (err) {
    console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/
若是已經再也不須要寫入了,能夠調用end方法關閉寫入流,一旦調用end方法以後則不能再寫入
好比在 ws.end();後寫 ws.write('x');,會報錯 write after end

'pipe'事件

linux精典的管道的概念,前者的輸出是後者的輸入

pipe是一種最簡單直接的方法鏈接兩個stream,內部實現了數據傳遞的整個過程,在開發的時候不須要關注內部數據的流動

  • 這個方法從可讀流拉取全部數據, 並將數據寫入到提供的目標中
  • 自動管理流量,將數據的滯留量限制到一個可接受的水平,以使得不一樣速度的來源和目標不會淹沒可用內存
  • 默認狀況下,當源數據流觸發 end的時候調用end(),因此寫入數據的目標不可再寫。傳 { end:false }做爲options,能夠保持目標流打開狀態

pipe方法的原理

var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});
下面舉個簡單的例子說明一下pipe的用法:
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
  highWaterMark:3
});
let ws = fs.createWriteStream('./2.txt',{
    highWaterMark:3
});
rs.pipe(ws);
//移除目標可寫流
rs.unpipe(ws);
  • 當監聽可讀流data事件的時候會觸發回調函數的執行
  • 能夠實現數據的生產者和消費者速度的均衡
rs.on('data',function (data) {
    console.log(data);
    let flag = ws.write(data);
   if(!flag){
       rs.pause();
   }
});
  • 監聽可寫流緩存區清空事件,當全部要寫入的數據寫入完成後,接着恢復從可讀流裏讀取並觸發data事件
ws.on('drain',function () {
    console.log('drain');
    rs.resume();
});

unpipe

readable.unpipe()方法將以前經過stream.pipe()方法綁定的流分離
  • 若是寫入的目標沒有傳入, 則全部綁定的流都會被分離
  • 若是指定了寫入的目標,可是沒有綁定流,則什麼事情都不會發生
簡單距離說明下unpipe的用法:
let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('關閉向2.txt的寫入');
from.unpipe(writable);
console.log('手工關閉文件流');
to.end();
}, 1000);

pipe的簡單實現

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
    this.on('data', (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on('drain', ()=>{
        this.resume();
    });
    this.on('end', ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}

自定義管道流

const stream = require('stream')

var index = 0;
const readable = stream.Readable({
    highWaterMark: 2,
    read: function () {
        process.nextTick(() => {
            console.log('push', ++index)
            this.push(index+'');
        })
    }
})
const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, next) {
        console.log('寫入:', chunk.toString())
    }
})
readable.pipe(writable);

可寫流的簡單實現

let fs = require('fs');
 let FileWriteStream = require('./FileWriteStream');
 let ws = FileWriteStream('./2.txt',{
     flags:'w',
     encoding:'utf8',
     highWaterMark:3
 });
 let i = 10;
 function write(){
     let  flag = true;
     while(i&&flag){
         flag = ws.write("1",'utf8',(function(i){
             return function(){
                 console.log(i);
             }
         })(i));
         i--;
         console.log(flag);
     }
 }
 write();
 ws.on('drain',()=>{
     console.log("drain");
     write();
 });
 /**
  10
  9
  8
  drain
  7
  6
  5
  drain
  4
  3
  2
  drain
  1
  **/
let EventEmitter = require('events');
let util = require('util');
let fs = require('fs');
util.inherits(WriteStream, EventEmitter);

function WriteStream(path, options) {
    EventEmitter.call(this);
    if (!(this instanceof WriteStream)) {
        return new WriteStream(path, options);
    }
    this.path = path;
    this.fd = options.fd;
    this.encoding = options.encoding||'utf8';
    this.flags = options.flags || 'w';
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.pos = this.start;//開始寫入的索引位置
    this.open();//打開文件進行操做
    this.writing = false;//沒有在寫入過程 中
    this.buffers = [];
    this.highWaterMark = options.highWaterMark||16*1024;
    //若是監聽到end事件,並且要求自動關閉的話則關閉文件
    this.on('end', function () {
        if (this.autoClose) {
            this.destroy()
        }
    });
}
WriteStream.prototype.close = function(){
    fs.close(this.fd,(err)=>{
        if(err)
            this.emit('error',err);
    });
}
WriteStream.prototype.open = function () {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err)
            return this.emit('error', err);
        this.fd = fd;//把文件描述符賦給當前實例的fd屬性
        //發射open事件
        this.emit('open', fd);
    });
}
/**
 * 會判斷當前是後臺是否在寫入過程當中,若是在寫入過程當中,則把這個數據放在待處理的緩存中,若是不在寫入過程當中,能夠直接寫。
 */
WriteStream.prototype.write = function (chunk, encoding, cb) {
    chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);

    //先把數據放在緩存裏
    this.buffers.push({
        chunk,
        encoding,
        cb
    });

    let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
    //只有當緩存區寫滿了,那麼清空緩存區的時候纔會發射drain事件,不然 不發放
    this.needDrain = isFull;
    //若是說文件尚未打開,則把寫入的方法壓入open事件的監聽函數。等文件一旦打開,馬上執行寫入操做
    if (typeof this.fd !== 'number') {
         this.once('open', () => {
            this._write();
        });
        return !isFull;
    }else{
        if(!this.writing){
            setImmediate(()=>{
                this._write();
                this.writing = true;
            });
        }

        return !isFull;
    }
}
WriteStream.prototype._write = function () {
    let part = this.buffers.shift();
    if (part) {
        fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
            if(err)return this.emit('error',err);
            part.cb && part.cb();
            this._write();
        });
    }else{
        //發射一個緩存區清空的事件
        this.emit('drain');
        this.writing = false;
    }
}
module.exports = WriteStream;

自定義可寫流

爲了實現可寫流,咱們須要使用流模塊中的Writable構造函數。 咱們只需給Writable構造函數傳遞一些選項並建立一個對象。惟一須要的選項是write函數,該函數揭露數據塊要往哪裏寫
  • chunk一般是一個buffer,除非咱們配置不一樣的流。
  • encoding是在特定狀況下須要的參數,一般咱們能夠忽略它。
  • callback是在完成處理數據塊後須要調用的函數。這是寫數據成功與否的標誌。若要發出故障信號,請用錯誤對象調用回調函數
var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString('utf8'));
        console.log("增長: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("項目:" + i, 'utf8');
}
w.end("結束寫入",function(){
    console.log(stock);
});

Duplex streams可讀寫的流(雙工流)

Duplex 流是同時實現了 Readable 和 Writable 接口的流
雙工流的可讀性和可寫性操做徹底獨立於彼此,這僅僅是將兩個特性組合成一個對象

Duplex 流的實例包括了:

  • TCP sockets
  • zlib streams
  • crypto streams
下面簡單實現雙工流:
const {Duplex} = require('stream');
const inoutStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    },
    read(size) {
        this.push((++this.index)+'');
        if (this.index > 3) {
            this.push(null);
        }
    }
});

inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);

Transform streams轉換流

變換流(Transform streams) 是一種 Duplex 流。它的輸出與輸入是經過某種方式關聯的。和全部 Duplex 流同樣,變換流同時實現了 Readable 和 Writable 接口

轉換流的輸出是從輸入中計算出來的
對於轉換流,咱們沒必要實現read或write的方法,咱們只須要實現一個transform方法,將二者結合起來。它有write方法的意思,咱們也能夠用它來push數據

變換流的實例包括:

  • zlib streams
  • crypto streams
下面簡單實現轉換流:
const {Transform} = require('stream');
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});
process.stdin.pipe(upperCase).pipe(process.stdout);

對象流

默認狀況下,流處理的數據是Buffer/String類型的值。有一個objectMode標誌,咱們能夠設置它讓流能夠接受任何JavaScript對象
const {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./users.json');
rs.setEncoding('utf8');
let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});
let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);
相關文章
相關標籤/搜索