爲何使用延遲消息?java
不一樣於同步消息,有些業務場景下但願能夠實現延遲必定時間再消費消息。git
典型的場景有微信、支付寶等第三方支付回調接口,會在用戶支付後3秒、5秒、30秒等等時間後向應用服務器發送回調請求,確保應用服務器能夠正確收到消息。數據庫
那有些朋友就會說了,把須要定時處理的數據存到數據庫中用定時任務就能夠實現,爲何還弄個異步消息。增長後臺維護成本。服務器
使用定時任務固然沒有問題能夠實現該問題。在小數據量狀況下沒有問題。但當數據量交大的時候怎麼辦?若是每一個任務的延遲時間不一樣怎麼辦?微信
其餘方式實現消息隊列數據結構
名稱 | 實現方式 | 詳細說明 |
---|---|---|
Redis | 使用zset數據結構 | 使用zset的score屬性存放執行時間戳,起一個死循環的線程不斷的取第一個Key值,若是當前時間戳大於該Key的socre 值時將它取出來消費,注意不須要遍歷整個Zset集合,以避免形成性能浪費 |
定時任務 | 給定週期掃描待處理消息 | 使用該方式間隔時間很差控制,給短會形成無心義的掃描,增長數據庫壓力,給長了偏差較大 |
定時任務 | 動態建立惟一性定時任務 | 一次性的任務會增長數據庫存儲,須要定時清理,如相差時間較近的任務較多,也會形成性能較差 |
時間輪 | 自定義 | 自定義一個時間輪的數據結構,啓動一個後臺線程,延遲一秒,獲取時間輪中的任務啓動子線程獨立執行時間輪的任務 |
如何選擇消息中間件?app
中間件 | 是否原生支持 | 說明 |
---|---|---|
RocketMQ | 支持 | 不支持任意時間的延遲消息的設置,僅支持內置預設值的延遲時間間隔的延遲消息。預設值的延遲時間間隔爲:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h |
RabbitMQ | 不支持 | 可以使用消息的TTL和死信Exchange實現 |
Kafka | 不支持 | 可以使用TimingWheel 實現 |
AcitveMQ | 支持 |
因本身在使用RabbitMQ作爲消息中間件,因此直接選用了RabbitMQ來實現。dom
實現以前異步
在實現以前咱們先須要知道RabbitMQ如下兩個概念。性能
消息若是在隊列中一直沒有被消費而且存在時間超過了TTL,消息就會變成了"死信" (Dead Message),後續沒法再被消費了。
它的做用實際上是用來接收死信消息(dead message)的。
由於消息若是未被正常消費並設置了requeue爲false時會進入死信隊列,咱們能夠監控消費死信隊列中消息,來觀察和分析系統的問題。
RabbitMQ能夠從兩種維度設置消息過時時間,分別是隊列和消息自己。兩種方式哪一個時間小先執行哪一個。
實現思路
想到有兩種實現方式和效果。甚至能夠結合使用。
第一種:設定固定幾個延遲時間(像RocketMQ中間件)
第二種:實現自定義任意時間延遲
以上兩種方式各有優缺點,我本身實現的是第二種,下面詳細說明
圖中後半段死信路由與應用消費基本相同,只要在消費端綁將一個正常隊列與死信路由綁定就行。
/** * @Author: maomao * @Date: 2019-09-04 18:34 */ @Slf4j @Component public class FreeCloudMQConsume { @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "free.cloud.out.mq",durable = "true"), exchange = @Exchange(value = "free.cloud.die.exchange",type = ExchangeTypes.TOPIC), key = "free.cloud.out.mq.dead.message.#")}) public void print(String message){ log.info("print 5 ---- > {}",message); } }
調用方發送消息
/** * 建立延遲隊列,會隨指定延遲時間+5秒後刪除隊列 * @param queueName * @param delayMillis * @return */ private static Queue createDelayQueue(String queueName, Integer delayMillis) { /** * 隊列名稱 //死信時間 ,死信從新投遞的交換機 ,路由到隊列的routingKey */ String time = String.valueOf(System.currentTimeMillis()); String delayQueueName = queueName + ".delay_" + delayMillis + "_" + time; return QueueBuilder.durable(delayQueueName) //設置消息失效時間 .withArgument("x-message-ttl",delayMillis * 1000) //設置隊列自動刪除時間 ,比消息延遲時間多5秒 .withArgument("x-expires", (delayMillis + 5) * 1000) //設置死信路由 .withArgument("x-dead-letter-exchange", "free.cloud.die.exchange") //設置死信路由routingKey .withArgument("x-dead-letter-routing-key", queueName + ".dead.message." + time) .build(); } /** * 發送延遲消息 * @param queueName * @param message * @param delayMillis */ public static void sendDelayMessage(String queueName,Object message,Integer delayMillis){ //死信消息隊列(動態建立,會銷燬) Queue delayQueue = createDelayQueue(queueName, delayMillis); //建立隊列 addQueue(delayQueue); //延遲消息路由Key StringBuilder delayRoutingKey = new StringBuilder(queueName + ".delay"); delayRoutingKey.append(".").append(message.hashCode() + "_" + RandomUtil.randomString(5)); //綁定延遲路由 RabbitMqUtil.addBinding(delayQueue,delayExchange,delayRoutingKey.toString()); getRabbitTemplate().convertAndSend("free.cloud.delay.exchange",delayRoutingKey.toString(),message); }
以上是自定義延遲消息的關鍵實現代碼,完整代碼能夠 點擊這裏 獲取
效果