原文連接:https://blog.breezelin.cn/scheme-redis-task-queue.htmlphp
一個網關服務器就跟快餐店同樣,老是但願客人來得快、去得也快,這樣在相同時間內才能夠服務更多的客人。若是快餐店的服務員在一個顧客點餐、等餐和結帳時都全程跟陪的話,那麼這個服務員大部分時間都是在空閒的等待。應該有專門的服務員負責點餐,專門的服務員負責送餐,專門的服務員負責結帳,這樣才能提升效率。一樣道理,網關服務器中也須要分工明確。舉個例子:
假設有一個申請發送重置密碼郵件的網關接口,須知道發送一封郵件可能會花費上好幾秒鐘,若是網關服務器直接在線上給用戶發送重置密碼郵件,高併發的狀況下就很容易形成網絡擁擠。但實際上,網關服務器並不是必定要等待郵件發送成功後才能響應用戶,徹底能夠先告知用戶郵件會發送的,然後再在線下把郵件發送出去(就像快餐店裏點餐的服務員跟顧客說先去找位置坐,飯菜作好後會有人給他送過去)。html
那麼是誰來把郵件發送出去呢?laravel
爲了網關接口可以儘快響應用戶請求,無需即時知道結果的耗時操做能夠交由任務隊列機制來處理。
任務隊列機制中包含兩種角色,一個是任務生產者,一個是任務消費者,而任務隊列是二者之間的紐帶:redis
任務隊列的總體運行流程是:任務生產者把當前操做的關鍵信息(後續能夠根據這些信息還原出當前操做)抽象出來,好比發送重置密碼的郵件,咱們只須要當前用戶郵箱和用戶名就能夠了;任務生產者把任務放進隊列,實際就是把任務的關鍵信息存儲起來,這裏會用到MySQL、Redis之類數據存儲工具,經常使用的是Redis;而任務消費者就不斷地從數據庫中取出任務信息,逐一執行。數據庫
任務生產者的工做是任務分發,通常由線上的網關服務程序執行;任務消費者的工做是任務調度,通常由線下的程序執行,這樣即便任務耗時再多,也不阻塞網關服務。服務器
這裏主要討論的是任務調度(任務消費者)的程序設計。網絡
<!--more-->併發
假設咱們用Redis列表List存儲任務信息,列表鍵名是queues:default
,任務發佈就是往列表queues:default
後追加數據:函數
<?php // PHP僞代碼 Redis::rpush('queues:default', serialize($task));
那麼任務調度能夠這樣簡單直接的實現:高併發
<?php // PHP僞代碼 Class Worker { public function schedule() { while(1) { $seri = Redis::lpop('queues:default'); if($seri) { $task = unserialize($seri); $this->handle($task); continue; } sleep(1); } } public function handle($task) { // do something time-consuming } } $worker = new Worker; $worker->schedule();
上面代碼是直接從queues:default
列表中移出第一個任務(lpop),由於handle($task)
函數是一個耗時的操做,過程當中如果遇到什麼意外致使了整個程序退出,這個任務可能還沒執行完成,但是任務信息已經徹底丟失了。保險起見,對schedule()
函數進行如下修改:
<?php ... public function schedule() { while(1) { $seri = Redis::lindex('queues:default', 0); if($seri) { $task = unserialize($seri); $this->handle($task); Redis::lpop('queues:default'); continue; } sleep(1); } } ...
即在任務完成後纔將任務信息從列表中移除。
queues:default
列表中的任務都是須要即時執行的,可是有些任務是須要間隔一段時間後或者在某個時間點上執行,那麼能夠引入一個有序集合,命名爲queues:default:delayed
,來存放這些任務。任務發佈時須要指明執行的時間點$time
:
<?php // PHP僞代碼 Redis::zadd('queues:default:delayed', $time, serialize($task));
任務調度時,若是queues:default
列表已經空了,就從queues:default:delayed
集合中取出到達執行時間的任務放入queues:default
列表中:
<?php ... public function schedule() { while(1) { $seri = Redis::lindex('queues:default', 0); if($seri) { $task = unserialize($seri); $this->handle($task); Redis::lpop('queues:default'); continue; } $seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time()); if($seri_arr) { Redis::rpush('queues:default', $seri_arr); continue; } sleep(1); } } ...
預估任務正常執行所需的最大時間值,如果任務執行超過了這個時間,多是過程當中遇到一些意外,若是任由它繼續卡着,那麼後面的任務就會沒法被執行了。
首先咱們給任務設定一個時限屬性timeout
,而後在執行任務前先給進程自己設置一個鬧鐘信號,timeout
後收到信號說明任務執行超時,須要退出當前進程(用supervisor守護進程時,進程自身退出,supervisor會自動再拉起)。
注意:pcntl_alarm($timeout)
會覆蓋以前鬧鐘信號,而pcntl_alarm(0)
會取消鬧鐘信號;任務超時後,當前任務放入queues:default:delayed
集合中延時執行,以避免再次阻塞隊列。
<?php ... public function schedule() { while(1) { $seri = Redis::lindex('queues:default', 0); if($seri) { $task = unserialize($seri); $this->timeoutHanle($task); $this->handle($task); Redis::lpop('queues:default'); continue; } $seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time()); if($seri_arr) { Redis::rpush('queues:default', $seri_arr); continue; } pcntl_alarm(0); sleep(1); } } public function timeoutHanle($task) { $timeout = (int)$task->timeout; if ($timeout > 0) { pcntl_signal(SIGALRM, function () { $seri = Redis::lpop('queues:default'); Redis::zadd('queues:default:delayed', time()+10), $seri); posix_kill(getmypid(), SIGKILL); }); } pcntl_alarm($timeout); } ...
上面代碼,直觀上沒什麼問題,可是在多進程併發執行的時候,有些任務可能會被重複執行,是由於沒能及時將當前執行的任務從queues:default
列表中移出,其餘進程也能夠讀取到。爲了不重複執行的問題,咱們須要引入一個有序集合SortedSet存放正在執行的任務,命名爲queues:default:reserved
。
首先任務是從queues:default
列表中直接移出,而後開始執行任務前先把任務放進queues:default:reserved
集合中,任務完成了再從queues:default:reserved
集合中移出。
再結合任務超時,假設一個任務執行時間不可能超過60*60
秒(能夠按需調整),在queues:default
列表爲空的時候,queues:default:reserved
集合中有任務已經存放超過了60*60
秒,那麼有多是某些進程在執行任務是意外退出了,因此把這些任務放到queues:default:delayed
集合中稍後執行。
<?php ... public function schedule() { while(1) { $seri = Redis::lpop('queues:default', 0); if($seri) { Redis::zadd('queues:default:reserved', time()+10, $seri); $task = unserialize($seri); $this->timeoutHanle($task); $this->handle($task); Redis::zrem('queues:default:reserved', $seri); continue; } $seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time()); if($seri_arr) { Redis::rpush('queues:default', $seri_arr); continue; } $seri_arr = Redis::zremrangebyscore('queues:default:reserved', 0, time()-60*60); if($seri_arr) { foreach($seri_arr as $seri) { Redis::zadd('queues:default:delayed', time()+10, $seri); } } sleep(1); } } public function timeoutHanle($task) { $timeout = (int)$task->timeout; if ($timeout > 0) { pcntl_signal(SIGALRM, function () use ($task) { $seri = serialize($task); Redis::zrem('queues:default:reserved', $seri); Redis::zadd('queues:default:delayed', time()+10), $seri); posix_kill(getmypid(), SIGKILL); }); } pcntl_alarm($timeout); } ...
以上代碼沒有檢驗任務是否執行成功,應該有任務失敗的處理機制:好比給任務設定一個最多重試次數屬性retry_times
,任務每執行一次retry_times
,任務執行失敗時,如果retry_times
等於0,則將任務放入queues:default:failed
列表中不在執行;不然放入放到queues:default:delayed
集合中稍後執行。
以上代碼是進程忙時連續執行,閒時休眠一秒,能夠按需調整優化。
如果須要在任務執行成功或失敗時進行某些操做,能夠給任務設定成功操做方法afterSucceeded()
或失敗操做方法afterFailed()
,在相應的時候回調。
以上講述了一個任務調度程序的逐步演變,設計方案很大程度上參考了Laravel Queue。用工具,知其然,知其因此然。