接上一篇 RxJS的另外四種實現方式(五)——使用生成器實現webpack
該實現方式與以前幾種不一樣的,該實現方式僅針對Nodejs環境。在Nodejs環境中,提供了Stream類,包括Readable、Transform、Writeable等子類都是可擴展的。從字面上看,正好對應Rx中的生產者、傳遞者、消費者。web
實現該庫的原由是,一次在Nodejs中須要在koa框架裏面提供event-stream功能,目前除了IE瀏覽器外其餘瀏覽器都支持了服務端事件推送,這個功能能夠很好的代替輪詢。webpack用的熱更新就是經過這個功能實現的。瀏覽器
言歸正傳,首先得實現生產者,咱們先來看interval框架
class Interval extends Readable { constructor(period) { super({ objectMode: true }) this.period = period this.i = 0 } _read(size) { setTimeout(() => this.push(this.i++), this.period) } } exports.interval = period => new Interval(period)
說明一下,構造函數傳入objectMode:true的對象是讓stream處於對象模式,而不是二進制流模式。_read函數必須覆蓋父類,不然出錯,當有訂閱者鏈接上來後,就會調用_read方法。咱們在這個方法裏面發送數據,即調用push方法,將數據發送給流的接收者。koa
當調用過push方法後,後面的接收者若是調用了callback回調,則表示數據消費完畢,會再次調用_read方法,直到push(null)表示生產者已經complete函數
FromArray也十分簡單易讀oop
class FromArray extends Readable { constructor(array) { super({ objectMode: true }) this.array = array this.pos = 0 this.size = array.length } _read(size) { if (this.pos < this.size) { this.push(this.array[this.pos++]) } else this.push(null) } } exports.fromArray = array => new FromArray(array)
下面要實現一個轉換器(操做符)Filterui
class Filter extends Transform { constructor(f) { super({ readableObjectMode: true, writableObjectMode: true }) this.f = f } _transform(data, encoding, callback) { const f = this.f if (f(data)) { this.push(data); } callback(); } _flush(callback) { callback() } } exports.filter = f => new Filter(f)
這時候咱們須要覆蓋_transform、_flush函數,一樣的,push方法會讓數據流到下面的流中,而callback回調會使得上一個流繼續發送數據。this
最後咱們來實現Subscriber.net
class Subscriber extends Writable { constructor(n, e, c) { super({ objectMode: true }) this.n = n this.e = e this.c = c } _write(chunk, encoding, callback) { this.n(chunk) callback(null) } _final(callback) { this.c() callback() } } exports.subscribe = (n, e = noop, c = noop) => new Subscriber(n, e, c)
Subscriber是一個可寫流,咱們必須覆蓋_write方法用於消費數據,_final方法用於complete事件處理。這裏沒有實現error事件。有興趣的同窗能夠思考如何實現。
最後咱們須要把各類stream串起來,變成一個長長的水管
exports.pipe = pipeline || ((first, ...cbs) => cbs.reduce((aac, c) => aac.pipe(c), first));
高版本的Nodejs已經提供了pipeline方法,能夠直接使用,低版本的話,能夠用上面的方法進行鏈接。
至此,咱們已經使用Nodejs提供的Stream類實現了Rx的基本邏輯。(完)