消息隊列做爲分佈式系統中的重要組件,經常使用的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ。至於各類消息隊列的優缺點比較,在這裏就不作擴展了,網上資源不少。html
更多內容可參考 消息隊列及常見消息隊列介紹。我在這裏選用的是RabbitMq。git
官網地址:http://www.rabbitmq.comgithub
安裝和配置:Windows下RabbitMq安裝及配置數據庫
RabbitMQ是一款基於AMQP(高級消息隊列協議),由Erlang開發的開源消息隊列組件。是一款優秀的消息隊列組件,他由兩部分組成:服務端和客戶端,客戶端支持多種語言的驅動,如:.Net、JAVA、 Erlang等。在RabbitMq中首先要弄清楚的概念是 交換機、隊列、綁定。基本的消息通信步驟就是首先定義ExChange,而後定義隊列,而後綁定交換機和隊列。json
須要明確的一點兒是,發佈者在發送消息是,並非把消息直接發送到隊列中,而是發送到Exchang,而後由交互機根據定義的消息匹配規則,在將消息發送到隊列中。api
Exchange有四種消息消息分發規則:direct,topic,fanout,header。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了。app
詳細的概念介紹推薦查看:消息隊列之RabbitMq框架
Easynetq是一個簡單易用的Rabbitmq Net客戶端。同時支持 NetFramework和NetCore。GitHub地址。它是針對RabbitMq Net客戶端的進一步封裝。關於EasyNetQ的簡單使用推薦教程:EasyNetQ的介紹。異步
本文主要介紹基於EasyNeq的高級API的使用。EasyNetQ的做者在覈心的IBus接口中儘可能避免暴露AMQP中的交換機、隊列、綁定這些概念,使用者即便不去了解這些概念,也能完成消息的發送接收。這至關簡潔,但某些狀況下,基於應用場景的須要,咱們須要自定義交換機、隊列、綁定這些信息,EasyNetQ容許你這麼作,這些都是經過IAdvanceBus接口實現。async
這裏爲了演示,首先新建一個項目,包括一個發佈者,兩個接收者,一個公共的類庫
安裝EasyNetQ: NuGet>Install-Package EasyNetQ
在Common項目裏面是針對Easynetq的使用封裝,主要目錄以下
在RabbitMq文件夾下,是針對消息發送接收的簡單封裝。
首先來看下RabbitMqManage,主要的發送和訂閱操做都在這個類中。其中ISend接口定義了發送消息的規範,SendMessageManage是ISend的實現。IMessageConsume接口定義訂閱規範。
MesArg 和PushMsg分別是訂閱和發送需用到的參數類。RabbitMQManage是暴露在外的操做類。
首先看發送的代碼
public enum SendEnum { 訂閱模式 = 1, 推送模式 = 2, 主題路由模式 = 3 } public class PushMsg { /// <summary> /// 發送的數據 /// </summary> public object sendMsg { get; set; } /// <summary> /// 消息推送的模式 /// 如今支持:訂閱模式,推送模式,主題路由模式 /// </summary> public SendEnum sendEnum { get; set; } /// <summary> /// 管道名稱 /// </summary> public string exchangeName { get; set; } /// <summary> /// 路由名稱 /// </summary> public string routeName { get; set; } } internal interface ISend { Task SendMsgAsync(PushMsg pushMsg, IBus bus); void SendMsg(PushMsg pushMsg, IBus bus); } internal class SendMessageMange : ISend { public async Task SendMsgAsync(PushMsg pushMsg, IBus bus) { //一對一推送 var message = new Message<object>(pushMsg.sendMsg); IExchange ex = null; //判斷推送模式 if (pushMsg.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct); } if (pushMsg.sendEnum == SendEnum.訂閱模式) { //廣播訂閱模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout); } if (pushMsg.sendEnum == SendEnum.主題路由模式) { //主題路由模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic); } await bus.Advanced.PublishAsync(ex, pushMsg.routeName.ToSafeString(""), false, message) .ContinueWith(task => { if (!task.IsCompleted && task.IsFaulted)//消息投遞失敗 { //記錄投遞失敗的消息信息 } }); } public void SendMsg(PushMsg pushMsg, IBus bus) { //一對一推送 var message = new Message<object>(pushMsg.sendMsg); IExchange ex = null; //判斷推送模式 if (pushMsg.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct); } if (pushMsg.sendEnum == SendEnum.訂閱模式) { //廣播訂閱模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout); } if (pushMsg.sendEnum == SendEnum.主題路由模式) { //主題路由模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic); } bus.Advanced.Publish(ex, pushMsg.routeName.ToSafeString(""), false, message); } }
在EasyNetQ中對於異步發送消息的時候,消息是否送達Broker只須要查看異步發送方法最終執行成功仍是失敗,成功就表示消息送達,若是失敗能夠將失敗後的消息存入數據庫中,而後用後臺線程輪詢
數據庫表,將失敗後的消息進行從新 發送。這種方式還能夠進一步變成消息表,就是先將要發送的消息存入消息表中,而後後臺線程輪詢消息表來進行消息發送。通常這種方式被普遍用於分佈式事務中,
將本地數據庫操做和消息表寫入放入同一個本地事務中,來保證消息發送和本地數據操做的同步成功,由於個人系統中,分佈式事務的涉及不多,因此就沒這樣去作,只是簡單的在異步發送的時候監控下
是否發送失敗,而後針對失敗的消息作一個從新發送的機制。這裏,推薦大佬的NetCore分佈式事務解決方案 CAP GitHub地址。
接着看一下消息訂閱接收涉及的代碼
public class MesArgs { /// <summary> /// 消息推送的模式 /// 如今支持:訂閱模式,推送模式,主題路由模式 /// </summary> public SendEnum sendEnum { get; set; } /// <summary> /// 管道名稱 /// </summary> public string exchangeName { get; set; } /// <summary> /// 對列名稱 /// </summary> public string rabbitQueeName { get; set; } /// <summary> /// 路由名稱 /// </summary> public string routeName { get; set; } } public interface IMessageConsume { void Consume(string message); }
在訂閱中我定義了一個接口,最終業務代碼中,全部的消息訂閱類,都須要繼續此接口
最後,咱們來看下對外使用的操做類
public class RabbitMQManage { private volatile static IBus bus = null; private static readonly object lockHelper = new object(); /// <summary> /// 建立服務總線 /// </summary> /// <param name="config"></param> /// <returns></returns> public static IBus CreateEventBus() { //獲取RabbitMq的鏈接地址 //SystemJsonConfigManage 是我簡單封裝的一個json操做類,用於針對json文件的讀寫操做 var config = SystemJsonConfigManage.GetInstance().AppSettings["MeessageService"]; if (string.IsNullOrEmpty(config)) throw new Exception("消息地址未配置"); if (bus == null && !string.IsNullOrEmpty(config)) { lock (lockHelper) { if (bus == null) bus = RabbitHutch.CreateBus(config); } } return bus; } /// <summary> /// 釋放服務總線 /// </summary> public static void DisposeBus() { bus?.Dispose(); } /// <summary> /// 消息同步投遞 /// </summary> /// <param name="pushMsg"></param> /// <returns></returns> public static bool PushMessage(PushMsg pushMsg) { bool b = true; try { if (bus == null) CreateEventBus(); new SendMessageMange().SendMsg(pushMsg, bus); b = true; } catch (Exception ex) { b = false; } return b; } /// <summary> /// 消息異步投遞 /// </summary> /// <param name="pushMsg"></param> public static async Task PushMessageAsync(PushMsg pushMsg) { try { if (bus == null) CreateEventBus(); await new SendMessageMange().SendMsgAsync(pushMsg, bus); } catch (Exception ex) { throw ex; } } /// <summary> /// 消息訂閱 /// </summary> public static void Subscribe<TConsum>(MesArgs args) where TConsum : IMessageConsume,new() { if (bus == null) CreateEventBus(); if (string.IsNullOrEmpty(args.exchangeName)) return; Expression<Action<TConsum>> methodCall; IExchange ex = null; //判斷推送模式 if (args.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Direct); } if (args.sendEnum == SendEnum.訂閱模式) { //廣播訂閱模式 ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Fanout); } if (args.sendEnum == SendEnum.主題路由模式) { //主題路由模式 ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Topic); } IQueue qu; if (string.IsNullOrEmpty(args.rabbitQueeName)) { qu = bus.Advanced.QueueDeclare(); } else qu = bus.Advanced.QueueDeclare(args.rabbitQueeName); bus.Advanced.Bind(ex, qu, args.routeName.ToSafeString("")); bus.Advanced.Consume(qu, (body, properties, info) => Task.Factory.StartNew(() => { try { lock (lockHelper) { var message = Encoding.UTF8.GetString(body); //處理消息 methodCall = job => job.Consume(message); methodCall.Compile()(new TConsum()); } } catch (Exception e) { throw e; } })); } }
這裏面主要封裝了消息的發送和訂閱,以及IBus單例的建立。在後續的消息發送和訂閱主要就經過此處來實現。咱們看到一開始的類目結構中還有一個RaExMessageHandleJob類,這個類就是一個後臺
循環任務,用來監測數據庫中是否保存了發送失敗的消息,若是有,則將消息取出,嘗試從新發送。在此就不作多的介紹,你們能夠根據本身的實際需求來實現。
如今來看一下消息發佈者的代碼
主要的發送代碼都在Send類中,其中appsettings.json裏面配置了Rabbitmq的鏈接地址,TestDto只是一個爲了方便演示的參數類。
下面看一下Program裏面的代碼
很簡單的一個發送消息調用。
而後來看一下Send類中的代碼
public class Send { /// <summary> /// 發送消息 /// </summary> public static void SendMessage() { //須要注意一點兒,若是發送的時候,在該管道下找不到相匹配的隊列框架將默認丟棄該消息 //推送模式 //推送模式下,需指定管道名稱和路由鍵值名稱 //消息只會被髮送到和指定路由鍵值徹底匹配的隊列中 var directdto = new PushMsg() { sendMsg = new TestDto() { Var1 = "這是推送模式" }, exchangeName = "message.directdemo", routeName= "routekey", sendEnum =SendEnum.推送模式 }; //同步發送 ,返回true或fasle true 發送成功,消息已存儲到Rabbitmq中,false表示發送失敗 var b= RabbitMQManage.PushMessage(directdto); //異步發送,若是失敗,失敗的消息會被寫入數據庫,會有後臺線程輪詢數據庫進行從新發送 //RabbitMQManage.PushMessageAsync(directlist); //訂閱模式 //訂閱模式只須要指定管道名稱 //消息會被髮送到該管道下的全部隊列中 var fanoutdto = new PushMsg() { sendMsg = new TestDto() { Var1 = "這是訂閱模式" }, exchangeName = "message.fanoutdemo", sendEnum = SendEnum.訂閱模式 }; //同步發送 var fb = RabbitMQManage.PushMessage(fanoutdto); //異步發送 //RabbitMQManage.PushMessageAsync(fanoutdto); //主題路由模式 //路由模式下需指定 管道名稱和路由值 //消息會被髮送到該管道下,和路由值匹配的隊列中去 var routedto = new PushMsg() { sendMsg = new TestDto() { Var1 = "這是主題路由模式1", }, exchangeName = "message.topicdemo", routeName="a.log", sendEnum=SendEnum.主題路由模式 }; var routedto2 = new PushMsg() { sendMsg = new TestDto() { Var1 = "這是主題路由模式2", }, exchangeName = "message.topicdemo", routeName = "a.log.a.b", sendEnum = SendEnum.主題路由模式 }; //同步發送 var rb = RabbitMQManage.PushMessage(routedto); var rb2 = RabbitMQManage.PushMessage(routedto2); //異步發送 //RabbitMQManage.PushMessageAsync(routedto); } }
首先來看下消費者端的目錄結構
其中appsettings.json中配置Rabbitmq的鏈接信息,Program中只是簡單調用消息訂閱
主要的消息訂閱代碼都在MessageManage文件夾下,MessageManService用於定義消息訂閱類型
public class MessageManService { public static void Subsribe() { Task.Run(() => { //概念 一個管道下面能夠綁定多個隊列。 //發送消息 是指將消息發送到管道中,而後由rabbitmq根據發送規則在將消息具體的轉發到對應到管道下面的隊列中 //消費消息 是指消費者(即服務)從管道下面的隊列中獲取消息 //同一個隊列 能夠有多個消費者(即不一樣的服務,均可以鏈接到同一個隊列去獲取消息) //但注意 當一個隊列有多個消費者的時候,消息會被依次分發到不一樣的消費者中。好比第一條消息給第一個消費者,第二條消息給第二個消費者(框架內部有一個公平分發的機制) //推送模式時 需指定管道名稱和路由值 //隊列名稱可本身指定 //注意 ,管道名稱和路由名稱必定要和發送方的管道名稱和路由名稱一致 //不管這個管道下面掛靠有多少個隊列,只有路由名稱和此處指定的路由名稱徹底一致的隊列,纔會收到這條消息。 var dirarg = new MesArgs() { sendEnum = SendEnum.推送模式, exchangeName = "message.directdemo", rabbitQueeName = "meesage.directmessagequene", routeName = "routekey" }; RabbitMQManage.Subscribe<DirectMessageConsume>(dirarg); //訂閱模式時需指定管道名稱,而且管道名稱要和發送方管道名稱一致 //隊列名稱可本身指定 //全部這個管道下面的隊列,都將收到該條消息 var fanoutrg = new MesArgs() { sendEnum = SendEnum.訂閱模式, exchangeName = "message.fanoutdemo", rabbitQueeName = "meesage.fanoutmessagequene" }; RabbitMQManage.Subscribe<FanoutMessageConsume>(fanoutrg); //路由模式時需指定管道名稱,路由關鍵字而且管道名稱,路由關鍵字要和發送方的一致 //隊列名稱可本身指定 //消息將被髮送到管道下面的能匹配路由關鍵字的隊列中 //也就是說 路由模式時,有多少隊列能收到消息,取決於該隊列的路由關鍵字是否匹配,只要匹配就能收到消息 //符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞 var topicrg = new MesArgs() { sendEnum = SendEnum.主題路由模式, exchangeName = "message.topicdemo", rabbitQueeName = "message.topicmessagequene", routeName = "#.log.#" }; RabbitMQManage.Subscribe<TopicMessageConsume>(topicrg); }); } }
Consume文件夾下主要定義了消息的業務處理
//推送模式過來的消息 public class DirectMessageConsume : IMessageConsume { //消息的處理方法中最好不要進行try catch操做 //若是發送異常,EasyNetQ會自動將消息放入錯誤隊列中 //若是在Consume方法體中捕獲了異常而且沒有拋出,會默認消息處理成功 //消息的冪等性需業務方自行處理,也就是說同一條消息可能會接收到兩次 //(好比說第一次正在處理消息的時候服務掛掉,服務重啓後這條消息又會從新推送過來) public void Consume(string message) { var dto = JsonConvert.DeserializeObject<TestDto>(message); Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3); } } //廣播模式過來的消息 public class FanoutMessageConsume : IMessageConsume { //消息的處理方法中最好不要進行try catch操做 //若是發送異常,EasyNetQ會自動將消息放入錯誤隊列中 //若是在Consume方法體中捕獲了異常而且沒有拋出,會默認消息處理成功 //消息的冪等性需業務方自行處理,也就是說同一條消息可能會接收到兩次 //(好比說第一次正在處理消息的時候服務掛掉,服務重啓後這條消息又會從新推送過來) public void Consume(string message) { var dto = JsonConvert.DeserializeObject<TestDto>(message); Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3); } } //主題路由模式過來的消息 public class TopicMessageConsume : IMessageConsume { //消息的處理方法中最好不要進行try catch操做 //若是發送異常,EasyNetQ會自動將消息放入錯誤隊列中 //若是在Consume方法體中捕獲了異常而且沒有拋出,會默認消息處理成功 //消息的冪等性需業務方自行處理,也就是說同一條消息可能會接收到兩次 //(好比說第一次正在處理消息的時候服務掛掉,服務重啓後這條消息又會從新推送過來) public void Consume(string message) { var dto = JsonConvert.DeserializeObject<TestDto>(message); Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3); } }
能夠看到,全部的類都集成自咱們定義的接口IMessageConsume。
在EasyNetQ中若是須要消費者確認功能,則須要在Rabbitmq的鏈接配置中設置publisherConfirms=true,這將會開啓自動確認。在使用高級api定義交換機和隊列時能夠本身定義多種參數,好比消息是否持久化,消息最大長度等等,具體你們能夠去看官方文檔,上面有詳細介紹。Easynetq會自動去捕獲消費異常的消息並將其放入到錯誤隊列中,並且官方提供了從新發送錯誤隊列中消息的方法,固然你也能夠本身去監視錯誤列隊,對異常消息進行處理。EasyNetQ裏面做者針對消息的發佈確認和消費確認都作了封裝。在EasyNetQ中發佈消息的時候若是選用的同步發送,只要沒有拋出異常,咱們就能夠認爲任務消息已經正確到達Broker,而異步發送的話須要咱們本身去監視Task是否成功 。若是開啓了自動確認,並不須要咱們在消息處理的方法體中手動返回ack信息,只要消息被 正確處理就會自動ack。雖然RabbitMq中也有事務消息,但因爲性能比較差,並不推薦使用。其實,只要咱們能明確消息是否發佈成功和消費成功,就將會很容易在這個基礎上擴展出分佈式事務的處理。