爲何要有消息隊?這裏先對其進行一個簡單的介紹,方便還不瞭解的同窗理解.在面向對象裏,有一個很簡單的概念--消息傳遞,而消息隊列就能夠在它上面擴展一下,把它說的更通俗些:從執行的角度去看,消息隊列把原
來可直接調用的一個函數(一段程序或一個對象)放到另外一個進程中了,因此它們之間的消息傳遞就從直接傳遞參數變成了以隊列爲載體來傳遞所需參數的一種方式.更加詳細的介紹能夠參考這篇文章php
衆所周知,laravel是個優雅的框架,它的隊列處理也不例外,能夠先看看手冊
Laravel 5.2 文檔 服務 —— 隊列(這篇文章是英文版手冊翻譯,內容詳盡).在基本瞭解它的使用以後,咱們就能夠來分析分析相關源代碼,學習它的原理了(該篇文章只探討基於Redis的隊列服務).html
laravel把隊列相關的服務全封裝在一個Service裏面,經過Queue Service Provider 註冊到IOC中.在Queue Serivce裏提供了多種服務,關於它作了什麼請看如下代碼中的註釋(代碼有點多,就不所有貼出來了)laravel
public function register() { // 註冊Manager, 而Manager爲隊列服務的統一入口 $this->registerManager(); // 註冊隊列的各類命令 $this->registerWorker(); $this->registerListener(); // 註冊任務執行失敗後的記錄 $this->registerFailedJobServices(); // 未知 $this->registerQueueClosure(); }
講配置文件redis
在Queue Service注入到IOC後,咱們就可使用隊列了。
一個隊列服務最基本的就是把任務寫入隊列,再將其拿出來執行, 很簡單.因此隊列服務最基本的要素有四點:任務,進隊列,出隊列,執行.在這裏,咱們就跟着這四個要素的步伐,看看它們在laravel中是如何實現的.這裏,先以官方的示例來分析,有了一個具體概念以後再觸類旁通,學會它的本質。數據庫
隊列服務就是圍繞着任務進行的.在手冊上,經過它的實例SendReminderEmail
,咱們能夠很清楚地知道,laravel能夠對一個任務作不少事,好比:可設置從新執行的次數,說明該任務(若失敗)能夠被執屢次(針對的是單個Job);可設置是否能夠延遲執行;對該Job設置處理的隊列名稱,等等.這些功能都是\Illuminate\Bus\Queueable
提供的,固然,實例中還有一個\Illuminate\Queue\InteractsWithQueue
,而它則是針對Job所用(稍後再說).一個任務創建完成後,就須要使其進入隊列了。固然了,除了以上幾個特色,還有任務的執行邏輯等等,要全面地瞭解任務,就須要清楚它的數據結構,其在隊列中的數據結構會在進入隊列中講到.json
示例中,在定義了任務以後,就將其用Controller中的方法使其進入了隊列,那麼這一點是如何實現的?數據結構
代碼在:\Illuminate\Foundation\Bus\DispatchesJobs
app
protected function dispatch($job) { return app(Dispatcher::class)->dispatch($job); } /** * Dispatch a command to its appropriate handler in the current process. * * @param mixed $job * @return mixed */ public function dispatchNow($job) { return app(Dispatcher::class)->dispatchNow($job); }
這段代碼的意圖得了解app(Dispather::class)
, 這個則在\Illuminate\Bus\BusServiceProvider
中表現的很明確了(爲何是這裏就不分析了),app(Dispather::class)
就是\Illuminate\Bus\Dispatcher
.如今,上面代碼中的dispatch
與 dispatchNow
方法就會逐漸清晰起來.簡言之,該Dispatcher
類作了兩件事,執行該任務或把該任務放入隊列,也就是將隊列任務分爲了兩種執行方式,當即執行或以消息隊列執行,與隊列相關的代碼以下:框架
/** * 把任務分發到隊列中 * @param string $command 任務類 */ public function dispatchToQueue($command) { $connection = isset($command->connection) ? $command->connection : null; // laravel裏內置了多種隊列服務,這裏則解析出來 $queue = call_user_func($this->queueResolver, $connection); // 隊列服務解析不成功則拋出異常 if (! $queue instanceof Queue) { throw new RuntimeException('Queue resolver did not return a Queue implementation.'); } // 在任務類中可自定義queue方法進入隊列 if (method_exists($command, 'queue')) { return $command->queue($queue, $command); } else { // 系統提供的一種進入隊列方式 return $this->pushCommandToQueue($queue, $command); } } /** * 根據不一樣的任務屬性選擇不一樣的進入隊列方式 * 這裏所提到的方式在手冊中有提到 * @param Queue $queue 隊列服務 * @param $command 任務類 */ protected function pushCommandToQueue($queue, $command) { // 該推任務設置了延遲,且設置隊列名稱 if (isset($command->queue, $command->delay)) { return $queue->laterOn($command->queue, $command->delay, $command); } //設置隊列名稱 if (isset($command->queue)) { return $queue->pushOn($command->queue, $command); } //設置延遲 if (isset($command->delay)) { return $queue->later($command->delay, $command); } // default return $queue->push($command); }
到如今爲止,Controller已經展現了一種進入隊列的方法,很明顯它是通過封裝提供的接口,雖然很好用,但有些操做是咱們所沒必要須的,好比:是否當即執行,進入隊列就需設置不一樣的任務參數等等,須要更好的爲咱們所用,就再深刻一點,找出它進入隊列的關鍵點(與Redis交互的地方).上面已經提到call_user_func($this->queueResolver, $connection);
會獲得一個隊列服務,那麼$this->queueResolver
是什麼?在\Illuminate\Bus\BusServiceProvider:23
就能夠看到:ide
// 理解這個回調,則須要瞭解Illuminate\Contracts\Queue\Factory // 在vendor/laravel/framework/src/Illuminate/Foundation/Application.php:1051 能夠看到它與Illuminate\Queue\QueueManager的關係了 $this->app->singleton('Illuminate\Bus\Dispatcher', function ($app) { return new Dispatcher($app, function ($connection = null) use ($app) { return $app['Illuminate\Contracts\Queue\Factory']->connection($connection); }); });
在laravel中,Service對外的統一接口都是其Manager,其中與所需服務交互的基本上是經過__call
方法提供,這種方式有兩個優勢,一,提供統一的接口,二,分層明確(將實際的處理由__call
轉發,與配置相關的則由manager本身解決).
如今爲了使任務進入隊列的過程更清晰,一步一步找到了QueueManager
,這個類設置了不少事件接口,和其餘鏈接相關方法.其中connection
方法就展現了一個隊列服務是如何解析出來的了.
其實這段解析的代碼惟一的難點中於:
protected function getConnector($driver) { if (isset($this->connectors[$driver])) { return call_user_func($this->connectors[$driver]); } throw new InvalidArgumentException("No connector for [$driver]"); }
爲何這麼一段簡單的代碼就能解析隊列服務?查看QueueServiceProvider就一目瞭然了.其中就註冊了不少隊列服務.redis
的隊列服務處理則是\Illuminate\Queue\RedisQueue
.
QueueManager
的功能如今很清晰了.1,解析隊列服務 2,轉發(__call
)處理到相應的隊列服務中 3,提供隊列相關接口 .既然QueueManager
有這麼多隊列相關的功能,那麼咱們徹底能夠把它做爲一個隊列處理的入口(直接獲取隊列服務再進行操做是並非明智的選擇),巧的是laravel
也是這麼作的.因此如今有兩種方式進入隊列,1,使用\Illuminate\Foundation\Bus\DispatchesJobs
間接與隊列服務通訊 2,使用QueueManager
間接與隊列服務通訊.固然這些方法都是在\Illuminate\Queue\RedisQueue
(隊列服務的接口)上擴展的.因此掌握該類,就能明白隊列的各類行爲了.
隊列服務在lavavel
中提供了多種,這裏只對以Redis
隊列服務進行分析學習.因此有關隊列的處理都集中在\Illuminate\Queue\RedisQueue
.上面也說到了,有兩種方式進入隊列, 分別使用,看它們產生的任務數據結構有什麼區別?(數據結構便於分析,在後面會提到)
在Controller使用 $this->dispatch((new SendReminderEmail()))
;即以任務類進入
{ "job": "Illuminate\\Queue\\CallQueuedHandler@call", "data": { "command": "O:26:\"App\\Jobs\\SendReminderEmail\":4:{s:5:\"queue\";s:5:\"email\";s:10:\"connection\";N;s:5:\"delay \";N;s:6:\"\u0000*\u0000job\";N;}" }, "id": "7u00jImd8CAns0fQO8jedqkQmnbQsfsr", "attempts": 1 }
直接使用 Queue::push(SendReminderEmail::class , ['email'=>'123456789@qq.com'],'email')
;
{ "job": "App\\Jobs\\SendReminderEmail", "data": { "email": "123456789@qq.com" }, "id": "I0OeBIQjJjisQrZ7STX3zexrBLF7Uilx", "attempts": 1 }
上面講到,構成消息隊列須要兩個進程,因此上面的進入隊列是一個進程,如今的出隊列及執行任務則在另外一個進程中執行。lavarel提供了兩個命令來啓動該進程,quque:work ,queue:litsen 固然,再理解了如何完成這些操做後徹底能夠本身寫一個命令,如今看看它是如何出隊列和如何執行任務?
在手冊中,對於一個任務能夠指定多種屬性,好比,延遲,失敗次數,隊列名稱等等,固然,全部可執行操做或功能都得依賴數據結構,數據結構的制定也是爲了實現相應的行爲.因此,RedisQueue的代碼對應上面的數據結構來理解就比較容易了。
RedisQueue是全部隊列服務(Redis)的基礎接口,因此任務出隊列的操做也能在這找到。假設如今已經對RedisQueue的代碼已經有點熟悉了,不難發現,有一個稍複雜的pop方法(出隊列)。那麼,問題出現了,出隊列是如何實現的?解決了這個問題,任務出隊列就可算是完成了.
查看php artisan queue:work --help
命令的使用方法,整理有關隊列所需的功能或服務:
固然還有其餘關於該命令的功能,好比:是否以守護進程執行,是否強制執行,限制進程執行的memory,無任務時的等待時間.這些與命令相關的因不一樣的命令而異,與隊列任務無關.這樣,在理清隊列任務須要的功能後,咱們就能夠分析它的數據結構,理解代碼了.
數據結構都是依據行爲而創建.因此在查看pop方法時,可考慮以上幾個點.上面的數據結構中,已經能夠看到隊列的執行邏輯,所需參數,失敗次數,這些一目瞭然,就不囉嗦了.在整個pop方法中,有這麼幾個隊列,queue:delayed,queue:reserved,queue.原本取出一個任務用lpop就可完成,爲何要多用兩個集合(注意,是有序集合不是隊列)來完成pop操做呢?由於要實現任務延遲和失敗處理.
其執行過程如圖:
過程解析:
(1). 取任務,由於要實現延遲的功能,因此在有序集合裏的score是過時時間,過時時間的含義則是在此時間以前不執行,也就達到了延遲執行的效果.延遲的含義在這裏指的並非在多少秒後執行,而是在多少秒內不執行.對於過時的任務,就將其rpush到隊列中,直到lpop操做將其拿走.
(1).爲何在存在queue:reserved集合而且把lpop的任務zadd進支?由於只要lpop了job就能夠將其記錄下來,若此時任務還未開始執行進程就非正常終止了,該任務就不會丟失,再次執行時,依據上面的步驟就能夠將其取出,防止意外使job丟失.
(2).隊列的執行都是依據json中的類來完成,這部分較簡單,略.
(3).當任務執行成功時,要手動刪除queue:reserved中的任務;當任務執行失敗,刪除queue:reserved中的任務,再將其記錄下來,記錄方式是zadd queue:delayed
, 而且將該任務的執行次數加一,這個過程RedisQueue已經封裝(RedisQueue::release)好了.
這一系列的過程就完成了讓隊列任務延遲的功能.因此這麼複雜的操做都是爲了實現延遲的功能,固然,有更好的點子能夠考慮本身實現.
到此,任務的執行在json數據結構中表現的很明確,整個處理過程也很清晰了.須要注意的是當任務執行成功後要刪除任務.對於如何執行出隊列,以及如何執行隊列任務,能夠詳細看看queue:work
命令(\Illuminate\Queue\Console\WorkCommand::fire
), 它是最好的示例;
在使用queue:work
以後,會發現它並不有處理全部的狀況.因此在本文中一直提到過,自寫一個處理命令是可行的.當面臨queue:work
所不能解決的問題時,能夠好好考慮下本身編寫.在實際開發中,任務的種類繁多,對於不一樣的任務應該有不一樣的處理方案.因此,有如下幾個問題是常常遇到的:
好比:
因此,面臨laravel所提供命令的侷限性,有自定義處理命令的能力是頗有必要的.
之於框架(優秀開源產品),只不過是有着做者我的風格的一些封裝,要真正的學會使用它,則須要把這種風格化的表象移除, 看到這層’皮’下究竟是什麼,這樣才能學習到框架的本質.但願這篇文章能給同窗們帶來一點幫助.
laravel學院的中文文檔給力