本文轉載自:衆成翻譯
譯者:文藺
連接:http://www.zcfy.cc/article/662
原文:http://blog.yld.io/2016/05/10/introducing-queues/javascript
這是深刻探索 Node.js 中使用工做隊列(work queues)管理異步工做流的系列文章的第一篇,來自the Node Patterns series。java
開始享受吧!node
很常見的是,在應用程序流中,應用有着能夠異步處理的工做負載。一個常見的例子是發送郵件。比方說,新用戶註冊時,可能須要給 Ta 發送一封確認郵件來確認用戶剛剛輸入的 email 地址是 Ta 本身的。這包括從模板中生成消息,向電子郵件服務提供商發送請求,解析結果,處理任何可能發送的最終錯誤,重試,等等…… 這個流程可能比較複雜,容易出錯,或者在 HTTP 服務器的週期中花費太長時間。不過也有另一個選擇,能夠向持久化存儲中插入一個文檔,該文檔描述咱們有一條待發送給這個用戶的消息。另外一個進程可能拿到這個文檔,作一些比較重的工做:從模板生成消息,向服務器發送請求,解析錯誤,並在必要的狀況下重排這個工做。數據庫
此外,系統須要和其餘系統整合的狀況也很常見。在我曾作過的一些項目中,須要不一樣的系統之間的進行用戶配置文件的雙向同步:當用戶在一個系統中更新了我的資料,這些變化須要傳遞給其餘系統,反之亦然。若是兩個系統之間不須要很強的一致性,資料同步之間有一個小的延遲也許是可接受的,那這項工做就可使用另外一個進程異步處理。bash
更通常地說,在整個系統中有一個工做隊列將工做生產者和消費者分開,這是一種常見的模式。生產者往工做隊列中插入工做,消費者從隊列中拿到工做並執行須要的任務。服務器
使用這樣的拓撲結構有許多緣由和優勢,如:併發
解耦工做生產者和消費者dom
使重試邏輯更易於實現異步
跨時間分配工做負載async
跨空間(nodes 節點)分配工做負載
異步工做
使外部系統更容易整合(最終的一致性)
讓咱們來分析一下其中的一些問題吧。
發送郵件是許多應用須要作的工做。一個例子是,用戶修改了密碼,一些應用很友好地發送郵件通知用戶有人(最好不是其餘人)修改了密碼。如今發送郵件,一般是經過調用第三方郵件提供商提供的 HTTP API來完成的。若是服務緩慢或沒法訪問時候會怎樣?你可不想就由於一封郵件發佈出去就把密碼給回滾了。固然,你也不想就由於在處理請求失敗時碰到了工做中的一個非重要的部分,使得密碼更改請求就這樣崩掉了。密碼修改後但願能夠很快就發送出這封郵件,但不能有如此的代價。
還有,修改密碼意味着,你要爲這個用戶在兩個系統中都作更改:一箇中央用戶數據庫和一個遺留系統(legacy system)。(我知道這很噁心啊,不過我可不止見過一次 —— 現實就這麼骨感。)假如第一個成功了、第二個失敗了,咋辦?
在這些情形下,你能夠想一直重試直至成功:在遺留系統中更改密碼是一個能夠屢次重複的結果相同的操做,而郵件也能夠重複發送屢次。
舉例子,假如遺留系統修改密碼了但未能成功返回通知,若是操做是冪等的,你能夠稍後重試。
甚至,非冪等操做也能夠從工做隊列處理中嚐到甜頭。好比,你能夠將一次貨幣交易插入到工做隊列中 :給每次交易一個通用惟一標識符(UUID, universal unique identifier),稍後接收交易請求的系統能夠保證不會發生重複交易。
在這個例子中,你基本只須要擔憂工做隊列提供的必要的持久性保證:若是系統故障,你但願將交易丟失的風險降到最低。
另外一個將工做生產者和消費者解耦的緣由是,你可能想將工做集羣規模化:若是任務消耗大量資源,若是任務是重 CPU 型的或者須要大量內存或操做系統資源,你能夠將其與應用其餘部分分離出來,放到工做隊列中。
在任何應用中,一些操做比其餘的要重。這可能會在整個節點引入有差別的工做負載:一個不幸的節點可能因處理太多的高併發業務而負擔太重,而其它節點卻被閒置。使用工做隊列,將具體工做平均分配,能夠將影響最小化。
工做隊列的另外一個效果是吸取工做峯(absorb work peaks):你能夠爲工做集羣計劃給定的最大容量,並確保容量永遠不會超過。若是工做數量在短期內急劇上升,工做隊列徹底能夠解決,遠離工做峯的壓力。
系統監控在這裏起到重要做用:你應當持續監控工做隊列的長度,工做時間(完成一項任務的時間),工做佔用,以及容量,以肯定在高峯時間保證使人滿意的操做時間須要的最佳、最小的資源。
若是你不須要以上任何一點東西,使用持久化工做隊列的一個理由是防止崩潰。即便是同一個進程中的內存隊列也能知足你的應用需求,持續的隊列使你的應用在進程重啓的時候更具彈性。
好了,理論講得差很少了 —— 咱們來看具體實現。
能夠設計出的最簡單的工做隊列是一個內存隊列。實現內存隊列多是個學校的練習(留給讀者)。這裏咱們使用 Async 的 queue。
假設你在作的這個演示應用和一個控制你的房子的硬件單元相鏈接。你的 Node.js 應用和該單元經過一個串行端口對話,且有線協議只能同時接受一個掛起的命令。
這個協議被包裝在咱們的 domotic.js
模塊中,模塊暴露三個函數:
.connect()
- 鏈接 domotic 模塊
.command()
- 發送命令,等待響應
.disconnect()
- 切斷與模塊的鏈接
下面的代碼模擬了這樣一個模塊:
domotic.js:
exports.connect = connect; exports.command = command; exports.disconnect = disconnect; function connect(cb) { setTimeout(cb, 100); // simulate connection } function command(cmd, options, cb) { if (succeeds()) { setTimeout(cb, 100); // simulate command } else { setTimeout(function() { var err = Error('error connecting'); err.code = 'ECONN'; cb(err); }, 100); } } function disconnect(cb) { if (cb) setTimeout(cb, 100); // simulate disconnection } function succeeds() { return Math.random() > 0.5; }
注意咱們並無和任何 domotic 模塊交互;咱們只是僞裝,100 毫秒後成功調用回調函數。
一樣,
.command
函數模擬了鏈接錯誤: 若是succeeds()
返回false
,鏈接失敗,命令失敗,這有 50% 的可能性(咱們的 domotic 串行鏈接很容易出錯)。這使咱們可以測試在發生失敗以後,咱們的應用是否會成功重連並重試命令。
而後咱們新建另外一個模塊,能夠在隊列後面發出命令。
domotic_queue.js:
var async = require('async'); var Backoff = require('backoff'); var domotic = require('./domotic'); var connected = false; var queue = async.queue(work, 1); function work(item, cb) { ensureConnected(function() { domotic.command(item.command, item.options, callback); }); function callback(err) { if (err && err.code == 'ECONN') { connected = false; work(item); } else cb(err); } } /// command exports.command = pushCommand; function pushCommand(command, options, cb) { var work = { command: command, options: options }; console.log('pushing command', work); queue.push(work, cb); } function ensureConnected(cb) { if (connected) { return cb(); } else { var backoff = Backoff.fibonacci(); backoff.on('backoff', connect); backoff.backoff(); } function connect() { domotic.connect(connected); } function connected(err) { if (err) { backoff.backoff(); } else { connected = true; cb(); } } } /// disconnect exports.disconnect = disconnect; function disconnect() { if (! queue.length()) { domotic.disconnect(); } else { console.log('waiting for the queue to drain before disonnecting'); queue.drain = function() { console.log('disconnecting'); domotic.disconnect(); }; } }
作了很多工做 —— 咱們來一段段地分析。
var async = require('async'); var Backoff = require('backoff'); var domotic = require('./domotic');
這裏咱們引入了一些包:
async
- 提供內存隊列的實現
backoff
- 讓咱們增長每一次失敗後嘗試從新鏈接的時間間隔
./domotic
- 模擬 domotic 的模塊
咱們的模塊從鏈接斷開狀態開始啓動:
`var connected = false;`
創建咱們的 async 隊列:
`var queue = async.queue(work, 1);`
這裏提供一個叫作 worker
的工做函數(在代碼中進一步定義的)和一個最大併發量 1。咱們在這裏強制設置,是由於咱們定義了 domotic 模塊協議一次只容許一個命令。
而後定義 worker
函數,它每次處理一個隊列元素:
function work(item, cb) { ensureConnected(function() { domotic.command(item.command, item.options, callback); }); function callback(err) { if (err && err.code == 'ECONN') { connected = false; work(item); } else cb(err); } }
當咱們的 async
隊列加入另外一個工做項目,會調用 work
函數,傳遞該工做項目和一個當工做完成時候爲咱們所調用的回調函數。
對每一個工做項目來講,咱們要確認已經鏈接了。一旦鏈接上,使用工做項目中會有的 command
和 options
屬性,來用 domotic 模塊來執行命令。傳的最後一次參數是一個回調函數,當命令成功或失敗以後會當即被調用。
回調函數中,咱們明確地處理鏈接錯誤的狀況,設置 connected
狀態爲 false
,並再次調用 work
重連。
若是沒有發生錯誤,調用回調函數 cb
結束當前工做項目。
function ensureConnected(cb) { if (connected) { return cb(); } else { var backoff = Backoff.fibonacci(); backoff.on('backoff', connect); backoff.backoff(); } function connect() { domotic.connect(connected); } function connected(err) { if (err) { backoff.backoff(); } else { connected = true; cb(); } } }
ensureConnected
函數如今負責處於鏈接狀態時調用回調或相反狀況下嘗試鏈接。嘗試鏈接的時候,使用 backoff
增長每次重連的時間間隔。 每次 domotic.connect
函數帶着錯誤被調用,在 backoff
事件觸發以前增長間隔時間。觸發 backoff
時,嘗試鏈接。一旦鏈接成功,調用 cb
回調;不然保持重試。
這個模塊暴露一個 .command
函數:
/// command exports.command = pushCommand; function pushCommand(command, options, cb) { var work = { command: command, options: options }; console.log('pushing command', work); queue.push(work, cb); }
這個命令簡單的解析一個工做項目並將其推入隊列。
最後,這個模塊一樣暴露出 .disconnect
函數。
/// disconnect exports.disconnect = disconnect; function disconnect() { if (! queue.length()) { domotic.disconnect(); } else { console.log('waiting for the queue to drain before disonnecting'); queue.drain = function() { console.log('disconnecting'); domotic.disconnect(); }; } }
這裏咱們只是確保在調用 domotic 模塊的 disconnected
方法以前隊列是空的。若是隊列非空,在真正斷開鏈接以前會等待其耗盡(drain)。
可選:在隊列未被耗盡的狀況下,您能夠設置一個超時時間,而後強制斷開鏈接。
而後咱們來新建一個 domotic 客戶端:
client.js:
var domotic = require('./domotic_queue'); for(var i = 0 ; i < 20; i ++) { domotic.command('toggle light', i, function(err) { if (err) throw err; console.log('command finished'); }); } domotic.disconnect();
這裏咱們並行得向 domotic 模塊添加了 20 個 settime
命令,同時傳遞了回調函數,當命令完成時就會被調用。若是有命令出錯,簡單地拋出錯誤並中斷執行。
添加全部命令以後咱們立刻斷開鏈接,不過模塊會等待全部命令被執行以後纔會真正將其斷開。
讓咱們在命令行中試一下:
$ node client.js pushing command { command: 'toggle light', options: 0 } pushing command { command: 'toggle light', options: 1 } pushing command { command: 'toggle light', options: 2 } pushing command { command: 'toggle light', options: 3 } pushing command { command: 'toggle light', options: 4 } pushing command { command: 'toggle light', options: 5 } pushing command { command: 'toggle light', options: 6 } pushing command { command: 'toggle light', options: 7 } pushing command { command: 'toggle light', options: 8 } pushing command { command: 'toggle light', options: 9 } pushing command { command: 'toggle light', options: 10 } pushing command { command: 'toggle light', options: 11 } pushing command { command: 'toggle light', options: 12 } pushing command { command: 'toggle light', options: 13 } pushing command { command: 'toggle light', options: 14 } pushing command { command: 'toggle light', options: 15 } pushing command { command: 'toggle light', options: 16 } pushing command { command: 'toggle light', options: 17 } pushing command { command: 'toggle light', options: 18 } pushing command { command: 'toggle light', options: 19 } waiting for the queue to drain before disonnecting command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished command finished disconnecting
這裏咱們能夠看到,全部命令被當即放到隊列中,而且命令是被一些隨機時間間隔着有序完成的。最後,全部命令完成以後鏈接切斷。
本系列的下一篇文章,咱們將探索如何避免崩潰以及經過持久化工做項目來限制內存影響。