相信不少小夥伴都在開發中使用過消息隊列,尤爲是高併發的狀況,通常能夠在緩存中操做數據,而後經過消息異步處理業務邏輯,操做數據庫等。php
本人所在的公司使用了阿里雲的消息隊列和RabbitMQ,聽說使用阿里雲消息隊列的一部分緣由是RabbitMQ實現延遲消息比較複雜,要依賴死信...接下來進入主題,說說我是怎麼使用消息和遇到的坑吧~redis
通常來說,咱們可使用一個腳原本接收阿里雲消息處理業務邏輯,可是若是業務量特別大的話,咱們可能會遇到一個問題,就是腳本處理不過來,消息積攢的數量可能遠遠超出每秒可以處理的數量。針對這種現象,咱們能夠啓動多個處理腳原本同時處理消息,這樣會明顯加快處理消息的速度。mongodb
可是,多個腳本同時從一條消息隊列裏面取消息的時候,會不會同時取到同一條消息,而後形成消息重複處理的現象呢?我以爲確定是會的,消息是第三方服務,咱們沒法保證他的100%穩定,因此咱們須要在處理的時候下點功夫了。數據庫
咱們發送消息的數據體是json,通常咱們會在每條消息裏面加一個taskid,以時間戳(精確到毫秒級) + 隨機數組成,這個taskid足夠長,咱們得以保證他不會重複(重複的可能性極小,相似於mongodb的主鍵也是這個原理)。json
接下來看一段代碼:數組
<?php緩存
try { //僞代碼 $getData = $mq->receive(); $getData = \Zend\Json\Json::decode($getData, true); //若是不是json數據 咱們能夠捕獲到異常 //先檢測數據 $errorMsg = []; if (!isset($getData['taskid']) || $getData['taskid'] == '') { $errorMsg[] = 'taskid不能爲空'; } if (!isset($getData['order_id']) || $getData['order_id'] == '') { $errorMsg[] = '訂單號不能爲空'; } if (!cache()->setex($this->cachePrefix . $getData['taskid'], 1)) { $errorMsg[] = '該任務已被處理'; } if (count($errorMsg) >= 1) { //記日誌 方便排錯 $this->log('xxxx處理腳本錯誤 哪一個文件 錯誤級別 錯誤緣由' . implode('|', $errorMsg)); return false; } //必須有過時時間 否則會把redis撐爆 cache()->expire($this->cachePrefix . $getData['taskid'], 7 * 86400); /** * 處理業務邏輯 */ //業務邏輯處理正常,刪除redis鎖,刪除消息 cache()->del($this->cachePrefix . $getData['taskid']); $mq->ack(); return true; } //捕獲到了異常 catch (\Exception $e) { //必定要把此次消息刪掉 否則會重複進來 日誌錯誤級別記高一點 手動處理問題 $mq->ack(); //記日誌 $this->log(); }
小夥伴們知道這段代碼哪裏有問題嗎?併發