分析以前請你們務必瞭解消息隊列的實現
若是不瞭解請先閱讀下:
有贊消息隊列設計
去哪兒網消息隊列設計php
tp5的消息隊列是基於database redis 和tp官方本身實現的 Topthink
本章是圍繞redis來作分析git
key | 類型 | 描述 |
---|---|---|
queues:queueName |
list | 要執行的任務 |
think:queue:restart |
string | 重啓隊列時間戳 |
queues:queueName:delayed |
zSet | 延遲任務 |
queues:queueName:reserved |
zSet | 執行失敗,等待從新執行 |
work和listen的區別在下面會解釋
命令 | 描述 |
---|---|
php think queue:work |
監聽隊列 |
php think queue:listen |
監聽隊列 |
php think queue:restart |
重啓隊列 |
php think queue:subscribe |
暫無,多是保留的 官方有什麼其餘想法可是還沒實現 |
標籤 | 描述 |
---|---|
worker_daemon_start |
守護進程開啓 |
worker_memory_exceeded |
內存超出 |
worker_queue_restart |
重啓守護進程 |
worker_before_process |
任務開始執行以前 |
worker_before_sleep |
任務延遲執行 |
queue_failed |
任務執行失敗 |
參數 | 默認值 | 可使用的模式 | 描述 |
---|---|---|---|
queue |
null | work,listen | 要執行的任務名稱 |
daemon |
null | work | 以守護進程執行任務 |
delay |
0 | work,listen | 失敗後從新執行的時間 |
force |
null | work | 失敗後從新執行的時間 |
memory |
128M | work,listen | 限制最大內存 |
sleep |
3 | work,listen | 沒有任務的時候等待的時間 |
tries |
0 | work,listen | 任務失敗後最大嘗試次數 |
1: 執行原理不一樣
work: 單進程的處理模式;
無 daemon 參數 work進程在處理完下一個消息後直接結束當前進程。當不存在新消息時,會sleep一段時間而後退出;
有 daemon 參數 work進程會循環地處理隊列中的消息,直到內存超出參數配置才結束進程。當不存在新消息時,會在每次循環中sleep一段時間;github
listen: 父進程 + 子進程 的處理模式;
會在所在的父進程會建立一個單次執行模式的work子進程,並經過該work子進程來處理隊列中的下一個消息,當這個work子進程退出以後;
所在的父進程會監聽到該子進程的退出信號,並從新建立一個新的單次執行的work子進程;redis
2: 退出時機不一樣
work: 看上面
listen: 所在的父進程正常狀況會一直運行,除非遇到下面兩種狀況
01: 建立的某個work子進程的執行時間超過了 listen命令行中的--timeout 參數配置;此時work子進程會被強制結束,listen所在的父進程也會拋出一個 ProcessTimeoutException 異常並退出;json
開發者能夠選擇捕獲該異常,讓父進程繼續執行;
02: 所在的父進程因某種緣由存在內存泄露,則當父進程自己佔用的內存超過了命令行中的 --memory 參數配置時,父子進程均會退出。正常狀況下,listen進程自己佔用的內存是穩定不變的。數組
3: 性能不一樣
work: 是在腳本內部作循環,框架腳本在命令執行的初期就已加載完畢;app
listen: 是處理完一個任務以後新開一個work進程,此時會從新加載框架腳本;框架
所以 work 模式的性能會比listen模式高。
注意: 當代碼有更新時,work 模式下須要手動去執行 php think queue:restart 命令重啓隊列來使改動生效;而listen 模式會自動生效,無需其餘操做。dom
4: 超時控制能力
work: 本質上既不能控制進程自身的運行時間,也沒法限制執行中的任務的執行時間;
listen: 能夠限制其建立的work子進程的超時時間;tcp
可經過 timeout 參數限制work子進程容許運行的最長時間,超過該時間限制仍未結束的子進程會被強制結束;
expire 和time的區別
expire 在配置文件中設置,指任務的過時時間 這個時間是全局的,影響到全部的work進程
timeout 在命令行參數中設置,指work子進程的超時時間,這個時間只對當前執行的listen 命令有效,timeout 針對的對象是 work 子進程;
5: 使用場景不一樣
work 適用場景是:
01: 任務數量較多
02: 性能要求較高
03: 任務的執行時間較短
04: 消費者類中不存在死循環,sleep() ,exit() ,die() 等容易致使bug的邏輯
listen 適用場景是:
01: 任務數量較少
02: 任務的執行時間較長
03: 任務的執行時間須要有嚴格限制
因爲咱們是根據redis來作分析 因此只須要分析src/queue/connector/redis.php
01: 首先調用src/Queue.php
中的魔術方法__callStatic
02: 在__callStatic方法中調用了buildConnector
03: buildConnector 中首先加載配置文件 若是無將是同步執行
04: 根據配置文件去建立鏈接而且傳入配置
在redis.php類的構造方法中的操做:
01: 檢測redis擴展是否安裝
02: 合併配置
03: 檢測是redis擴展仍是 pRedis
04: 建立鏈接對象
參數名 | 默認值 | 描述 | 可使用的方法 |
---|---|---|---|
$job | 無 | 要執行任務的類 | push,later |
$data | 空 | 任務數據 | push,later |
$queue | default | 任務名稱 | push,later |
$delay | null | 延遲時間 | later |
push($job, $data, $queue) Queue::push(Test::class, ['id' => 1], 'test');
一頓騷操做後返回一個數組 而且序列化後 rPush到redis中 key爲 queue:queueName
數組結構:
[ 'job' => $job, // 要執行任務的類 'data' => $data, // 任務數據 'id'=>'xxxxx' //任務id ]
寫入 redis而且返回隊列id
至於中間的那頓騷操做太長了就沒寫
later($delay, $job, $data, $queue) Queue::later(100, Test::class, ['id' => 1], 'test');
跟上面的差很少
一頓騷操做後返回一個數組 而且序列化後 zAdd 到redis中 key爲 queue:queueName:delayed
score爲當前的時間戳+$delay
執行過程有work模式和listen模式 兩種 區別上面已經說了 代碼邏輯因爲太多等下回分解;
最後講一下標籤的使用
//守護進程開啓 'worker_daemon_start' => [ \app\index\behavior\WorkerDaemonStart::class ], //內存超出 'worker_memory_exceeded' => [ \app\index\behavior\WorkerMemoryExceeded::class ], //重啓守護進程 'worker_queue_restart' => [ \app\index\behavior\WorkerQueueRestart::class ], //任務開始執行以前 'worker_before_process' => [ \app\index\behavior\WorkerBeforeProcess::class ], //任務延遲執行 'worker_before_sleep' => [ \app\index\behavior\WorkerBeforeSleep::class ], //任務執行失敗 'queue_failed' => [ \app\index\behavior\QueueFailed::class ]
public function run(Output $output) { $output->write('<info>任務執行失敗</info>', true); }
控制檯執行 php think queue:work --queue test --daemon
會在控制檯一次輸出
守護進程開啓 任務延遲執行
失敗的處理 若是有任務執行失敗或者執行次數達到最大值
會觸發 queue_failed
在app\index\behavior@run
方法裏面寫失敗的邏輯 好比郵件通知 寫入日誌等
最後咱們來講一下如何在其餘框架或者項目中給tp的項目推送消息隊列,例如兩個項目是分開的 另外一個使用的卻不是tp5的框架
<?php class Index { private $redis = null; public function __construct() { $this->redis = new Redis(); $this->redis->connect('127.0.0.1', 6379); $this->redis->select(10); } public function push($job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->rPush('queues:' . $queue, $payload); } public function later($delay, $job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload); } private function createPayload($job, $data) { $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32)); return $this->setMeta($payload, 'attempts', 1); } private function setMeta($payload, $key, $value) { $payload = json_decode($payload, true); $payload[$key] = $value; $payload = json_encode($payload); if (JSON_ERROR_NONE !== json_last_error()) { throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); } return $payload; } private function random(int $length = 16): string { $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'; $randomString = ''; for ($i = 0; $i < $length; $i++) { $randomString .= $str[rand(0, strlen($str) - 1)]; } return $randomString; } } (new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');
package main import ( "encoding/json" "github.com/garyburd/redigo/redis" "math/rand" "time" ) type Payload struct { Id string `json:"id"` Job string `json:"job"` Data interface{} `json:"data"` Attempts int `json:"attempts"` } var RedisClient *redis.Pool func init() { RedisClient = &redis.Pool{ MaxIdle: 20, MaxActive: 500, IdleTimeout: time.Second * 100, Dial: func() (conn redis.Conn, e error) { c, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { return nil, err } _, _ = c.Do("SELECT", 10) return c, nil }, } } func main() { var data = make(map[string]interface{}) data["id"] = "1" later(10, "app\\index\\jobs\\Test", data, "test") } func push(job string, data interface{}, queue string) { payload := createPayload(job, data) queueName := "queues:" + queue _, _ = RedisClient.Get().Do("rPush", queueName, payload) } func later(delay int, job string, data interface{}, queue string) { m, _ := time.ParseDuration("+1s") currentTime := time.Now() op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix() createPayload(job, data) payload := createPayload(job, data) queueName := "queues:" + queue + ":delayed" _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload) } // 建立指定格式的數據 func createPayload(job string, data interface{}) (payload string) { payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1} jsonStr, _ := json.Marshal(payload1) return string(jsonStr) } // 建立隨機字符串 func random(n int) string { var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") b := make([]rune, n) for i := range b { b[i] = str[rand.Intn(len(str))] } return string(b) }