Node.js 知名框架 Express Koa 都在使用的 Events 模塊你瞭解嗎?

人生如同故事。重要的並不在有多長,而是在有多好。——塞涅卡html

在 Node.js 中一個很重要的模塊 Events(EventEmitter 事件觸發器),也稱爲發佈/訂閱模式,爲何說它重要,由於在 Node.js 中絕大多數模塊都依賴於此,例如 Net、HTTP、FS、Stream 等,除了這些系統模塊比較知名的 Express、Koa 框架中也能看到 EventEmitter 的蹤影。前端

談起事件前端的同窗可能會聯想到瀏覽器中的事件,與瀏覽器中的事件不一樣的是它不存在事件冒泡、preventDefault()、stopPropagation() 等方法,EventEmitter 提供了 on()、once()、removeListener() 等方法來對事件進行監聽移除。node

做者簡介:五月君,Nodejs Developer,慕課網認證做者,熱愛技術、喜歡分享的 90 後青年,歡迎關注 Nodejs技術棧 和 Github 開源項目 www.nodejs.redgit

經過本文你能學到什麼

  • 瞭解 EventEmitter 是什麼?一些基礎 API 的使用
  • 在 Node.js 的一些核心模塊(Stream、Net)中是如何使用 EventEmitter 的?
  • 主流的 Express/Koa 框架也是基於此實現,咱們如何實現一個基於 EventEmitter 的自定義對象?
  • 高併發場景下雪崩問題如何利用 EventEmitter 特性解決?
  • 事件是否等價於異步?

先從一個簡單的例子開始

事件驅動是 Node.js 的核心,怎麼體現事件驅動呢?一般一種最多見的形式就是回調,觸發一次事件,而後經過回調來接收一些處理,關於這種形式在 JavaScript 編程中家常便飯,例如 fs.readFile(path, callback)、TCP 中的 server.on('data', callback) 等。github

一個簡單的實現

主要用到如下兩個 API,觸發、註冊一個監聽函數。數據庫

  • emit:觸發一個監聽函數
  • on:註冊一個監聽函數
const EventEmitter = require('events').EventEmitter;
const emitter = new EventEmitter();

emitter.on("起牀", function(time) {
    console.log(`早上 ${time} 開始起牀,新的一天加油!`)
    //console.log(`關注公衆號Nodejs技術棧,早上 ${time} 點開始起牀閱讀,從 Node.js 技術棧`);
});

emitter.emit("起牀", "6:00");
複製代碼

運行程序以後效果以下所示:編程

早上 6:00 開始起牀,新的一天加油!
複製代碼

除了上面使用 emit、on 方法外還有一些頗有用的 API,你也許須要先去 Node.js 官網(nodejs.cn/api/events.…)作一個瞭解,那裏介紹的很全,在接來的學習中,我會在一些示例中演示一部分的核心 API 如何應用。api

自定義 EventEmitter 類

當你瞭解了 EventEmitter,你會發現它在 Node.js 中無所不在,Node.js 的核心模塊、Express/Koa 等知名框架中,你都會發現它的蹤影,例如,下面在 Koa 中 new 一個 app 對象,經過 app.emit() 觸發一個事件,實如今整個系統中進行傳遞。瀏覽器

const Koa = require('koa');
const app = new Koa();

app.on("koa", function() {
    console.log("在 Koa 中使用 EventEmitter");
});

app.emit("koa");
複製代碼

系統模塊自定義 EventEmitter 類的實現

在這開始以前讓咱們先看下 Node.js 中的 Stream、Net 模塊是怎麼實現的?緩存

在 Stream 模塊中的實現

// https://github.com/nodejs/node/blob/v10.x/lib/internal/streams/legacy.js#L6

const EE = require('events');
const util = require('util');

function Stream() {
  EE.call(this);
}
util.inherits(Stream, EE);
複製代碼

在 Net 模塊中的實現

// https://github.com/nodejs/node/blob/v10.x/lib/net.js#L1121
const EventEmitter = require('events');
const util = require('util');

function Server(options, connectionListener) {
  if (!(this instanceof Server))
    return new Server(options, connectionListener);

  EventEmitter.call(this);

  ...
}
util.inherits(Server, EventEmitter);
複製代碼

觀察上面兩個 Node.js 模塊的自定義 EventEmitter 實現,都有一個共同點使用了 util.inherits(constructor, superConstructor) 方法,這個是 Node.js 中的工具類,這讓我想起來了以前在看 JavaScript 權威指南(第 6 章 122 頁)中的一個方法 function inherit(p),意思爲經過原型繼承建立一個新對象,而 util.inherits 是經過原型複製來實現的對象間的繼承

例如上面的 util.inherits(Server, EventEmitter) 函數,也就是 Server 對象繼承了 EventEmitter 在原型中定義的函數,也就擁有了 EventEmitter 事件觸發器中的 on、emit 等方法。可是如今 Node.js 官網不建議使用 util.inherits() 方法,而是使用 ES6 中的 class 和 extends 關鍵詞得到語言層面的繼承支持,那麼在原聲 JS 中仍是使用 Object.setPrototypeOf() 來實現的繼承,所以在 Node.js 12x 版本中你會看到以下代碼實現。

// https://github.com/nodejs/node/blob/v12.x/lib/net.js#L1142
function Server(options, connectionListener) {
  if (!(this instanceof Server))
    return new Server(options, connectionListener);

  EventEmitter.call(this);

  ...
}

// https://github.com/nodejs/node/blob/v12.x/lib/net.js#L1188
Object.setPrototypeOf(Server.prototype, EventEmitter.prototype);
Object.setPrototypeOf(Server, EventEmitter);
複製代碼

實現一個基於 EventEmitter 的自定義類

這裏用一個例子一天的計劃來展現如何基於 EventEmitter 自定義類,在不一樣的時間觸發相應的事件,經過監聽事件來作一些事情。

下面展現了咱們自定義的 OneDayPlan 是如何繼承於 EventEmitter

const EventEmitter = require('events');
const oneDayPlanRun = {
    "6:00": function() {
        console.log(`如今是早上 6:00,起牀,開始新的一天加油!`);
    },
    "7:00": function() {
        console.log(`如今是早上 7:00,吃早飯!`);
    }
}

function OneDayPlan() {
    EventEmitter.call(this);
}

Object.setPrototypeOf(OneDayPlan.prototype, EventEmitter.prototype);
Object.setPrototypeOf(OneDayPlan, EventEmitter);
複製代碼

如今讓咱們實例化上面自定義的 OneDayPlan 類,實現事件的觸發/監聽

const oneDayPlan = new OneDayPlan();

oneDayPlan.on("6:00", function() {
    oneDayPlanRun["6:00"]();
});

oneDayPlan.on("7:00", function() {
    oneDayPlanRun["7:00"]();
});

async function doMain() {
    oneDayPlan.emit("6:00");

    await sleep(2000); // 間隔 2 秒鐘輸出

    oneDayPlan.emit("7:00");
}

doMain();

async function sleep(s) {
    return new Promise(function(reslve) {
        setTimeout(function() {
            reslve(1);
        }, s);
    });
}
複製代碼

EventEmitter 解決高併發下雪崩問題

對於須要查詢 DB 的數據,咱們通常稱之爲熱點數據,這類數據一般是要在 DB 之上增長一層緩存,可是在高併發場景下,若是這個緩存正好失效,此時就會有大量的請求直接涌入數據庫,對數據庫形成必定的壓力,對於緩存雪崩的解決方案,網上也不乏有更好的解決方案,可是在 Node.js 中咱們能夠利用 events 模塊提供的 once() 方法來解決。

once 方法介紹

當觸發屢次相同名稱事件,經過 once 添加的偵聽器只會執行一次,而且在執行以後會接觸與它關聯的事件,至關於 on 方法和 removeListener 方法的組合,

proxy.once('我很帥', function() {
    console.log('once: 我很帥!');
});

proxy.on('我很帥', function() {
    console.log('on: 我很帥!');
});


proxy.emit('我很帥');
proxy.emit('我很帥');
proxy.emit('我很帥');
複製代碼

上面觸發了三次 「我很帥」 事件,on 方法乖乖的重複了三次,可是 once 方法說我知道我很帥我只說一次就夠了。

once: 我很帥!
on: 我很帥!
on: 我很帥!
on: 我很帥!
複製代碼

上面說的 once 方法是 on 和 removeListener 的結合體,在源碼中也可看到 github.com/nodejs/node… once 方法接收到信息以後使用 on 方法監聽,在 onceWrapper 方法中經過 removeListener 刪掉監聽函數自身。

function onceWrapper(...args) {
  if (!this.fired) {
    this.target.removeListener(this.type, this.wrapFn);
    this.fired = true;
    return Reflect.apply(this.listener, this.target, args);
  }
}

function _onceWrap(target, type, listener) {
  var state = { fired: false, wrapFn: undefined, target, type, listener };
  var wrapped = onceWrapper.bind(state);
  wrapped.listener = listener;
  state.wrapFn = wrapped;
  return wrapped;
}

EventEmitter.prototype.once = function once(type, listener) {
  checkListener(listener);

  this.on(type, _onceWrap(this, type, listener));
  return this;
};
複製代碼

編碼實現

利用 once 方法將全部請求的回調都壓入事件隊列中,對於相同的文件名稱查詢保證在同一個查詢開始到結束的過程當中永遠只有一次,若是是 DB 查詢也避免了重複數據帶來的數據庫查詢開銷。代碼編寫參考了深刻淺出 Nodejs Events 模塊一書,這裏使用 fs 進行文件查詢,若是是 DB 也同理,另外注意使用 status 鍵值對形式保存了觸發/監聽的事件名稱和狀態,最後建議進行清除,避免引發大對象致使內存泄露問題。

const events = require('events');
const emitter = new events.EventEmitter();
const fs = require('fs');
const status = {};

const select = function(file, filename, cb) {
    emitter.once(file, cb);
    
    if (status[file] === undefined) {
        status[file] = 'ready'; // 不存在設置默認值
    }
    if (status[file] === 'ready') {
        status[file] = 'pending';
        fs.readFile(file, function(err, result) {
            console.log(filename);
            emitter.emit(file, err, result.toString());
            status[file] = 'ready';
            
            setTimeout(function() {
                delete status[file];
            }, 1000);
        });
    }
}

for (let i=1; i<=11; i++) {
    if (i % 2 === 0) {
        select(`/tmp/a.txt`, 'a 文件', function(err, result) {
            console.log('err: ', err, 'result: ', result);
        });
    } else {
        select(`/tmp/b.txt`, 'b 文件', function(err, result) {
            console.log('err: ', err, 'result: ', result);
        });
    }
}
複製代碼

控制檯運行以上代碼進行測試,雖然發起了屢次文件查詢請求,fs 模塊真正只執行了兩次,分別查詢了 a、b 兩個文件,對於相同的請求,經過利用事件監聽器 once 的特性避免了相同條件重複查詢。

b 文件
err:  null result:  b
err:  null result:  b
err:  null result:  b
err:  null result:  b
err:  null result:  b
err:  null result:  b
err:  null result:  b
a 文件
err:  null result:  a
err:  null result:  a
err:  null result:  a
err:  null result:  a
err:  null result:  a
複製代碼

默認狀況下,若是爲特定事件添加了超過 10 個監聽器,則 EventEmitter 會打印一個警告。 可是,並非全部的事件都要限制 10 個監聽器。 emitter.setMaxListeners() 方法能夠爲指定的 EventEmitter 實例修改限制。

(node:88835) Warning: Possible EventEmitter memory leak detected. 11 /tmp/b.txt listeners added. Use emitter.setMaxListeners() to increase limit
(node:88835) Warning: Possible EventEmitter memory leak detected. 11 /tmp/a.txt listeners added. Use emitter.setMaxListeners() to increase limit
複製代碼

EventEmitter 循環調用問題

以下代碼所示,嘗試分析如下兩種狀況的輸出結果

const events = require('events');
const emitter = new events.EventEmitter();
const test = () => console.log('test');

/** 例一 */
emitter.on('test', function() {
    test();
    emitter.emit('test');
})

emitter.emit('test');

/** 例二 */
emitter.on('test', function() {
    test();
    emitter.on('test', test);
})

emitter.emit('test');
複製代碼

例一由於在監聽函數 on 裏執行了 emit 事件觸發,會陷入死循環致使棧溢出。

例二結果爲只輸出一次 test,emitter.on('test', test); 這行代碼只是在當前的事件回調中添加了一個事件監聽器。

例一:RangeError: Maximum call stack size exceeded
例二:test
複製代碼

同步仍是異步

換一個問題事件是否等於異步?答案是不等的,看如下代碼示例執行順序,先輸出 111 再輸出 222,爲何這樣?摘自官方 API 的一段話 「EventEmitter 會按照監聽器註冊的順序同步地調用全部監聽器。 因此必須確保事件的排序正確,且避免競態條件。

const events = require('events');
const emitter = new events.EventEmitter();

emitter.on('test',function(){
    console.log(111)
});
emitter.emit('test');
console.log(222)

// 輸出
// 111
// 222
複製代碼

也可使用 setImmediate() 或 process.nextTick() 切換到異步模式,代碼以下所示:

const events = require('events');
const emitter = new events.EventEmitter();

emitter.on('test',function(){
    setImmediate(() => {
        console.log(111);
    });
});
emitter.emit('test');
console.log(222)

// 輸出
// 222
// 111
複製代碼

錯誤處理

最後一個最重要的錯誤處理,在 Node.js 中錯誤處理是一個須要重視的事情,一旦拋出一個錯誤沒有人爲處理,可能形成的結果是進程自動退出,以下代碼由於事件觸發器帶有錯誤信息,而沒有相應的錯誤監聽在,會致使進程退出。

const events = require('events');
const emitter = new events.EventEmitter();

emitter.emit('error', new Error('This is a error'));
console.log('test');
複製代碼

調用後程序崩潰致使 Node 進程自動退出,因受上一行的影響,以後的 console.log('test'); 也不會獲得執行。

events.js:167
      throw er; // Unhandled 'error' event
      ^

Error: This is a error
複製代碼

做爲最佳實踐,應該始終爲 'error' 事件註冊監聽器

const events = require('events');
const emitter = new events.EventEmitter();

emitter.on('error', function(err) {
    console.error(err);
})

emitter.emit('error', new Error('This is a error'));

console.log('test');
複製代碼
Error: This is a error
    at Object.<anonymous> ...
test
複製代碼

如上代碼所示,第一次調用後錯誤 error 事件會被監聽,Node 進程也不會像以前的程序同樣會自動退出,console.log('test'); 也獲得了正常運行。

總結

許多 Node.js 成功的模塊和框架都是基於 EventEmitter 的,學會 EventEmitter 的使用,而且知道該在何時去使用是很是有用的。

EventEmitter 本質上就是觀察者模式的實現,一個相似的模式是發佈/訂閱,生產者將消息發佈以後無需關心訂閱者的實現,關注過Nodejs技術棧公衆號的同窗,也許你會收到過我以前發佈的 RabbitMQ 系列文章,RabbitMQ 自己也是基於 AMQP 協議,這在一個分佈式集羣環境中使用也是很是好的一種方案。

相關文章
相關標籤/搜索