Asp.NetCore輕鬆學-實現一個輕量級高可複用的RabbitMQ客戶端

前言

本示例經過對服務訂閱的封裝、隱藏細節實現、統一配置、自動重連、異常處理等各個方面來打造一個簡單易用的 RabbitMQ 工廠;本文適合適合有必定 RabbitMQ 使用經驗的讀者閱讀,若是你尚未實際使用過 RabbitMQ,也沒有關係,由於本文的代碼都是基於直接運行的實例,經過簡單的修改 RabbitMQ 便可運行。git

  • 解決方案以下

1. 建立基礎鏈接管理幫助類

首先,建立一個 .netcore 控制檯項目,建立 Helper、Service、Utils 文件夾,分別用於存放通道管理、服務訂閱、公共組件。github

1.1 接下來建立一個 MQConfig 類,用於存放 RabbitMQ 主機配置等信息
public class MQConfig
    {
        /// <summary>
        ///  訪問消息隊列的用戶名
        /// </summary>
        public string UserName { get; set; }
        /// <summary>
        ///  訪問消息隊列的密碼
        /// </summary>
        public string Password { get; set; }
        /// <summary>
        ///  消息隊列的主機地址
        /// </summary>
        public string HostName { get; set; }
        /// <summary>
        ///  消息隊列的主機開放的端口
        /// </summary>
        public int Port { get; set; }
    }
1.2 建立 RabbitMQ 鏈接管理類,用於建立鏈接,關閉鏈接
1.3 建立一個消息體對象 MessageBody,用於解析和傳遞消息到業務系統中,在接下來的 MQChannel 類中會用到
public class MessageBody
    {
        public EventingBasicConsumer Consumer { get; set; }
        public BasicDeliverEventArgs BasicDeliver { get; set; }
        /// <summary>
        ///  0成功
        /// </summary>
        public int Code { get; set; }
        public string Content { get; set; }
        public string ErrorMessage { get; set; }
        public bool Error { get; set; }
        public Exception Exception { get; set; }
    }
1.4 建立一個通道類,用於訂閱、發佈消息,同時提供一個關閉通道鏈接的方法 Stop
public class MQChannel
    {
        public string ExchangeTypeName { get; set; }
        public string ExchangeName { get; set; }
        public string QueueName { get; set; }
        public string RoutekeyName { get; set; }
        public IConnection Connection { get; set; }
        public EventingBasicConsumer Consumer { get; set; }

        /// <summary>
        ///  外部訂閱消費者通知委託
        /// </summary>
        public Action<MessageBody> OnReceivedCallback { get; set; }

        public MQChannel(string exchangeType, string exchange, string queue, string routekey)
        {
            this.ExchangeTypeName = exchangeType;
            this.ExchangeName = exchange;
            this.QueueName = queue;
            this.RoutekeyName = routekey;
        }

        /// <summary>
        ///  向當前隊列發送消息
        /// </summary>
        /// <param name="content"></param>
        public void Publish(string content)
        {
            byte[] body = MQConnection.UTF8.GetBytes(content);
            IBasicProperties prop = new BasicProperties();
            prop.DeliveryMode = 1;
            Consumer.Model.BasicPublish(this.ExchangeName, this.RoutekeyName, false, prop, body);
        }

        internal void Receive(object sender, BasicDeliverEventArgs e)
        {
            MessageBody body = new MessageBody();
            try
            {
                string content = MQConnection.UTF8.GetString(e.Body);
                body.Content = content;
                body.Consumer = (EventingBasicConsumer)sender;
                body.BasicDeliver = e;
            }
            catch (Exception ex)
            {
                body.ErrorMessage = $"訂閱-出錯{ex.Message}";
                body.Exception = ex;
                body.Error = true;
                body.Code = 500;
            }
            OnReceivedCallback?.Invoke(body);
        }

        /// <summary>
        ///  設置消息處理完成標誌
        /// </summary>
        /// <param name="consumer"></param>
        /// <param name="deliveryTag"></param>
        /// <param name="multiple"></param>
        public void SetBasicAck(EventingBasicConsumer consumer, ulong deliveryTag, bool multiple)
        {
            consumer.Model.BasicAck(deliveryTag, multiple);
        }

        /// <summary>
        ///  關閉消息隊列的鏈接
        /// </summary>
        public void Stop()
        {
            if (this.Connection != null && this.Connection.IsOpen)
            {
                this.Connection.Close();
                this.Connection.Dispose();
            }
        }
    }
1.5 在上面的 MQChannel 類中

首先是在構造函數內對當前通道的屬性進行設置,其次提供了 Publish 和 OnReceivedCallback 的委託,當通道接收到消息的時候,會進入方法 Receive 中,在 Receive 中,通過封裝成 MessageBody 對象,並調用委託 OnReceivedCallback ,將,解析好的消息傳遞到外邊訂閱者的業務中。最終在 MQChannel 中還提供了消息確認的操做方法 SetBasicAck,供業務系統手動調用。web

1.6 接着再建立一個 RabbitMQ 通道管理類,用於建立通道,代碼很是簡單,只有一個公共方法 CreateReceiveChannel,傳入相關參數,建立一個 MQChannel 對象
public class MQChannelManager
    {
        public MQConnection MQConn { get; set; }

        public MQChannelManager(MQConnection conn)
        {
            this.MQConn = conn;
        }

        /// <summary>
        ///  建立消息通道
        /// </summary>
        /// <param name="cfg"></param>
        public MQChannel CreateReceiveChannel(string exchangeType, string exchange, string queue, string routekey)
        {
            IModel model = this.CreateModel(exchangeType, exchange, queue, routekey);
            model.BasicQos(0, 1, false);
            EventingBasicConsumer consumer = this.CreateConsumer(model, queue);
            MQChannel channel = new MQChannel(exchangeType, exchange, queue, routekey)
            {
                Connection = this.MQConn.Connection,
                Consumer = consumer
            };
            consumer.Received += channel.Receive;
            return channel;
        }

        /// <summary>
        ///  建立一個通道,包含交換機/隊列/路由,並創建綁定關係
        /// </summary>
        /// <param name="type">交換機類型</param>
        /// <param name="exchange">交換機名稱</param>
        /// <param name="queue">隊列名稱</param>
        /// <param name="routeKey">路由名稱</param>
        /// <returns></returns>
        private IModel CreateModel(string type, string exchange, string queue, string routeKey, IDictionary<string, object> arguments = null)
        {
            type = string.IsNullOrEmpty(type) ? "default" : type;
            IModel model = this.MQConn.Connection.CreateModel();
            model.BasicQos(0, 1, false);
            model.QueueDeclare(queue, true, false, false, arguments);
            model.QueueBind(queue, exchange, routeKey);
            return model;
        }

        /// <summary>
        ///  接收消息到隊列中
        /// </summary>
        /// <param name="model">消息通道</param>
        /// <param name="queue">隊列名稱</param>
        /// <param name="callback">訂閱消息的回調事件</param>
        /// <returns></returns>
        private EventingBasicConsumer CreateConsumer(IModel model, string queue)
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(model);
            model.BasicConsume(queue, false, consumer);

            return consumer;
        }
    }
1.7 通道管理類的構造方法
public MQChannelManager(MQConnection conn)
        {
            this.MQConn = conn;
        }
1.8 須要傳入一個 MQConnection 對象,僅是一個簡單的鏈接類,代碼以下
public class MQConnection
    {
        private string vhost = string.Empty;
        private IConnection connection = null;
        private MQConfig config = null;

        /// <summary>
        ///  構造無 utf8 標記的編碼轉換器
        /// </summary>
        public static UTF8Encoding UTF8 { get; set; } = new UTF8Encoding(false);

        public MQConnection(MQConfig config, string vhost)
        {
            this.config = config;
            this.vhost = vhost;
        }

        public IConnection Connection
        {
            get
            {
                if (connection == null)
                {
                    ConnectionFactory factory = new ConnectionFactory
                    {
                        AutomaticRecoveryEnabled = true,
                        UserName = this.config.UserName,
                        Password = this.config.Password,
                        HostName = this.config.HostName,
                        VirtualHost = this.vhost,
                        Port = this.config.Port
                    };
                    connection = factory.CreateConnection();
                }

                return connection;
            }
        }
    }
1.9 在上面的代碼中,還初始化了一個靜態對象 UTF8Encoding ,使用無 utf8 標記的編碼轉換器來解析消息

2. 定義和實現服務契約

設想一下,有這樣的一個業務場景,通道管理和服務管理都是相同的操做,若是這些基礎操做都在一個地方定義,且有一個默認的實現,那麼後來者就不須要去關注這些技術細節,直接繼承基礎類後,傳入相應的消息配置便可完成
消息訂閱和發佈操做。網絡

2.1 有了想法,接下來就先定義契約接口 IService,此接口包含建立通道、開啓/中止訂閱,一個服務可能承載多個通道,因此還須要包含通道列表
public interface IService
    {
        /// <summary>
        ///  建立通道
        /// </summary>
        /// <param name="queue">隊列名稱</param>
        /// <param name="routeKey">路由名稱</param>
        /// <param name="exchangeType">交換機類型</param>
        /// <returns></returns>
        MQChannel CreateChannel(string queue, string routeKey, string exchangeType);

        /// <summary>
        ///  開啓訂閱
        /// </summary>
        void Start();

        /// <summary>
        ///  中止訂閱
        /// </summary>
        void Stop();

        /// <summary>
        ///  通道列表
        /// </summary>
        List<MQChannel> Channels { get; set; }

        /// <summary>
        ///  消息隊列中定義的虛擬機
        /// </summary>
        string vHost { get; }

        /// <summary>
        ///  消息隊列中定義的交換機
        /// </summary>
        string Exchange { get; }
    }
2.2 接下來建立一個抽象類來實現該接口,將實現細節進行封裝,方便後面的業務服務繼承調用
public abstract class MQServiceBase : IService
    {
        internal bool started = false;
        internal MQServiceBase(MQConfig config)
        {
            this.Config = config;
        }

        public MQChannel CreateChannel(string queue, string routeKey, string exchangeType)
        {
            MQConnection conn = new MQConnection(this.Config, this.vHost);
            MQChannelManager cm = new MQChannelManager(conn);
            MQChannel channel = cm.CreateReceiveChannel(exchangeType, this.Exchange, queue, routeKey);
            return channel;
        }

        /// <summary>
        ///  啓動訂閱
        /// </summary>
        public void Start()
        {
            if (started)
            {
                return;
            }

            MQConnection conn = new MQConnection(this.Config, this.vHost);
            MQChannelManager manager = new MQChannelManager(conn);
            foreach (var item in this.Queues)
            {
                MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey);
                channel.OnReceivedCallback = item.OnReceived;
                this.Channels.Add(channel);
            }
            started = true;
        }

        /// <summary>
        ///  中止訂閱
        /// </summary>
        public void Stop()
        {
            foreach (var c in this.Channels)
            {
                c.Stop();
            }
            this.Channels.Clear();
            started = false;
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="message"></param>
        public abstract void OnReceived(MessageBody message);

        public List<MQChannel> Channels { get; set; } = new List<MQChannel>();

        /// <summary>
        ///  消息隊列配置
        /// </summary>
        public MQConfig Config { get; set; }

        /// <summary>
        ///  消息隊列中定義的虛擬機
        /// </summary>
        public abstract string vHost { get; }

        /// <summary>
        ///  消息隊列中定義的交換機
        /// </summary>
        public abstract string Exchange { get; }

        /// <summary>
        ///  定義的隊列列表
        /// </summary>
        public List<QueueInfo> Queues { get; } = new List<QueueInfo>();
    }

上面的抽象類,原封不動的實現接口契約,代碼很是簡單,在 Start 方法中,建立通道和啓動消息訂閱;同時,將通道加入屬性 Channels 中,方便後面的自檢服務使用;在 Start 方法中ide

/// <summary>
        ///  啓動訂閱
        /// </summary>
        public void Start()
        {
            if (started)
            {
                return;
            }

            MQConnection conn = new MQConnection(this.Config, this.vHost);
            MQChannelManager manager = new MQChannelManager(conn);
            foreach (var item in this.Queues)
            {
                MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey);
                channel.OnReceivedCallback = item.OnReceived;
                this.Channels.Add(channel);
            }
            started = true;
        }

使用 MQChannelManager 建立了一個通道,並將通道的回調委託 OnReceivedCallback 設置爲 item.OnReceived 方法,該方法將有子類實現;在將當前訂閱服務通道建立完成後,標記服務狀態 started 爲 true,防止重複啓動;同時,在該抽象類中,不實現契約的 OnReceived(MessageBody message);強制基礎業務服務類去自我實現,由於各類業務的特殊性,這塊對消息的處理不能再基礎服務中完成函數

接下來要介紹的是服務監控管理類,該類內部定義一個簡單的定時器功能,不間斷的對 RabbitMQ 的通信進行偵聽,一旦發現有斷開的鏈接,就自動建立一個新的通道,並移除舊的通道;同時,提供 Start/Stop 兩個方法,以供程序 啓動/中止 的時候對測試

2.3 RabbitMQ 的鏈接和通道進行清理;代碼以下
public class MQServcieManager
    {
        public int Timer_tick { get; set; } = 10 * 1000;
        private Timer timer = null;

        public Action<MessageLevel, string, Exception> OnAction = null;
        public MQServcieManager()
        {
            timer = new Timer(OnInterval, "", Timer_tick, Timer_tick);
        }


        /// <summary>
        ///  自檢,配合 RabbitMQ 內部自動重連機制
        /// </summary>
        /// <param name="sender"></param>
        private void OnInterval(object sender)
        {
            int error = 0, reconnect = 0;
            OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 正在執行自檢", null);
            foreach (var item in this.Services)
            {
                for (int i = 0; i < item.Channels.Count; i++)
                {
                    var c = item.Channels[i];
                    if (c.Connection == null || !c.Connection.IsOpen)
                    {
                        error++;
                        OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 從新建立訂閱", null);
                        try
                        {
                            c.Stop();
                            var channel = item.CreateChannel(c.QueueName, c.RoutekeyName, c.ExchangeTypeName);
                            item.Channels.Remove(c);
                            item.Channels.Add(channel);

                            OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 從新建立完成", null);
                            reconnect++;
                        }
                        catch (Exception ex)
                        {
                            OnAction?.Invoke(MessageLevel.Information, ex.Message, ex);
                        }
                    }
                }
            }
            OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 自檢完成,錯誤數:{error},重連成功數:{reconnect}", null);
        }

        public void Start()
        {
            foreach (var item in this.Services)
            {
                try
                {
                    item.Start();
                }
                catch (Exception e)
                {
                    OnAction?.Invoke(MessageLevel.Error, $"啓動服務出錯 | {e.Message}", e);
                }
            }
        }

        public void Stop()
        {
            try
            {
                foreach (var item in this.Services)
                {
                    item.Stop();
                }
                Services.Clear();
                timer.Dispose();
            }
            catch (Exception e)
            {
                OnAction?.Invoke(MessageLevel.Error, $"中止服務出錯 | {e.Message}", e);
            }
        }

        public void AddService(IService service)
        {
            Services.Add(service);
        }
        public List<IService> Services { get; set; } = new List<IService>();
    }

代碼比較簡單,就不在一一介紹,爲了將異常等內部信息傳遞到外邊,方便使用第三方組件進行日誌記錄等需求,MQServcieManager 還使用了 MessageLevel 這個定義,方便業務根據不一樣的消息級別對消息進行處理this

public enum MessageLevel
    {
        Trace = 0,
        Debug = 1,
        Information = 2,
        Warning = 3,
        Error = 4,
        Critical = 5,
        None = 6
    }

3. 開始使用

終於來到了這一步,咱們將要開始使用這個基礎服務;首先,建立一個 DemoService 繼承自 MQServiceBase ;同時,編碼

3.1 實現 MQServiceBase 的抽象方法 OnReceived(MessageBody message)
public class DemoService : MQServiceBase
    {
        public Action<MessageLevel, string, Exception> OnAction = null;
        public DemoService(MQConfig config) : base(config)
        {
            base.Queues.Add(new QueueInfo()
            {
                ExchangeType = ExchangeType.Direct,
                Queue = "login-message",
                RouterKey = "pk",
                OnReceived = this.OnReceived
            });
        }

        public override string vHost { get { return "gpush"; } }
        public override string Exchange { get { return "user"; } }


        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="message"></param>
        public override void OnReceived(MessageBody message)
        {
            try
            {
                Console.WriteLine(message.Content);
            }
            catch (Exception ex)
            {
                OnAction?.Invoke(MessageLevel.Error, ex.Message, ex);
            }
            message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, true);

        }
    }

以上的代碼很是簡單,幾乎不須要業務開發者作更多的其它工做,開發者只須要在構造方法內部傳入一個 QueueInfo 對象,若是有多個,可一併傳入.net

public partial class QueueInfo
    {
        /// <summary>
        ///  隊列名稱
        /// </summary>
        public string Queue { get; set; }
        /// <summary>
        ///  路由名稱
        /// </summary>
        public string RouterKey { get; set; }
        /// <summary>
        ///  交換機類型
        /// </summary>
        public string ExchangeType { get; set; }
        /// <summary>
        ///  接受消息委託
        /// </summary>
        public Action<MessageBody> OnReceived { get; set; }
        /// <summary>
        ///  輸出信息到客戶端
        /// </summary>
        public Action<MQChannel, MessageLevel, string> OnAction { get; set; }
    }

並設置 vHost 和 Exchange 的值,而後剩下的就是在 OnReceived(MessageBody message) 方法中專心的處理本身的業務了;在這裏,咱們僅輸出接收到的消息,並設置 ack 爲已成功處理。

4. 測試代碼

4.1 在 Program,咱們執行該測試
class Program
    {
        static void Main(string[] args)
        {
            Test();
        }

        static void Test()
        {
            MQConfig config = new MQConfig()
            {
                HostName = "127.0.0.1",
                Password = "123456",
                Port = 5672,
                UserName = "dotnet"
            };

            MQServcieManager manager = new MQServcieManager();
            manager.AddService(new DemoService(config));
            manager.OnAction = OnActionOutput;
            manager.Start();

            Console.WriteLine("服務已啓動");
            Console.ReadKey();

            manager.Stop();
            Console.WriteLine("服務已中止,按任意鍵退出...");
            Console.ReadKey();
        }

        static void OnActionOutput(MessageLevel level, string message, Exception ex)
        {
            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine("{0} | {1} | {2}", level, message, ex?.StackTrace);
            Console.ForegroundColor = ConsoleColor.Gray;
        }
    }
4.2 利用 MQServcieManager 對象,完成了對全部消息訂閱者的管理和監控,
4.3 首先咱們到 RabbitMQ 的 web 控制檯發佈一條消息到隊列 login-message 中

4.3 而後查看輸出結果

消息已經接收並處理,爲了查看監控效果,我還手動將網絡進行中斷,而後監控服務檢測到沒法鏈接,嘗試重建通道,並將消息輸出

  • 圖中步驟說明
  • 0:服務啓動
  • 1:自檢啓動
  • 2:服務報錯,嘗試重建,重建失敗,繼續監測
  • 3:RabbitMQ 內部監控自動重連,監控程序檢測到已恢復,收到消息並處理
  • 4:後續監控服務繼續進行監控

結語

在文章中,咱們創建了 RabbitMQ 的通道管理、基礎服務管理、契約實現等操做,讓業務開發人員經過簡單的繼承實現去快速的處理業務系統的邏輯,後續若是有增長消費者的狀況下,只須要經過 MQServcieManager.AddService 進行簡單的調用操做便可,無需對底層技術細節進行過多的改動。

源碼下載:
https://github.com/lianggx/EasyAspNetCoreDemo/tree/master/Ron.MQTest

相關文章
相關標籤/搜索