本示例經過對服務訂閱的封裝、隱藏細節實現、統一配置、自動重連、異常處理等各個方面來打造一個簡單易用的 RabbitMQ 工廠;本文適合適合有必定 RabbitMQ 使用經驗的讀者閱讀,若是你尚未實際使用過 RabbitMQ,也沒有關係,由於本文的代碼都是基於直接運行的實例,經過簡單的修改 RabbitMQ 便可運行。git
首先,建立一個 .netcore 控制檯項目,建立 Helper、Service、Utils 文件夾,分別用於存放通道管理、服務訂閱、公共組件。github
public class MQConfig { /// <summary> /// 訪問消息隊列的用戶名 /// </summary> public string UserName { get; set; } /// <summary> /// 訪問消息隊列的密碼 /// </summary> public string Password { get; set; } /// <summary> /// 消息隊列的主機地址 /// </summary> public string HostName { get; set; } /// <summary> /// 消息隊列的主機開放的端口 /// </summary> public int Port { get; set; } }
public class MessageBody { public EventingBasicConsumer Consumer { get; set; } public BasicDeliverEventArgs BasicDeliver { get; set; } /// <summary> /// 0成功 /// </summary> public int Code { get; set; } public string Content { get; set; } public string ErrorMessage { get; set; } public bool Error { get; set; } public Exception Exception { get; set; } }
public class MQChannel { public string ExchangeTypeName { get; set; } public string ExchangeName { get; set; } public string QueueName { get; set; } public string RoutekeyName { get; set; } public IConnection Connection { get; set; } public EventingBasicConsumer Consumer { get; set; } /// <summary> /// 外部訂閱消費者通知委託 /// </summary> public Action<MessageBody> OnReceivedCallback { get; set; } public MQChannel(string exchangeType, string exchange, string queue, string routekey) { this.ExchangeTypeName = exchangeType; this.ExchangeName = exchange; this.QueueName = queue; this.RoutekeyName = routekey; } /// <summary> /// 向當前隊列發送消息 /// </summary> /// <param name="content"></param> public void Publish(string content) { byte[] body = MQConnection.UTF8.GetBytes(content); IBasicProperties prop = new BasicProperties(); prop.DeliveryMode = 1; Consumer.Model.BasicPublish(this.ExchangeName, this.RoutekeyName, false, prop, body); } internal void Receive(object sender, BasicDeliverEventArgs e) { MessageBody body = new MessageBody(); try { string content = MQConnection.UTF8.GetString(e.Body); body.Content = content; body.Consumer = (EventingBasicConsumer)sender; body.BasicDeliver = e; } catch (Exception ex) { body.ErrorMessage = $"訂閱-出錯{ex.Message}"; body.Exception = ex; body.Error = true; body.Code = 500; } OnReceivedCallback?.Invoke(body); } /// <summary> /// 設置消息處理完成標誌 /// </summary> /// <param name="consumer"></param> /// <param name="deliveryTag"></param> /// <param name="multiple"></param> public void SetBasicAck(EventingBasicConsumer consumer, ulong deliveryTag, bool multiple) { consumer.Model.BasicAck(deliveryTag, multiple); } /// <summary> /// 關閉消息隊列的鏈接 /// </summary> public void Stop() { if (this.Connection != null && this.Connection.IsOpen) { this.Connection.Close(); this.Connection.Dispose(); } } }
首先是在構造函數內對當前通道的屬性進行設置,其次提供了 Publish 和 OnReceivedCallback 的委託,當通道接收到消息的時候,會進入方法 Receive 中,在 Receive 中,通過封裝成 MessageBody 對象,並調用委託 OnReceivedCallback ,將,解析好的消息傳遞到外邊訂閱者的業務中。最終在 MQChannel 中還提供了消息確認的操做方法 SetBasicAck,供業務系統手動調用。web
public class MQChannelManager { public MQConnection MQConn { get; set; } public MQChannelManager(MQConnection conn) { this.MQConn = conn; } /// <summary> /// 建立消息通道 /// </summary> /// <param name="cfg"></param> public MQChannel CreateReceiveChannel(string exchangeType, string exchange, string queue, string routekey) { IModel model = this.CreateModel(exchangeType, exchange, queue, routekey); model.BasicQos(0, 1, false); EventingBasicConsumer consumer = this.CreateConsumer(model, queue); MQChannel channel = new MQChannel(exchangeType, exchange, queue, routekey) { Connection = this.MQConn.Connection, Consumer = consumer }; consumer.Received += channel.Receive; return channel; } /// <summary> /// 建立一個通道,包含交換機/隊列/路由,並創建綁定關係 /// </summary> /// <param name="type">交換機類型</param> /// <param name="exchange">交換機名稱</param> /// <param name="queue">隊列名稱</param> /// <param name="routeKey">路由名稱</param> /// <returns></returns> private IModel CreateModel(string type, string exchange, string queue, string routeKey, IDictionary<string, object> arguments = null) { type = string.IsNullOrEmpty(type) ? "default" : type; IModel model = this.MQConn.Connection.CreateModel(); model.BasicQos(0, 1, false); model.QueueDeclare(queue, true, false, false, arguments); model.QueueBind(queue, exchange, routeKey); return model; } /// <summary> /// 接收消息到隊列中 /// </summary> /// <param name="model">消息通道</param> /// <param name="queue">隊列名稱</param> /// <param name="callback">訂閱消息的回調事件</param> /// <returns></returns> private EventingBasicConsumer CreateConsumer(IModel model, string queue) { EventingBasicConsumer consumer = new EventingBasicConsumer(model); model.BasicConsume(queue, false, consumer); return consumer; } }
public MQChannelManager(MQConnection conn) { this.MQConn = conn; }
public class MQConnection { private string vhost = string.Empty; private IConnection connection = null; private MQConfig config = null; /// <summary> /// 構造無 utf8 標記的編碼轉換器 /// </summary> public static UTF8Encoding UTF8 { get; set; } = new UTF8Encoding(false); public MQConnection(MQConfig config, string vhost) { this.config = config; this.vhost = vhost; } public IConnection Connection { get { if (connection == null) { ConnectionFactory factory = new ConnectionFactory { AutomaticRecoveryEnabled = true, UserName = this.config.UserName, Password = this.config.Password, HostName = this.config.HostName, VirtualHost = this.vhost, Port = this.config.Port }; connection = factory.CreateConnection(); } return connection; } } }
設想一下,有這樣的一個業務場景,通道管理和服務管理都是相同的操做,若是這些基礎操做都在一個地方定義,且有一個默認的實現,那麼後來者就不須要去關注這些技術細節,直接繼承基礎類後,傳入相應的消息配置便可完成
消息訂閱和發佈操做。網絡
public interface IService { /// <summary> /// 建立通道 /// </summary> /// <param name="queue">隊列名稱</param> /// <param name="routeKey">路由名稱</param> /// <param name="exchangeType">交換機類型</param> /// <returns></returns> MQChannel CreateChannel(string queue, string routeKey, string exchangeType); /// <summary> /// 開啓訂閱 /// </summary> void Start(); /// <summary> /// 中止訂閱 /// </summary> void Stop(); /// <summary> /// 通道列表 /// </summary> List<MQChannel> Channels { get; set; } /// <summary> /// 消息隊列中定義的虛擬機 /// </summary> string vHost { get; } /// <summary> /// 消息隊列中定義的交換機 /// </summary> string Exchange { get; } }
public abstract class MQServiceBase : IService { internal bool started = false; internal MQServiceBase(MQConfig config) { this.Config = config; } public MQChannel CreateChannel(string queue, string routeKey, string exchangeType) { MQConnection conn = new MQConnection(this.Config, this.vHost); MQChannelManager cm = new MQChannelManager(conn); MQChannel channel = cm.CreateReceiveChannel(exchangeType, this.Exchange, queue, routeKey); return channel; } /// <summary> /// 啓動訂閱 /// </summary> public void Start() { if (started) { return; } MQConnection conn = new MQConnection(this.Config, this.vHost); MQChannelManager manager = new MQChannelManager(conn); foreach (var item in this.Queues) { MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey); channel.OnReceivedCallback = item.OnReceived; this.Channels.Add(channel); } started = true; } /// <summary> /// 中止訂閱 /// </summary> public void Stop() { foreach (var c in this.Channels) { c.Stop(); } this.Channels.Clear(); started = false; } /// <summary> /// 接收消息 /// </summary> /// <param name="message"></param> public abstract void OnReceived(MessageBody message); public List<MQChannel> Channels { get; set; } = new List<MQChannel>(); /// <summary> /// 消息隊列配置 /// </summary> public MQConfig Config { get; set; } /// <summary> /// 消息隊列中定義的虛擬機 /// </summary> public abstract string vHost { get; } /// <summary> /// 消息隊列中定義的交換機 /// </summary> public abstract string Exchange { get; } /// <summary> /// 定義的隊列列表 /// </summary> public List<QueueInfo> Queues { get; } = new List<QueueInfo>(); }
上面的抽象類,原封不動的實現接口契約,代碼很是簡單,在 Start 方法中,建立通道和啓動消息訂閱;同時,將通道加入屬性 Channels 中,方便後面的自檢服務使用;在 Start 方法中ide
/// <summary> /// 啓動訂閱 /// </summary> public void Start() { if (started) { return; } MQConnection conn = new MQConnection(this.Config, this.vHost); MQChannelManager manager = new MQChannelManager(conn); foreach (var item in this.Queues) { MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey); channel.OnReceivedCallback = item.OnReceived; this.Channels.Add(channel); } started = true; }
使用 MQChannelManager 建立了一個通道,並將通道的回調委託 OnReceivedCallback 設置爲 item.OnReceived 方法,該方法將有子類實現;在將當前訂閱服務通道建立完成後,標記服務狀態 started 爲 true,防止重複啓動;同時,在該抽象類中,不實現契約的 OnReceived(MessageBody message);強制基礎業務服務類去自我實現,由於各類業務的特殊性,這塊對消息的處理不能再基礎服務中完成函數
接下來要介紹的是服務監控管理類,該類內部定義一個簡單的定時器功能,不間斷的對 RabbitMQ 的通信進行偵聽,一旦發現有斷開的鏈接,就自動建立一個新的通道,並移除舊的通道;同時,提供 Start/Stop 兩個方法,以供程序 啓動/中止 的時候對測試
public class MQServcieManager { public int Timer_tick { get; set; } = 10 * 1000; private Timer timer = null; public Action<MessageLevel, string, Exception> OnAction = null; public MQServcieManager() { timer = new Timer(OnInterval, "", Timer_tick, Timer_tick); } /// <summary> /// 自檢,配合 RabbitMQ 內部自動重連機制 /// </summary> /// <param name="sender"></param> private void OnInterval(object sender) { int error = 0, reconnect = 0; OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 正在執行自檢", null); foreach (var item in this.Services) { for (int i = 0; i < item.Channels.Count; i++) { var c = item.Channels[i]; if (c.Connection == null || !c.Connection.IsOpen) { error++; OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 從新建立訂閱", null); try { c.Stop(); var channel = item.CreateChannel(c.QueueName, c.RoutekeyName, c.ExchangeTypeName); item.Channels.Remove(c); item.Channels.Add(channel); OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 從新建立完成", null); reconnect++; } catch (Exception ex) { OnAction?.Invoke(MessageLevel.Information, ex.Message, ex); } } } } OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 自檢完成,錯誤數:{error},重連成功數:{reconnect}", null); } public void Start() { foreach (var item in this.Services) { try { item.Start(); } catch (Exception e) { OnAction?.Invoke(MessageLevel.Error, $"啓動服務出錯 | {e.Message}", e); } } } public void Stop() { try { foreach (var item in this.Services) { item.Stop(); } Services.Clear(); timer.Dispose(); } catch (Exception e) { OnAction?.Invoke(MessageLevel.Error, $"中止服務出錯 | {e.Message}", e); } } public void AddService(IService service) { Services.Add(service); } public List<IService> Services { get; set; } = new List<IService>(); }
代碼比較簡單,就不在一一介紹,爲了將異常等內部信息傳遞到外邊,方便使用第三方組件進行日誌記錄等需求,MQServcieManager 還使用了 MessageLevel 這個定義,方便業務根據不一樣的消息級別對消息進行處理this
public enum MessageLevel { Trace = 0, Debug = 1, Information = 2, Warning = 3, Error = 4, Critical = 5, None = 6 }
終於來到了這一步,咱們將要開始使用這個基礎服務;首先,建立一個 DemoService 繼承自 MQServiceBase ;同時,編碼
public class DemoService : MQServiceBase { public Action<MessageLevel, string, Exception> OnAction = null; public DemoService(MQConfig config) : base(config) { base.Queues.Add(new QueueInfo() { ExchangeType = ExchangeType.Direct, Queue = "login-message", RouterKey = "pk", OnReceived = this.OnReceived }); } public override string vHost { get { return "gpush"; } } public override string Exchange { get { return "user"; } } /// <summary> /// 接收消息 /// </summary> /// <param name="message"></param> public override void OnReceived(MessageBody message) { try { Console.WriteLine(message.Content); } catch (Exception ex) { OnAction?.Invoke(MessageLevel.Error, ex.Message, ex); } message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, true); } }
以上的代碼很是簡單,幾乎不須要業務開發者作更多的其它工做,開發者只須要在構造方法內部傳入一個 QueueInfo 對象,若是有多個,可一併傳入.net
public partial class QueueInfo { /// <summary> /// 隊列名稱 /// </summary> public string Queue { get; set; } /// <summary> /// 路由名稱 /// </summary> public string RouterKey { get; set; } /// <summary> /// 交換機類型 /// </summary> public string ExchangeType { get; set; } /// <summary> /// 接受消息委託 /// </summary> public Action<MessageBody> OnReceived { get; set; } /// <summary> /// 輸出信息到客戶端 /// </summary> public Action<MQChannel, MessageLevel, string> OnAction { get; set; } }
並設置 vHost 和 Exchange 的值,而後剩下的就是在 OnReceived(MessageBody message) 方法中專心的處理本身的業務了;在這裏,咱們僅輸出接收到的消息,並設置 ack 爲已成功處理。
class Program { static void Main(string[] args) { Test(); } static void Test() { MQConfig config = new MQConfig() { HostName = "127.0.0.1", Password = "123456", Port = 5672, UserName = "dotnet" }; MQServcieManager manager = new MQServcieManager(); manager.AddService(new DemoService(config)); manager.OnAction = OnActionOutput; manager.Start(); Console.WriteLine("服務已啓動"); Console.ReadKey(); manager.Stop(); Console.WriteLine("服務已中止,按任意鍵退出..."); Console.ReadKey(); } static void OnActionOutput(MessageLevel level, string message, Exception ex) { Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine("{0} | {1} | {2}", level, message, ex?.StackTrace); Console.ForegroundColor = ConsoleColor.Gray; } }
消息已經接收並處理,爲了查看監控效果,我還手動將網絡進行中斷,而後監控服務檢測到沒法鏈接,嘗試重建通道,並將消息輸出
在文章中,咱們創建了 RabbitMQ 的通道管理、基礎服務管理、契約實現等操做,讓業務開發人員經過簡單的繼承實現去快速的處理業務系統的邏輯,後續若是有增長消費者的狀況下,只須要經過 MQServcieManager.AddService 進行簡單的調用操做便可,無需對底層技術細節進行過多的改動。
源碼下載:
https://github.com/lianggx/EasyAspNetCoreDemo/tree/master/Ron.MQTest