RabbitMQ死信隊列另類用法之複合死信

前言

在業務開發過程當中,咱們經常須要作一些定時任務,這些任務通常用來作監控或者清理任務,好比在訂單的業務場景中,用戶在建立訂單後一段時間內,沒有完成支付,系統將自動取消該訂單,並將庫存返回到商品中,又好比在微信中,用戶發出紅包24小時後,須要對紅包進行檢查,是否已領取完成,如未領取完成,將剩餘金額退回到發送者錢包中,同時銷燬該紅包。html

在項目初始階段,或者是一些小型的項目中,經常採用定時輪詢的方法進行檢查,可是咱們都知道,定時輪詢將給數據庫帶來不小的壓力,並且定時間隔沒法進行動態調整,特別是一個系統中,同時存在好幾個定時器的時候,就顯得很是的麻煩,同時給數據庫形成巨大的訪問壓力。git

下面,本文將演示如何使用一個 RabbitMQ 的死信隊列同時監控多種業務(複合業務),達到模塊解耦,釋放壓力的目的。程序員

注意:名詞「複合死信」是爲了敘述方便臨時創造的,若有不妥,歡迎指正github

1. 什麼是 RabbitMQ 死信隊列

DLX(Dead Letter Exchanges)死信交換,死信隊列自己也是一個普通的消息隊列,在建立隊列的時候,經過設置一些關鍵參數,能夠將一個普通的消息隊列設置爲死信隊列,與其它消息隊列不一樣的是,其入棧的消息根據入棧時指定的過時時間/被拒絕/超出隊列長度被移除,依次被轉發到指定的消息隊列中進行二次處理。這樣說法比較拗口,其原理就是死信隊列內位於頂部的消息過時時,該消息將被立刻發送到另一個訂閱者(消息隊列)中。數據庫

其原理入下圖json

由上圖能夠看到,目前有三種類型的業務須要使用 DLX 進行處理,由於每一個業務的超時時間不一致的問題,若是將他們都放入一個 DLX 中進行處理,將會出現一個時序的問題,即消息隊列總數處理頂部的消息,若是頂部的消息未過時,而底部的消息過時,這就麻煩了,由於過時的消息沒法獲得消費,將會形成延遲;因此正常狀況下,最好的辦法是每一個業務都獨立一個隊列,這樣就能夠保證,即將過時的消息老是處於隊列的頂部,從而被第一時間處理。瀏覽器

可是多個 DLX 又帶來了管理上面的問題,隨着業務的增長,愈來愈多的業務須要進入不一樣的 DLX ,這個時候咱們發現,因爲人手不足的緣由,維護這麼多 DLX 實在是太吃力了,若是能將這些消息都接入一個 DLX 中該多好呀,在一個 DLX 中進行消息訂閱,而後進行分發或者處理,這就很是有趣了。微信

下面就按照這個思路,咱們進行集中處理,也就是複合死信交換 CDLX(Composite Dead Letter Exchanges)架構

2. 如何建立死信隊列

建立 DLX 隊列的方式很是簡單,咱們使用 RabbitMQ Web 控制面板進行建立 Exhcange(交換機)/Consumer(死信消費隊列)/cdlx(複合死信隊列)app

2.1 建立隊列

建立交換機 cdlx-Exchange

死信消費隊列 cdlx-Consumer

複合死信隊列 cdlx-Master

  • 注意,這裏添加死信隊列必須同時設置死信轉發交換機和路由,後續經過路由綁定實現消費隊列

路由綁定

上面的路由綁定共有兩個,分別是 Master 和 Consumer 用於消息路由到隊列,爲下面的業務消息作準備,建好後的隊列以下

3.複合業務進入死信隊列

當創建好隊列之後,咱們就能夠專心的處理業務了,下面就來模擬3種業務將消息發送到死信隊列的過程

3.1 發送死信消息到隊列

發送消息使用了 Asp.NetCore輕鬆學-實現一個輕量級高可複用的RabbitMQ客戶端 中的輕量客戶端,封裝後的發送消息代碼以下

public class CdlxMasterService
    {
        private IConfiguration cfg = null;
        private ILogger logger = null;
        private string vhost = "test_mq";
        private string exchange = "cdlx-Exchange";
        private string routekey = "master";
        private static MQConnection connection = null;

        private MQConnection Connection
        {
            get
            {
                if (connection == null || !connection.Connection.IsOpen)
                {
                    connection = new MQConnection(
                        cfg["rabbitmq:username"],
                        cfg["rabbitmq:password"],
                        cfg["rabbitmq:host"],
                        Convert.ToInt32(cfg["rabbitmq:port"]),
                        vhost,
                        logger);
                }
                return connection;
            }
        }

        private static IModel channel = null;
        private IModel Channel
        {
            get
            {
                if (channel == null || channel.IsClosed)
                    channel = Connection.Connection.CreateModel();

                return channel;
            }
        }

        public void SendMessage(object data)
        {
            string message = JsonConvert.SerializeObject(data);
            this.Connection.Publish(this.Channel, exchange, routekey, message);
        }
    }
3.2 將 CdlxMasterService 注入到服務
public void ConfigureServices(IServiceCollection services)
        {
           services.AddSingleton<CdlxMasterService>();
           ...
        }
3.3 模擬3種業務生產死信消息
public class HomeController : Controller
    {
        private CdlxMasterService masterService;
        public HomeController(CdlxMasterService masterService)
        {
            this.masterService = masterService;
        }

        [HttpGet("publish")]
        public int Publish()
        {
            Contract contract = new Contract(this.masterService);
            for (int i = 0; i < 10; i++)
            {
                contract.Publish(MessageType.RedPackage, "紅包信息,超時時間1024s");
                contract.Publish(MessageType.Order, "訂單信息,超時時間2048s");
                contract.Publish(MessageType.Vote, "投票信息,超時時間4096s");
            }
            return 0;
        }
    }

上面的接口 puhlish 模擬了業務消息,因爲咱們依次發佈了 紅包/訂單/投票 消息,因此迭代發佈 10 次後,正好造成了一個時序錯亂的信息隊列,按照自動過時時序計算,當第一個紅包超時到達時,第四條消息(紅包)也會接着超時,但是因爲此時訂單和投票消息位於紅包消息上面,該紅包消息在達到超時時間後並不會被投遞到 Consumer 消費隊列,這是正確的,咱們確實也是但願是這個結果

若是有一個辦法把超時的消息自動將其提高到隊列頂部就行了!

4. 處理複合死信

在 RabbitMQ 提供的 API 接口中,沒有什麼直接可用的能將死信隊列中超時消息提高到頂部的好辦法;可是,咱們能夠利用部分 API 接口的特性來完成這件事情。

4.1 定時消費客戶端

下面,咱們將使用一個定時消費客戶端來完成對死信隊列的輪詢,充分利用 RabbitMQ 的消費特性來完成超時消息的位置提高。

過程以下圖:

如上圖所示,咱們增長一個 dlx-timer 定時器,定時的發起對死信隊列的消費,該消費者僅僅是消費,不確認消息,也就是不作 ack,而後將消息從新置入隊列中;這個過程,就是將消息不斷提高位置的過程。

4.2 定時消費客戶端實現代碼
public class CdlxTimerService : MQServiceBase
    {
        public override string vHost { get { return "test_mq"; } }
        public override string Exchange { get { return "cdlx-Exchange"; } }
        public override List<BindInfo> Binds => new List<BindInfo>();
        private string queue = "cdlx-Master";

        public CdlxTimerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
        {
        }

        /// <summary>
        ///  檢查死信隊列
        /// </summary>
        /// <returns></returns>
        public List<CdlxMessage> CheckMessage()
        {
            long total = 0;
            List<CdlxMessage> list = new List<CdlxMessage>();
            var connection = base.CreateConnection();
            using (IModel channel = connection.Connection.CreateModel())
            {
                bool latest = true;
                while (latest)
                {
                    BasicGetResult result = channel.BasicGet(this.queue, false);
                    total++;
                    latest = result != null;
                    if (latest)
                    {
                        var json = Encoding.UTF8.GetString(result.Body);
                        list.Add(JsonConvert.DeserializeObject<CdlxMessage>(json));
                    }
                }
                channel.Close();
                connection.Close();
            }
            return list;
        }
    }

上面的代碼首先在定時調用到來的時候,建立了一個 Connection,而後利用此 Connection 建立了了一個 Channel,緊接着,使用該 Channel 調用 BasicGet 方法,得到隊列頂部的信息,且設置 autoAck=false,表示僅檢查消息,不確認,而後進入一個 while 迭代過程,一直讀取到隊列底部,得到全部隊列中的信息,最後,關閉了通道釋放鏈接。

這樣,就完成了一次消息檢查的過程,在調用 BasicGet 後,下一條信息將會出如今隊列的頂部,同步,隊列將自動對該消息進行超時檢查,因爲咱們在調用 BasicGet 的時候,傳入 autoAck=false,不確認該消息,在 RabbitMQ 控制檯中,將顯示爲 unacted,因此在釋放鏈接後,全部消息將會被從新置入隊列中,這是一個自動的過程,無需咱們作額外的工做。

4.3 Consumer(死信消費隊列)最終處理業務

配置隊列管理隨程序啓動中止

private MQServcieManager serviceManager;
        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime lifeTime)
        {
            serviceManager = new MQServcieManager(this.Configuration, factory.CreateLogger<MQServcieManager>());
            lifeTime.ApplicationStarted.Register(() => { serviceManager.Start(); });
            lifeTime.ApplicationStopping.Register(() => { serviceManager.Stop(); });
            ...
        }

實現消費隊列

public class CdlxConsumerService : MQServiceBase
    {
        public override string vHost { get { return "test_mq"; } }
        public override string Exchange { get { return "cdlx-Exchange"; } }
        private string queue = "cdlx-Consumer";
        private string routeKey = "all";
        private List<BindInfo> bs = new List<BindInfo>();
        public override List<BindInfo> Binds { get { return bs; } }

        public CdlxConsumerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
        {
            this.bs.Add(new BindInfo
            {
                ExchangeType = ExchangeType.Direct,
                Queue = this.queue,
                RouterKey = this.routeKey,
                OnReceived = this.OnReceived
            });
        }

        private void OnReceived(MessageBody body)
        {
            var message = JsonConvert.DeserializeObject<CdlxMessage>(body.Content);
            Console.WriteLine("類型:{0}\t 內容:{1}\t進入時間:{2}\t過時時間:{3}", message.Type, message.Data, message.CreateTime, message.CreateTime.AddSeconds(message.Expire));

            body.Consumer.Model.BasicAck(body.BasicDeliver.DeliveryTag, true);
        }
    }

上面的代碼,模擬了最終業務處理的過程,這裏僅僅是簡單演示,因此只是將消息打印到屏幕上;在實際的業務場景中,咱們能夠根據不一樣的 MessageType 進行消息的分發處理。

5. 消費過程演示

爲了比較直觀的觀看死信消費過程,咱們編寫一個簡單的列表頁面,自動刷新後去消費死信隊列,而後將消息輸出到頁面上,經過觀察此頁面,咱們能夠實時瞭解到死信隊列的消費過程,實際的業務場景中,你們能夠利用第三方定時器定時調用接口實現,或者使用內置的輕量主機作後臺任務實現定時輪詢,具體參考 Asp.Net Core 輕鬆學-基於微服務的後臺任務調度管理器

5.1 發佈消息

瀏覽器訪問本機地址:http://localhost:5000/home/publish

下面將發佈 30 條信息到 DLX 中,每一個業務各 10 條信息。

一般狀況下,紅包的過時時間最短且超時時間一致,應該最快超時,意味着當第一條紅包消息超時的時候,其他 9 條紅包消息也會一併超時,可是因爲紅包消息混合的發佈在隊列中,且只有第一條紅包消息位移隊列頂部;因此,當第一條紅包消息超時被消費後,其他 9 條紅包因爲不是位於隊列頂部,雖然此時他們已經超時,可是 DLX 將沒法處理;當咱們使用 cdlx-timer(定時器)模擬調用 CdlxTimerService 的時候(也就是刷新首頁), CdlxTimerService 服務將會對 DLX 進行檢查。

查看消費狀態

經過上圖的觀察得知,紅色部分首先位於消息頂部被消費,而後就沒法進行超時判斷,接下來,因爲使用了定時輪詢,使得綠色部分消息得以浮動到消息頂部,而後被 DLX 進行處理後消費。

5.2 定時器檢查死信隊列

瀏覽器訪問本機地址:http://localhost:5000/home

上圖的每一次刷新,都是對 DLX 的一次輪詢檢查,隨着輪詢的深刻,全部處於隊列中不一樣位置的超時消息都有機會浮動到隊列頂部進行消費處理。

結束語

業務的發展促進了架構的演進,每個需求升級的背後,是程序員深深的思考;本文從 CDLX 的需求出發,充分利用了 RabbitMQ DLX 對消息檢查的特性,實現了對複合業務的集中處理。

演示代碼下載

https://github.com/lianggx/Examples/tree/master/RabbitMQ.CDLX

相關文章
相關標籤/搜索