rabbitmq延遲隊列的實現(利用死信隊列)

普通的延遲隊列不細說了,不管是設置統一的隊列TTL,仍是設置消息的TTL,全都是利用DeadLetterQueue:消息失效後扔到死信隊列,消費者從死信隊列裏讀消息。但在消息失效的過程當中存在一個問題,好比以下場景:php

延遲隊列中依次收到以下消息 Message A: TTL 2000 Message B: TTL 100 Message C: TTL 5000 當延遲隊列中消息超時後,移至死信隊列

實際執行狀況是rabbitMQ從隊列頭取到消息A,等待2秒,超時,發至死信隊列 -> 判斷消息B,發現已經超時,當即發至死信隊列。緣由是TTL被存在消息的內部,rabbitMQ一直去掃描每條消息的TTL,而是隻判斷隊列頭消息是否失效,因而消息B實際失效時間是2000ms。git

目前rabbitMQ是不支持任意超時時間的(聽說rocketMQ提供有限支持,沒用過),但能夠經過安裝一個插件來解決:github

https://github.com/rabbitmq/rabbitmq-delayed-message-exchangejson

安裝過程很簡單,下載下來,在rabbitMQ的sbin目錄下執行安裝bash

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

插件使用Exchange交換機來實現的TTL,而不是rabbitMQ那樣經過死信隊列:首先設置一個可延遲的Exchange,該Exchange會將收到的消息存放,按照延遲時間排序,直到達到延遲,才被轉發到實際的執行隊列。app

啓用插件後,進入rabbitMQ的管理頁面進行配置:post

新建一個Exchange,Type必須選擇x-delayed-message,添加參數x-delayed-type=direct,而後綁定到執行隊列測試

$msg = new AMQPMessage(json_encode([ 'event' => $eventName, 'params' => $params ]), [ 'delivery_mode' => 2, ]); $headers = new AMQPTable(['x-delay' => $delay]); $msg->set('application_headers', $headers); $this->channel->basic_publish($msg, self::EXCHANGE_DELAY_ANY_NAME, self::EXCHANGE_KEY_NOW);

接下來,爲投遞到該Exchange的消息添加插件所需的header:x-delay,值就是要延遲的時間,單位毫秒,若是不添加這個頭,全部流入該交換機的消息都會當即被轉發到執行隊列。注意若是使用php-amqplib的話,須要用AMQPTable設置這個headerthis

接下來按照剛纔的三條消息測試一下,發現B消息成功比A消息進入隊列了。搞定spa

 

轉載:http://www.xuyanzhe.cn/?p=115

具體實現參考連接: http://www.javashuo.com/article/p-fjwpmtdg-dw.html

高級特性:https://zhuanlan.zhihu.com/p/60141062

相關文章
相關標籤/搜索