本系列文章爲《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版連接。javascript
歡迎關注個人專欄,以後的博文將在專欄同步:前端
幾乎全部咱們迄今爲止看到的設計模式均可以被認爲是通用的,而且適用於應用程序的許多不一樣的領域。可是,有一套更具體的模式,專一於解決明確的問題。咱們能夠調用這些模式。就像現實生活中的烹飪同樣,咱們有一套明確的步驟來實現預期的結果。固然,這並不意味着咱們不能用一些創意來定製設計模式,以配合咱們的客人的口味,對於書寫Node.js
程序來講是必要的。在本章中,咱們將提供一些常見的解決方案來解決咱們在平常Node.js
開發中遇到的一些具體問題。這些模式包括如下內容:java
Node.js
處理併發請求的能力相悖的阻塞事件循環的同步CPU
綁定操做在Chapter2-Node.js Essential Patterns
中,當咱們討論Node.js
模塊系統的基本屬性時,咱們提到了require()
是同步的,而且module.exports
也不能異步設置。node
這是在覈心模塊和許多npm
包中存在同步API
的主要緣由之一,是否同步加載會被做爲一個option
參數被提供,主要用於初始化任務,而不是替代異步API
。git
不幸的是,這並不老是可能的。同步API
可能並不老是可用的,特別是對於在初始化階段使用網絡的組件,例如執行三次握手協議或在網絡中檢索配置參數。 許多數據庫驅動程序和消息隊列等中間件系統的客戶端都是如此。github
咱們舉一個例子:一個名爲db
的模塊,它將會鏈接到遠程數據庫。 只有在鏈接和與服務器的握手完成以後,db
模塊纔可以接受請求。在這種狀況下,咱們一般有兩種選擇:web
const db = require('aDb'); //The async module
module.exports = function findAll(type, callback) {
if (db.connected) { //is it initialized?
runFind();
} else {
db.once('connected', runFind);
}
function runFind() {
db.findAll(type, callback);
};
};
複製代碼
Dependency Injection
)而不是直接引入異步模塊。經過這樣作,咱們能夠延遲一些模塊的初始化,直到它們的異步依賴被徹底初始化。 這種技術將管理模塊初始化的複雜性轉移到另外一個組件,一般是它的父模塊。 在下面的例子中,這個組件是app.js
:// 模塊app.js
const db = require('aDb'); // aDb是一個異步模塊
const findAllFactory = require('./findAll');
db.on('connected', function() {
const findAll = findAllFactory(db);
// 以後再執行異步操做
});
// 模塊findAll.js
module.exports = db => {
//db 在這裏被初始化
return function findAll(type, callback) {
db.findAll(type, callback);
}
}
複製代碼
咱們能夠看出,若是所涉及的異步依賴的數量過多,第一種方案便不太適用了。redis
另外,使用DI
有時也是不理想的,正如咱們在Chapter7-Wiring Modules
中看到的那樣。在大型項目中,它可能很快變得過於複雜,尤爲對於手動完成並使用異步初始化模塊的狀況下。若是咱們使用一個設計用於支持異步初始化模塊的DI
容器,這些問題將會獲得緩解。算法
可是,咱們將會看到,還有第三種方案可讓咱們輕鬆地將模塊從其依賴關係的初始化狀態中分離出來。mongodb
將模塊與依賴項的初始化狀態分離的簡單模式涉及到使用隊列和命令模式。這個想法是保存一個模塊在還沒有初始化的時候接收到的全部操做,而後在全部初始化步驟完成後當即執行這些操做。
爲了演示這個簡單而有效的技術,咱們來構建一個應用程序。首先建立一個名爲asyncModule.js
的異步初始化模塊:
const asyncModule = module.exports;
asyncModule.initialized = false;
asyncModule.initialize = callback => {
setTimeout(() => {
asyncModule.initialized = true;
callback();
}, 10000);
};
asyncModule.tellMeSomething = callback => {
process.nextTick(() => {
if(!asyncModule.initialized) {
return callback(
new Error('I don\'t have anything to say right now')
);
}
callback(null, 'Current time is: ' + new Date());
});
};
複製代碼
在上面的代碼中,asyncModule
展示了一個異步初始化模塊的設計模式。 它有一個initialize()
方法,在10
秒的延遲後,將初始化的flag
變量設置爲true
,並通知它的回調調用(10
秒對於真實應用程序來講是很長的一段時間了,可是對於具備互斥條件的應用來講可能會顯得力不從心)。
另外一個方法tellMeSomething()
返回當前的時間,可是若是模塊尚未初始化,它拋出產生一個異常。 下一步是根據咱們剛剛建立的服務建立另外一個模塊。 咱們設計一個簡單的HTTP
請求處理程序,在一個名爲routes.js
的文件中實現:
const asyncModule = require('./asyncModule');
module.exports.say = (req, res) => {
asyncModule.tellMeSomething((err, something) => {
if(err) {
res.writeHead(500);
return res.end('Error:' + err.message);
}
res.writeHead(200);
res.end('I say: ' + something);
});
};
複製代碼
在handler
中調用asyncModule
的tellMeSomething()
方法,而後將其結果寫入HTTP
響應中。 正如咱們所看到的那樣,咱們沒有對asyncModule
的初始化狀態進行任何檢查,這可能會致使問題。
如今,建立app.js
模塊,使用核心http
模塊建立一個很是基本的HTTP
服務器:
const http = require('http');
const routes = require('./routes');
const asyncModule = require('./asyncModule');
asyncModule.initialize(() => {
console.log('Async module initialized');
});
http.createServer((req, res) => {
if (req.method === 'GET' && req.url === '/say') {
return routes.say(req, res);
}
res.writeHead(404);
res.end('Not found');
}).listen(8000, () => console.log('Started'));
複製代碼
上述模塊是咱們應用程序的入口點,它所作的只是觸發asyncModule
的初始化並建立一個HTTP
服務器,它使用咱們之前建立的handler
(routes.say()
)來對網絡請求做出相應。
咱們如今能夠像往常同樣經過執行app.js
模塊來嘗試啓動咱們的服務器。
在服務器啓動後,咱們能夠嘗試使用瀏覽器訪問URL
:http://localhost:8000/
並查看從asyncModule返回的內容。 和預期的同樣,若是咱們在服務器啓動後當即發送請求,結果將是一個錯誤,以下所示:
Error:I don't have anything to say right now 複製代碼
顯然,在異步模塊加載好了以後:
這意味着asyncModule
還沒有初始化,但咱們仍嘗試使用它,則會拋出一個錯誤。
根據異步初始化模塊的實現細節,幸運的狀況是咱們可能會收到一個錯誤,乃至丟失重要的信息,崩潰整個應用程序。 總的來講,咱們剛剛描述的狀況老是必需要避免的。
大多數時候,可能並不會出現上述問題,畢竟初始化通常來講很快,以致於在實踐中,它永遠不會發生。 然而,對於設計用於自動調節的高負載應用和雲服務器,狀況就徹底不一樣了。
爲了維護服務器的健壯性,咱們如今要經過使用咱們在本節開頭描述的模式來進行異步模塊加載。咱們將在asyncModule
還沒有初始化的這段時間內對全部調用的操做推入一個預初始化隊列,而後在異步模塊加載好後處理它們時當即刷新隊列。這就是狀態模式的一個很好的應用!咱們將須要兩個狀態,一個在模塊還沒有初始化的時候將全部操做排隊,另外一個在初始化完成時將每一個方法簡單地委託給原始的asyncModule
模塊。
一般,咱們沒有機會修改異步模塊的代碼;因此,爲了添加咱們的排隊層,咱們須要圍繞原始的asyncModule
模塊建立一個代理。
接下來建立一個名爲asyncModuleWrapper.js
的新文件,讓咱們依照每一個步驟逐個構建它。咱們須要作的第一件事是建立一個代理,並將原始異步模塊的操做委託給這個代理:
const asyncModule = require('./asyncModule');
const asyncModuleWrapper = module.exports;
asyncModuleWrapper.initialized = false;
asyncModuleWrapper.initialize = () => {
activeState.initialize.apply(activeState, arguments);
};
asyncModuleWrapper.tellMeSomething = () => {
activeState.tellMeSomething.apply(activeState, arguments);
};
複製代碼
在前面的代碼中,asyncModuleWrapper
將其每一個方法簡單地委託給activeState
。 讓咱們來看看這兩個狀態是什麼樣子
從notInitializedState
開始,notInitializedState
是指還沒初始化的狀態:
// 當模塊沒有被初始化時的狀態
let pending = [];
let notInitializedState = {
initialize: function(callback) {
asyncModule.initialize(function() {
asyncModuleWrapper.initalized = true;
activeState = initializedState;
pending.forEach(function(req) {
asyncModule[req.method].apply(null, req.args);
});
pending = [];
callback();
});
},
tellMeSomething: function(callback) {
return pending.push({
method: 'tellMeSomething',
args: arguments
});
}
};
複製代碼
當initialize()
方法被調用時,咱們觸發初始化asyncModule
模塊,提供一個回調函數做爲參數。 這使咱們的asyncModuleWrapper
知道何時原始模塊被初始化,在初始化後執行預初始化隊列的操做,以後清空預初始化隊列,再調用做爲參數的回調函數,如下爲具體步驟:
initializedState
賦值給activeState
,表示預初始化已經完成了。因爲此時的模塊還沒有初始化,此狀態的tellMeSomething()
方法僅建立一個新的Command
對象,並將其添加到預初始化隊列中。
此時,當原始的asyncModule
模塊還沒有初始化時,代理應該已經清楚,咱們的代理將簡單地把全部接收到的請求防到預初始化隊列中。 而後,當咱們被通知初始化完成時,咱們執行全部預初始化隊列的操做,而後將內部狀態切換到initializedState
。來看這個代理模塊最後的定義:
let initializedState = asyncModule;
複製代碼
不出意外,initializedState
對象只是對原始的asyncModule
的引用!事實上,初始化完成後,咱們能夠安全地將任何請求直接發送到原始模塊。
最後,設定異步模塊還沒加載好的的狀態,即notInitializedState
let activeState = notInitializedState;
複製代碼
咱們如今能夠嘗試再次啓動咱們的測試服務器,但首先,咱們不要忘記用咱們新的asyncModuleWrapper
對象替換原始的asyncModule
模塊的引用; 這必須在app.js
和routes.js
模塊中完成。
這樣作以後,若是咱們試圖再次向服務器發送一個請求,咱們會看到在asyncModule
模塊還沒有初始化的時候,請求不會失敗; 相反,他們會掛起,直到初始化完成,而後纔會被實際執行。咱們固然能夠確定,比起以前,容錯率變得更高了。
能夠看到,在剛剛初始化異步模塊的時候,服務器會等待請求的響應:
在異步模塊加載完成後,服務器纔會返回響應的信息:
模式:若是模塊是須要異步初始化的,則對每一個操做進行排隊,直到模塊徹底初始化釋放隊列。
如今,咱們的服務器能夠在啓動後當即開始接受請求,並保證這些請求都不會因爲其模塊的初始化狀態而失敗。咱們可以在不使用DI
的狀況下得到這個結果,也不須要冗長且容易出錯的檢查來驗證異步模塊的狀態。
咱們剛剛介紹的模式被許多數據庫驅動程序和ORM
庫所使用。 最值得注意的是Mongoose,它是MongoDB
的ORM
。使用Mongoose
,沒必要等待數據庫鏈接打開,以便可以發送查詢,由於每一個操做都排隊,稍後與數據庫的鏈接徹底創建時執行。 這顯然提升了其API
的可用性。
看一下Mongoose的源碼,它的每一個方法是如何經過代理添加預初始化隊列。 能夠看看實現這中模式的代碼片斷:https://github.com/Automattic/mongoose/blob/21f16c62e2f3230fe616745a40f22b4385a11b11/lib/drivers/node-mongodb-native/collection.js#L103-138
for (var i in Collection.prototype) {
(function(i){
NativeCollection.prototype[i] = function () {
if (this.buffer) {
// mongoose中,在緩衝區不爲空時,只是簡單地把這個操做加入緩衝區內
this.addQueue(i, arguments);
return;
}
var collection = this.collection
, args = arguments
, self = this
, debug = self.conn.base.options.debug;
if (debug) {
if ('function' === typeof debug) {
debug.apply(debug
, [self.name, i].concat(utils.args(args, 0, args.length-1)));
} else {
console.error('\x1B[0;36mMongoose:\x1B[0m %s.%s(%s) %s %s %s'
, self.name
, i
, print(args[0])
, print(args[1])
, print(args[2])
, print(args[3]))
}
}
return collection[i].apply(collection, args);
};
})(i);
}
複製代碼
在高負載的應用程序中,緩存起着相當重要的做用,幾乎在網絡中的任何地方,從網頁,圖像和樣式表等靜態資源到純數據(如數據庫查詢的結果)都會使用緩存。 在本節中,咱們將學習如何將緩存應用於異步操做,以及如何充分利用緩存解決高請求吞吐量的問題。
在這以前,咱們來實現一個小型的服務器,以便用它來衡量緩存和批處理等技術在解決高負載應用程序的優點。
讓咱們考慮一個管理電子商務公司銷售的web
服務器,特別是對於查詢咱們的服務器全部特定類型的商品交易的總和的狀況。 爲此,考慮到LevelUP
的簡單性和靈活性,咱們將再次使用LevelUP
。咱們要使用的數據模型是存儲在sales
這一個sublevel
中的簡單事務列表,它是如下的形式:
transactionId {amount, item}
複製代碼
key
由transactionId
表示,value
則是一個JSON
對象,它包含amount
,表示銷售金額和item
,表示項目類型。 要處理的數據是很是基本的,因此讓咱們當即在名爲的totalSales.js
文件中實現API
,將以下所示:
const level = require('level');
const sublevel = require('level-sublevel');
const db = sublevel(level('example-db', {valueEncoding: 'json'}));
const salesDb = db.sublevel('sales');
module.exports = function totalSales(item, callback) {
console.log('totalSales() invoked');
let sum = 0;
salesDb.createValueStream() // [1]
.on('data', data => {
if(!item || data.item === item) { // [2]
sum += data.amount;
}
})
.on('end', () => {
callback(null, sum); // [3]
});
};
複製代碼
該模塊的核心是totalSales
函數,它也是惟一exports
的API
;它進行以下工做:
salesDb
的sublevel
建立一個Stream
。Stream
將從數據庫中提取全部條目。data
事件,這個事件觸發時,將從數據庫Stream
中提取出每一項,若是這一項的item
參數正是咱們須要的item
,就去累加它的amount
到總的sum
裏面。end
事件觸發時,咱們最終調用callback()
方法。上述查詢方式可能在性能方面並很差。理想狀況下,在實際的應用程序中,咱們可使用索引,甚至使用增量映射來縮短實時計算的時間;可是,因爲咱們須要體現緩存的優點,對於上述例子來講,慢速的查詢實際上更好,由於它會突出顯示咱們要分析的模式的優勢。
爲了完成總銷售應用程序,咱們只須要從HTTP
服務器公開totalSales
的API
;因此,下一步是構建一個(app.js
文件):
const http = require('http');
const url = require('url');
const totalSales = require('./totalSales');
http.createServer((req, res) => {
const query = url.parse(req.url, true).query;
totalSales(query.item, (err, sum) => {
res.writeHead(200);
res.end(`Total sales for item ${query.item} is ${sum}`);
});
}).listen(8000, () => console.log('Started'));
複製代碼
咱們建立的服務器是很是簡單的;咱們只須要它暴露totalSales API
。 在咱們第一次啓動服務器以前,咱們須要用一些示例數據填充數據庫;咱們可使用專用於本節的代碼示例中的populate_db.js
腳原本執行此操做。該腳本將在數據庫中建立100K
個隨機銷售交易。 好的! 如今,一切都準備好了。 像往常同樣,啓動服務器,咱們執行如下命令:
node app
複製代碼
請求這個HTTP
接口,訪問至如下URL
:
http://localhost:8000/?item=book
複製代碼
可是,爲了更好地瞭解服務器的性能,咱們須要連續發送多個請求;因此,咱們建立一個名爲loadTest.js
的腳本,它以200 ms
的間隔發送請求。它已經被配置爲鏈接到服務器的URL
,所以,要運行它,執行如下命令:
node loadTest
複製代碼
咱們會看到這20個請求須要一段時間才能完成。注意測試的總執行時間,由於咱們如今開始咱們的服務,並測量咱們能夠節省多少時間。
在處理異步操做時,最基本的緩存級別能夠經過將一組調用集中到同一個API
來實現。這很是簡單:若是咱們在調用異步函數的同時在隊列中還有另外一個還沒有處理的回調,咱們能夠將回調附加到已經運行的操做上,而不是建立一個全新的請求。看下圖的狀況:
前面的圖像顯示了兩個客戶端(它們能夠是兩臺不一樣的機器,或兩個不一樣的Web
請求),使用徹底相同的輸入調用相同的異步操做。 固然,描述這種狀況的天然方式是由兩個客戶開始兩個單獨的操做,這兩個操做將在兩個不一樣的時刻完成,如前圖所示。如今考慮下一個場景,以下圖所示:
上圖向咱們展現瞭如何對API
的兩個請求進行批處理,或者換句話說,對兩個請求執行到相同的操做。經過這樣作,當操做完成時,兩個客戶端將同時被通知。這表明了一種簡單而又很是強大的方式來下降應用程序的負載,而沒必要處理更復雜的緩存機制,這一般須要適當的內存管理和緩存失效策略。
如今讓咱們在totalSales API
上添加一個批處理層。咱們要使用的模式很是簡單:若是在API
被調用時已經有另外一個相同的請求掛起,咱們將把這個回調添加到一個隊列中。當異步操做完成時,其隊列中的全部回調當即被調用。
如今,讓咱們來改變以前的代碼:建立一個名爲totalSalesBatch.js
的新模塊。在這裏,咱們將在原始的totalSales API
之上實現一個批處理層:
const totalSales = require('./totalSales');
const queues = {};
module.exports = function totalSalesBatch(item, callback) {
if(queues[item]) { // [1]
console.log('Batching operation');
return queues[item].push(callback);
}
queues[item] = [callback]; // [2]
totalSales(item, (err, res) => {
const queue = queues[item]; // [3]
queues[item] = null;
queue.forEach(cb => cb(err, res));
});
};
複製代碼
totalSalesBatch()
函數是原始的totalSales() API
的代理,它的工做原理以下:
item
已經存在隊列中,則意味着該特定item
的請求已經在服務器任務隊列中。在這種狀況下,咱們所要作的只是將回調push
到現有隊列,並當即從調用中返回。不進行後續操做。item
沒有在隊列中,這意味着咱們必須建立一個新的請求。爲此,咱們爲該特定item
的請求建立一個新隊列,並使用當前回調函數對其進行初始化。 接下來,咱們調用原始的totalSales() API
。totalSales()
請求完成時,則執行咱們的回調函數,咱們遍歷隊列中爲該特定請求的item
添加的全部回調,並分別調用這些回調函數。totalSalesBatch()
函數的行爲與原始的totalSales() API
的行爲相同,不一樣之處在於,如今對於相同內容的請求API
進行批處理,從而節省時間和資源。
想知道相比於totalSales() API
原始的非批處理版本,在性能方面的優點是什麼?而後,讓咱們將HTTP
服務器使用的totalSales
模塊替換爲咱們剛剛建立的模塊,修改app.js
文件以下:
//const totalSales = require('./totalSales');
const totalSales = require('./totalSalesBatch');
http.createServer(function(req, res) {
// ...
});
複製代碼
若是咱們如今嘗試再次啓動服務器並進行負載測試,咱們首先看到的是請求被批量返回。
除此以外,咱們觀察到請求的總時間大大減小;它應該至少比對原始totalSales() API
執行的原始測試快四倍!
這是一個驚人的結果,證實了只需應用一個簡單的批處理層便可得到巨大的性能提高,比起緩存機制,也沒有顯得太複雜,由於,無需考慮緩存淘汰策略。
批處理模式在高負載應用程序和執行較爲緩慢的
API
中發揮巨大做用,正是因爲這種模式的運用,能夠批量處理大量的請求。
異步批處理模式的問題之一是對於API
的答覆越快,咱們對於批處理來講,其意義就越小。有人可能會爭辯說,若是一個API
已經很快了,那麼試圖優化它就沒有意義了。然而,它仍然是一個佔用應用程序的資源負載的因素,總結起來,仍然能夠有解決方案。另外,若是API
調用的結果不會常常改變;所以,這時候批處理將並不會有較好的性能提高。在這種狀況下,減小應用程序負載並提升響應速度的最佳方案確定是更好的緩存模式。
緩存模式很簡單:一旦請求完成,咱們將其結果存儲在緩存中,該緩存能夠是變量,數據庫中的條目,也能夠是專門的緩存服務器。所以,下一次調用API
時,能夠當即從緩存中檢索結果,而不是產生另外一個請求。
對於一個有經驗的開發人員來講,緩存不該該是多麼新的技術,可是異步編程中這種模式的不一樣之處在於它應該與批處理結合在一塊兒,以達到最佳效果。緣由是由於多個請求可能併發運行,而沒有設置緩存,而且當這些請求完成時,緩存將會被設置屢次,這樣作則會形成緩存資源的浪費。
基於這些假設,異步請求緩存模式的最終結構以下圖所示:
上圖給出了異步緩存算法的兩個步驟:
另外咱們須要考慮Zalgo
的副作用(咱們已經在Chapter 2-Node.js Essential Patterns
中看到了它的實際應用)。在處理異步API
時,咱們必須確保始終以異步方式返回緩存的值,即便訪問緩存只涉及同步操做。
實踐異步緩存模式的優勢,如今讓咱們將咱們學到的東西應用到totalSales() API
。
與異步批處理示例程序同樣,咱們建立一個代理,其做用是添加緩存層。
而後建立一個名爲totalSalesCache.js
的新模塊,代碼以下:
const totalSales = require('./totalSales');
const queues = {};
const cache = {};
module.exports = function totalSalesBatch(item, callback) {
const cached = cache[item];
if (cached) {
console.log('Cache hit');
return process.nextTick(callback.bind(null, null, cached));
}
if (queues[item]) {
console.log('Batching operation');
return queues[item].push(callback);
}
queues[item] = [callback];
totalSales(item, (err, res) => {
if (!err) {
cache[item] = res;
setTimeout(() => {
delete cache[item];
}, 30 * 1000); //30 seconds expiry
}
const queue = queues[item];
queues[item] = null;
queue.forEach(cb => cb(err, res));
});
};
複製代碼
咱們能夠看到前面的代碼與咱們異步批處理的不少地方基本相同。 其實惟一的區別是如下幾點:
咱們須要作的第一件事就是檢查緩存是否被設置,若是是這種狀況,咱們將當即使用callback()
返回緩存的值,這裏必需要使用process.nextTick()
,由於緩存多是異步設定的,須要等到下一次事件輪詢時纔可以保證緩存已經被設定。
繼續異步批處理模式,可是此次,當原始API
成功完成時,咱們將結果保存到緩存中。此外,咱們還設置了一個緩存淘汰機制,在30
秒後使緩存失效。 一個簡單而有效的技術!
如今,咱們準備嘗試咱們剛建立的totalSales
模塊。 先更改app.js
模塊,以下所示:
// const totalSales = require('./totalSales');
// const totalSales = require('./totalSalesBatch');
const totalSales = require('./totalSalesCache');
http.createServer(function(req, res) {
// ...
});
複製代碼
如今,從新啓動服務器,並使用loadTest.js
腳本進行配置,就像咱們在前面的例子中所作的那樣。使用默認的測試參數,與簡單的異步批處理模式相比,很明顯地有了更好的性能提高。 固然,這很大程度上取決於不少因素;例如收到的請求數量,以及一個請求和另外一個請求之間的延遲等。當請求數量較高且跨越較長時間時,使用高速緩存批處理的優點將更爲顯著。
Memoization被稱作緩存函數調用的結果的算法。 在
npm
中,你能夠找到許多包來實現異步的memoization
,其中最著名的之一之一是memoizee。
咱們必須記住,在實際應用中,咱們可能想要使用更先進的失效技術和存儲機制。 這多是必要的,緣由以下:
LRU
)算法來保持恆定的存儲器利用率。在Chapter4-Asynchronous Control Flow Patterns with ES2015 and Beyond
中,咱們看到了Promise
如何極大地簡化咱們的異步代碼,可是在處理批處理和緩存時,它則能夠提供更大的幫助。
利用Promise
進行異步批處理和緩存策略,有以下兩個優勢:
then()
監聽器能夠附加到相同的Promise
實例。then()
監聽器最多保證被調用一次,即便在Promise
已經被resolve
了以後,then()
也能正常工做。 此外,then()
老是會被保證其是異步調用的。簡而言之,第一個優勢正是批處理請求所須要的,而第二個優勢則在Promise
已是解析值的緩存時,也會提供一樣的的異步返回緩存值的機制。
下面開始看代碼,咱們能夠嘗試使用Promises
爲totalSales()
建立一個模塊,在其中添加批處理和緩存功能。建立一個名爲totalSalesPromises.js
的新模塊:
const pify = require('pify'); // [1]
const totalSales = pify(require('./totalSales'));
const cache = {};
module.exports = function totalSalesPromises(item) {
if (cache[item]) { // [2]
return cache[item];
}
cache[item] = totalSales(item) // [3]
.then(res => { // [4]
setTimeout(() => {delete cache[item]}, 30 * 1000); //30 seconds expiry
return res;
})
.catch(err => { // [5]
delete cache[item];
throw err;
});
return cache[item]; // [6]
};
複製代碼
Promise
確實很好,下面是上述函數的功能描述:
totalSales()
模塊進行promisification
。這樣作以後,totalSales()
將返回一個符合ES2015標準的Promise
實例,而不是接受一個回調函數做爲參數。totalSalesPromises()
時,咱們檢查給定的項目類型是否已經在緩存中有相應的Promise
。若是咱們已經有了這樣的Promise
,咱們直接返回這個Promise
實例。Promise
,咱們繼續經過調用原始(promisified
)的totalSales()
來建立一個Promise
實例。Promise
正常resolve
了,咱們設置了一個清除緩存的時間(假設爲30秒
),咱們返回res
將操做的結果返回給應用程序。Promise
被異常reject
了,咱們當即重置緩存,並再次拋出錯誤,將其傳播到Promise chain
中,因此任何附加到相同Promise
的其餘應用程序也將收到這一異常。Promise
實例。很是簡單直觀,更重要的是,咱們使用Promise
也可以實現批處理和緩存。 若是咱們如今要嘗試使用totalSalesPromise()
函數,稍微調整app.js
模塊,由於如今使用Promise
而不是回調函數。 讓咱們經過建立一個名爲appPromises.js
的app模塊來實現:
const http = require('http');
const url = require('url');
const totalSales = require('./totalSalesPromises');
http.createServer(function(req, res) {
const query = url.parse(req.url, true).query;
totalSales(query.item).then(function(sum) {
res.writeHead(200);
res.end(`Total sales for item ${query.item} is ${sum}`);
});
}).listen(8000, function() {console.log('Started')});
複製代碼
它的實現與原始應用程序模塊幾乎徹底相同,不一樣的是如今咱們使用的是基於Promise
的批處理/緩存封裝版本; 所以,咱們調用它的方式也略有不一樣。
運行如下命令開啓這個新版本的服務器:
node appPromises
複製代碼
雖然上面的totalSales()
在系統資源上面消耗較大,可是其也不會影響服務器處理併發的能力。 咱們在Chapter1-Welcome to the Node.js Platform
中瞭解到有關事件循環的內容,應該爲此行爲提供解釋:調用異步操做會致使堆棧退回到事件循環,從而使其免於處理其餘請求。
可是,當咱們運行一個長時間的同步任務時,會發生什麼狀況,從不會將控制權交還給事件循環?
這種任務也被稱爲CPU-bound
,由於它的主要特色是CPU
利用率較高,而不是I/O
操做繁重。 讓咱們當即舉一個例子上看看這些類型的任務在Node.js
中的具體行爲。
如今讓咱們作一個CPU
佔用比較高的高計算量的實驗。下面來看的是子集總和問題,咱們計算一個數組中是否具備一個子數組,其總和爲0。例如,若是咱們有數組[1, 2, -4, 5, -3]
做爲輸入,則知足問題的子數組是[1, 2, -3]
和[2, -4, 5, -3]
。
最簡單的算法是把每個數組元素作遍歷而後依次計算,時間複雜度爲O(2^n)
,或者換句話說,它隨着輸入的數組長度成指數增加。這意味着一組20
個整數則會有多達1, 048, 576
中狀況,顯然不可以經過窮舉來作到。固然,這個問題的解決方案可能並不算複雜。爲了使事情變得更加困難,咱們將考慮數組和問題的如下變化:給定一組整數,咱們要計算全部可能的組合,其總和等於給定的任意整數。
const EventEmitter = require('events').EventEmitter;
class SubsetSum extends EventEmitter {
constructor(sum, set) {
super();
this.sum = sum;
this.set = set;
this.totalSubsets = 0;
} //...
}
複製代碼
SubsetSum
類是EventEmitter
類的子類;這使得咱們每次找到一個匹配收到的總和做爲輸入的新子集時都會發出一個事件。 咱們將會看到,這會給咱們很大的靈活性。
接下來,讓咱們看看咱們如何可以生成全部可能的子集組合:
開始構建一個這樣的算法。建立一個名爲subsetSum.js
的新模塊。在其中聲明一個SubsetSum
類:
_combine(set, subset) {
for(let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
this._combine(set.slice(i + 1), newSubset);
this._processSubset(newSubset);
}
}
複製代碼
無論算法其中究竟是什麼內容,但有兩點要注意:
_combine()
方法是徹底同步的;它遞歸地生成每個可能的子集,而不把CPU
控制權交還給事件循環。若是咱們考慮一下,這對於不須要任何I/O
的算法來講是很是正常的。_processSubset()
方法以供進一步處理。_processSubset()
方法負責驗證給定子集的元素總和是否等於咱們要查找的數字:
_processSubset(subset) {
console.log('Subset', ++this.totalSubsets, subset);
const res = subset.reduce((prev, item) => (prev + item), 0);
if (res == this.sum) {
this.emit('match', subset);
}
}
複製代碼
簡單地說,_processSubset()
方法將reduce
操做應用於子集,以便計算其元素的總和。而後,當結果總和等於給定的sum
參數時,會發出一個match
事件。
最後,調用start()
方法開始執行算法:
start() {
this._combine(this.set, []);
this.emit('end');
}
複製代碼
經過調用_combine()
觸發算法,最後觸發一個end
事件,代表全部的組合都被檢查過,而且任何可能的匹配都已經被計算出來。 這是可能的,由於_combine()
是同步的; 所以,只要前面的函數返回,end
事件就會觸發,這意味着全部的組合都被計算出來了。
接下來,咱們在網絡上公開剛剛建立的算法。可使用一個簡單的HTTP
服務器對響應的任務做出響應。 特別是,咱們但願以/subsetSum?data=<Array>&sum=<Integer>
這樣的請求格式進行響應,傳入給定的數組和sum
,使用SubsetSum
算法進行匹配。
在一個名爲app.js
的模塊中實現這個簡單的服務器:
const http = require('http');
const SubsetSum = require('./subsetSum');
http.createServer((req, res) => {
const url = require('url').parse(req.url, true);
if(url.pathname === '/subsetSum') {
const data = JSON.parse(url.query.data);
res.writeHead(200);
const subsetSum = new SubsetSum(url.query.sum, data);
subsetSum.on('match', match => {
res.write('Match: ' + JSON.stringify(match) + '\n');
});
subsetSum.on('end', () => res.end());
subsetSum.start();
} else {
res.writeHead(200);
res.end('I\m alive!\n');
}
}).listen(8000, () => console.log('Started'));
複製代碼
因爲SubsetSum
實例使用事件返回結果,因此咱們能夠在算法生成後當即對匹配的結果使用Stream
進行處理。另外一個須要注意的細節是,每次咱們的服務器都會返回I'm alive!
,這樣咱們每次發送一個不一樣於/subsetSum
的請求的時候。能夠用來檢查咱們服務器是否掛掉了,這在稍後將會看到。
開始運行:
node app
複製代碼
一旦服務器啓動,咱們準備發送咱們的第一個請求;讓咱們嘗試發送一組17個隨機數,這將致使產生131,071
個組合,那麼服務器將會處理一段時間:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]"--data-urlencode "sum=0"
複製代碼
這是若是咱們在第一個請求仍在運行的時候在另外一個終端中嘗試輸入如下命令,咱們將發現一個巨大的問題:
curl -G http://localhost:8000
複製代碼
咱們會看到直到第一個請求結束以前,最後一個請求一直處於掛起的狀態。服務器沒有返回響應!這正如咱們所想的那樣。Node.js
事件循環運行在一個單獨的線程中,若是這個線程被一個長的同步計算阻塞,它將不能再執行一個循環來響應I'm alive!
, 咱們必須知道,這種代碼顯然不可以用於同時接收到多個請求的應用程序。
可是不要對Node.js
中絕望,咱們能夠經過幾種方式來解決這種狀況。咱們來分析一下最多見的兩種方案:
一般,CPU-bound
算法是創建在必定規則之上的。它能夠是一組遞歸調用,一個循環,或者基於這些的任何變化/組合。 因此,對於咱們的問題,一個簡單的解決方案就是在這些步驟完成後(或者在必定數量的步驟以後),將控制權交還給事件循環。這樣,任何待處理的I / O
仍然能夠在事件循環在長時間運行的算法產生CPU
的時間間隔中處理。對於這個問題而言,解決這一問題的方式是把算法的下一步在任何可能致使掛起的I/O
請求以後運行。這聽起來像是setImmediate()
方法的完美用例(咱們已經在Chapter2-Node.js Essential Patterns
中介紹過這一API
)。
模式:使用
setImmediate()
交錯執行長時間運行的同步任務。
如今咱們來看看這個模式如何應用於子集求和算法。 咱們所要作的只是稍微修改一下subsetSum.js
模塊。 爲方便起見,咱們將建立一個名爲subsetSumDefer.js
的新模塊,將原始的subsetSum
類的代碼做爲起點。 咱們要作的第一個改變是添加一個名爲_combineInterleaved()
的新方法,它是咱們正在實現的模式的核心:
_combineInterleaved(set, subset) {
this.runningCombine++;
setImmediate(() => {
this._combine(set, subset);
if(--this.runningCombine === 0) {
this.emit('end');
}
});
}
複製代碼
正如咱們所看到的,咱們所要作的只是使用setImmediate()
調用原始的同步的_combine()
方法。然而,如今的問題是由於該算法再也不是同步的,咱們更難以知道什麼時候已經完成了全部的組合的計算。
爲了解決這個問題,咱們必須使用很是相似於咱們在Chapter3-Asynchronous Control Flow Patterns with Callbacks
看到的異步並行執行的模式來追溯_combine()
方法的全部正在運行的實例。 當_combine()
方法的全部實例都已經完成運行時,觸發end
事件,通知任何監聽器,進程須要作的全部動做都已經完成。
對於最終子集求和算法的重構版本。首先,咱們須要將_combine()
方法中的遞歸步驟替換爲異步:
_combine(set, subset) {
for(let i = 0; i < set.length; i++) {
let newSubset = subset.concat(set[i]);
this._combineInterleaved(set.slice(i + 1), newSubset);
this._processSubset(newSubset);
}
}
複製代碼
經過上面的更改,咱們確保算法的每一個步驟都將使用setImmediate()
在事件循環中排隊,在事件循環隊列中I / O
請求以後執行,而不是同步運行形成阻塞。
另外一個小調整是對於start()
方法:
start() {
this.runningCombine = 0;
this._combineInterleaved(this.set, []);
}
複製代碼
在前面的代碼中,咱們將_combine()
方法的運行實例的數量初始化爲0
.咱們還經過調用_combineInterleaved()
來將調用替換爲_combine()
,並移除了end
的觸發,由於如今_combineInterleaved()
是異步處理的。 經過這個最後的改變,咱們的子集求和算法如今應該可以經過事件循環能夠運行的時間間隔交替地運行其可能大量佔用CPU
的代碼,而且不會再形成阻塞。
最後更新app.js
模塊,以便它可使用新版本的SubsetSum
:
const http = require('http');
// const SubsetSum = require('./subsetSum');
const SubsetSum = require('./subsetSumDefer');
http.createServer(function(req, res) {
// ...
})
複製代碼
和以前同樣的方式開始運行,結果以下:
此時,使用異步的方式運行,再也不會阻塞CPU
了。
正如咱們所看到的,在保持應用程序的響應性的同時運行一個CPU-bound
的任務並不複雜,只須要使用setImmediate()
把同步執行的代碼變爲異步執行便可。可是,這不是效率最好的模式;實際上,延遲執行一個任務會額外帶來一個小的開銷,在這樣的算法中,聚沙成塔,則會產生重大的影響。這一般是咱們在運行CPU
限制任務時所須要的最後一件事情,特別是若是咱們必須將結果直接返回給用戶,這應該在合理的時間內進行響應。 緩解這個問題的一個可能的解決方案是隻有在必定數量的步驟以後使用setImmediate()
,而不是在每一步中使用它。可是這仍然不能解決問題的根源。
記住,這並非說一旦咱們想要經過異步的模式來執行CPU-bound
的任務,咱們就應該不惜一切代價來避免這樣的額外開銷,事實上,從更廣闊的角度來看,同步任務並不必定很是漫長和複雜,以致於形成麻煩。在繁忙的服務器中,即便是阻塞事件循環200
毫秒的任務也會產生不但願的延遲。 在那些併發量並不高的服務器來講,即便產生必定短時的阻塞,也不會影響性能,使用交錯執行setImmediate()
多是避免阻塞事件循環的最簡單也是最有效的方法。
process.nextTick()
不能用於交錯長時間運行的任務。正如咱們在Chapter1-Welcome to the Node.js Platform
中看到的,nextTick()
會在任何未返回的I / O
以前調度,而且在重複調用process.nextTick()
最終會致使I / O
飢餓。 你能夠經過在前面的例子中用process.nextTick()
替換setImmediate()
來驗證。
使用interleaving模式
並非咱們用來運行CPU-bound
任務的惟一方法;防止事件循環阻塞的另外一種模式是使用子進程。咱們已經知道Node.js
在運行I / O
密集型應用程序(如Web服務器)的時候是最好的,由於Node.js
可使得咱們能夠經過異步來優化資源利用率。
因此,咱們必須保持應用程序響應的最好方法是不要在主應用程序的上下文中運行昂貴的CPU-bound
任務,而是使用單獨的進程。這有三個主要的優勢:
Node.js
中處理進程很簡單,可能比修改一個使用setImmediate()
的算法更容易,而且多進程容許咱們輕鬆使用多個處理器,而無需擴展主應用程序自己。C
。Node.js
有一個充足的API
庫帶來與外部進程交互。 咱們能夠在child_process
模塊中找到咱們須要的全部東西。 並且,當外部進程只是另外一個Node.js
程序時,將它鏈接到主應用程序是很是容易的,咱們甚至不以爲咱們在本地應用程序外部運行任何東西。這得益於child_process.fork()
函數,該函數建立一個新的子Node.js
進程,並自動建立一個通訊管道,使咱們可以使用與EventEmitter
很是類似的接口交換信息。來看如何用這個特性來重構咱們的子集求和算法。
重構SubsetSum
任務的目標是建立一個單獨的子進程,負責處理CPU-bound
的任務,使服務器的事件循環專一於處理來自網絡的請求:
processPool.js
的新模塊,它將容許咱們建立一個正在運行的進程池。建立一個新的進程代價昂貴,須要時間,所以咱們須要保持它們不斷運行,儘可能不要產生中斷,時刻準備好處理請求,使咱們能夠節省時間和CPU
。此外,進程池須要幫助咱們限制同時運行的進程數量,以免將使咱們的應用程序受到拒絕服務(DoS
)攻擊。subsetSumFork.js
的模塊,負責抽象子進程中運行的SubsetSum
任務。 它的角色將與子進程進行通訊,並將任務的結果展現爲來自當前應用程序。worker
(咱們的子進程),一個新的Node.js
程序,運行子集求和算法並將其結果轉發給父進程。DoS攻擊是企圖使其計劃用戶沒法使用機器或網絡資源,例如臨時或無限中斷或暫停鏈接到Internet的主機的服務。
先從構建processPool.js
模塊開始:
const fork = require('child_process').fork;
class ProcessPool {
constructor(file, poolMax) {
this.file = file;
this.poolMax = poolMax;
this.pool = [];
this.active = [];
this.waiting = [];
} //...
}
複製代碼
在模塊的第一部分,引入咱們將用來建立新進程的child_process.fork()
函數。 而後,咱們定義ProcessPool
的構造函數,該構造函數接受表示要運行的Node.js
程序的文件參數以及池中運行的最大實例數poolMax
做爲參數。而後咱們定義三個實例變量:
pool
表示的是準備運行的進程active
表示的是當前正在運行的進程列表waiting
包含全部這些請求的任務隊列,保存因爲缺乏可用的資源而沒法當即實現的任務看ProcessPool
類的acquire()
方法,它負責取出一個準備好被使用的進程:
acquire(callback) {
let worker;
if(this.pool.length > 0) { // [1]
worker = this.pool.pop();
this.active.push(worker);
return process.nextTick(callback.bind(null, null, worker));
}
if(this.active.length >= this.poolMax) { // [2]
return this.waiting.push(callback);
}
worker = fork(this.file); // [3]
this.active.push(worker);
process.nextTick(callback.bind(null, null, worker));
}
複製代碼
函數邏輯以下:
active
數組中,而後經過異步的方式調用其回調函數。waiting
數組。child_process.fork()
建立一個新的進程,將其添加到active
列表中,而後調用其回調。ProcessPool
類的最後一個方法是release()
,其目的是將一個進程放回進程池中:
release(worker) {
if(this.waiting.length > 0) { // [1]
const waitingCallback = this.waiting.shift();
waitingCallback(null, worker);
}
this.active = this.active.filter(w => worker !== w); // [2]
this.pool.push(worker);
}
複製代碼
前面的代碼也很簡單,其解釋以下:
waiting
任務隊列裏面有任務須要被執行,咱們只需爲這個任務分配一個進程worker
執行。waiting
任務隊列中都沒有須要被執行的任務,咱們則把active
的進程列表中的進程放回進程池中。正如咱們所看到的,進程歷來沒有中斷,只在爲其不斷地從新分配任務,使咱們能夠經過在每一個請求不從新啓動一個進程達到節省時間和空間的目的。然而,重要的是要注意,這可能並不老是最好的選擇,這很大程度上取決於咱們的應用程序的要求。爲減小進程池長期佔用內存,可能的調整以下:
如今咱們的ProcessPool
類已經準備就緒,咱們可使用它來實現SubsetSumFork
模塊,SubsetSumFork
的做用是與子進程進行通訊獲得子集求和的結果。前面曾說到,用child_process.fork()
啓動一個進程也給了咱們建立了一個簡單的基於消息的管道,經過實現subsetSumFork.js
模塊來看看它是如何工做的:
const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);
class SubsetSumFork extends EventEmitter {
constructor(sum, set) {
super();
this.sum = sum;
this.set = set;
}
start() {
workers.acquire((err, worker) => { // [1]
worker.send({sum: this.sum, set: this.set});
const onMessage = msg => {
if (msg.event === 'end') { // [3]
worker.removeListener('message', onMessage);
workers.release(worker);
}
this.emit(msg.event, msg.data); // [4]
};
worker.on('message', onMessage); // [2]
});
}
}
module.exports = SubsetSumFork;
複製代碼
首先注意,咱們在subsetSumWorker.js
調用ProcessPool
的構造函數建立ProcessPool
實例。 咱們還將進程池的最大容量設置爲2
。
另外,咱們試圖維持原來的SubsetSum
類相同的公共API。實際上,SubsetSumFork
是EventEmitter
的子類,它的構造函數接受sum
和set
,而start()
方法則觸發算法的執行,而這個SubsetSumFork
實例運行在一個單獨的進程上。調用start()
方法時會發生的狀況:
sum
和set
。 send()
方法是Node.js
自動提供給child_process.fork()
建立的全部進程,這實際上與父子進程之間的通訊管道有關。on()
方法附加一個新的事件監聽器(這也是全部以child_process.fork()
建立的進程提供的通訊通道的一部分)。end
事件,這意味着SubsetSum
全部任務已經完成,在這種狀況下,咱們刪除onMessage
監聽器並釋放worker
,並將其放回進程池中,再也不讓其佔用內存資源和CPU
資源。worker
以{event,data}
格式生成消息,使得任什麼時候候一旦子進程處理完畢任務,咱們在外部都能接收到這一消息。這就是SubsetSumFork
模塊如今咱們來實現這個worker
應用程序。
如今咱們來建立subsetSumWorker.js
模塊,咱們的應用程序,這個模塊的所有內容將在一個單獨的進程中運行:
const SubsetSum = require('./subsetSum');
process.on('message', msg => { // [1]
const subsetSum = new SubsetSum(msg.sum, msg.set);
subsetSum.on('match', data => { // [2]
process.send({event: 'match', data: data});
});
subsetSum.on('end', data => {
process.send({event: 'end', data: data});
});
subsetSum.start();
});
複製代碼
因爲咱們的handler
處於一個單獨的進程中,咱們沒必要擔憂這類CPU-bound
任務阻塞事件循環,全部的HTTP
請求將繼續由主應用程序的事件循環處理,而不會中斷。
當子進程開始啓動時,父進程:
process.on()
函數輕鬆實現。咱們指望從父進程中惟一的消息是爲新的SubsetSum
任務提供輸入的消息。只要收到這樣的消息,咱們建立一個SubsetSum
類的新實例,並註冊match
和end
事件監聽器。最後,咱們用subsetSum.start()
開始計算。{event,data}
的對象中,並將其發送給父進程。這些消息而後在subsetSumFork.js
模塊中處理,就像咱們在前面的章節中看到的那樣。注意:當子進程不是
Node.js
進程時,則上述的通訊管道就不可用了。在這種狀況下,咱們仍然能夠經過在暴露於父進程的標準輸入流和標準輸出流之上實現咱們本身的協議來創建父子進程通訊的接口。
嘗試新版本的子集求和算法,咱們只須要替換HTTP
服務器使用的模塊(文件app.js
):
運行結果以下:
更有趣的是,咱們也能夠嘗試同時啓動兩個subsetSum
任務,咱們能夠充分看到多核CPU
的做用。 相反,若是咱們嘗試同時運行三個subsetSum
任務,結果應該是最後一個啓動將掛起。這不是由於主進程的事件循環被阻塞,而是由於咱們爲subsetSum
任務設置了兩個進程的併發限制。
正如咱們所看到的,多進程模式比interleaving模式更增強大和靈活;然而,因爲單個機器提供的CPU
和內存資源量仍然是一個硬性限制,因此它仍然不可擴展。在這種狀況下,將負載分配到多臺機器上,則是更優秀的解決辦法。
值得一提的是,在運行
CPU-bound
任務時,多線程能夠成爲多進程的替代方案。目前,有幾個npm
包公開了一個用於處理用戶級模塊的線程的API
;其中最流行的是webworker-threads。可是,即便線程更輕量級,完整的進程也能夠提供更大的靈活性,並具有更高更可靠的容錯處理。
本章講述如下三點:
Node.js
異步中的運用CPU-bound
的任務