NetCore基於EasyNetQ的高級API使用RabbitMq

1、消息隊列

 消息隊列做爲分佈式系統中的重要組件,經常使用的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ。至於各類消息隊列的優缺點比較,在這裏就不作擴展了,網上資源不少。html

 更多內容可參考 消息隊列及常見消息隊列介紹。我在這裏選用的是RabbitMq。git

官網地址:http://www.rabbitmq.comgithub

安裝和配置:Windows下RabbitMq安裝及配置數據庫

2、RabbitMq簡單介紹 

 RabbitMQ是一款基於AMQP(高級消息隊列協議),由Erlang開發的開源消息隊列組件。是一款優秀的消息隊列組件,他由兩部分組成:服務端和客戶端,客戶端支持多種語言的驅動,如:.Net、JAVA、   Erlang等。在RabbitMq中首先要弄清楚的概念是 交換機、隊列、綁定。基本的消息通信步驟就是首先定義ExChange,而後定義隊列,而後綁定交換機和隊列。json

 須要明確的一點兒是,發佈者在發送消息是,並非把消息直接發送到隊列中,而是發送到Exchang,而後由交互機根據定義的消息匹配規則,在將消息發送到隊列中。api

 Exchange有四種消息消息分發規則:direct,topic,fanout,header。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了。app

 詳細的概念介紹推薦查看:消息隊列之RabbitMq框架

3、EasyNetQ使用

 Easynetq是一個簡單易用的Rabbitmq Net客戶端。同時支持 NetFramework和NetCore。GitHub地址。它是針對RabbitMq Net客戶端的進一步封裝。關於EasyNetQ的簡單使用推薦教程:EasyNetQ的介紹異步

 本文主要介紹基於EasyNeq的高級API的使用。EasyNetQ的做者在覈心的IBus接口中儘可能避免暴露AMQP中的交換機、隊列、綁定這些概念,使用者即便不去了解這些概念,也能完成消息的發送接收。這至關簡潔,但某些狀況下,基於應用場景的須要,咱們須要自定義交換機、隊列、綁定這些信息,EasyNetQ容許你這麼作,這些都是經過IAdvanceBus接口實現。async

3.1 項目裝備

 這裏爲了演示,首先新建一個項目,包括一個發佈者,兩個接收者,一個公共的類庫

安裝EasyNetQ: NuGet>Install-Package EasyNetQ

3.2 簡單封裝

在Common項目裏面是針對Easynetq的使用封裝,主要目錄以下

 

 在RabbitMq文件夾下,是針對消息發送接收的簡單封裝。

 首先來看下RabbitMqManage,主要的發送和訂閱操做都在這個類中。其中ISend接口定義了發送消息的規範,SendMessageManage是ISend的實現。IMessageConsume接口定義訂閱規範。

 MesArg 和PushMsg分別是訂閱和發送需用到的參數類。RabbitMQManage是暴露在外的操做類。

 首先看發送的代碼

 
 public enum SendEnum
    {
        訂閱模式 = 1,
        推送模式 = 2,
        主題路由模式 = 3
    }
    public class PushMsg
    {
        /// <summary>
        /// 發送的數據
        /// </summary>
        public object sendMsg { get; set; }

        /// <summary>
        /// 消息推送的模式
        /// 如今支持:訂閱模式,推送模式,主題路由模式
        /// </summary>
        public SendEnum sendEnum { get; set; }

        /// <summary>
        /// 管道名稱
        /// </summary>
        public string exchangeName { get; set; }

        /// <summary>
        /// 路由名稱
        /// </summary>
        public string routeName { get; set; }
    }


internal interface ISend
    {
        Task SendMsgAsync(PushMsg pushMsg, IBus bus);

       void  SendMsg(PushMsg pushMsg, IBus bus);
    }

 internal class SendMessageMange : ISend
    {
        public async Task SendMsgAsync(PushMsg pushMsg, IBus bus)
        {
            //一對一推送

            var message = new Message<object>(pushMsg.sendMsg);
            IExchange ex = null;
            //判斷推送模式
            if (pushMsg.sendEnum == SendEnum.推送模式)
            {
                ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct);
            }
            if (pushMsg.sendEnum == SendEnum.訂閱模式)
            {
                //廣播訂閱模式
                ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout);
            }
            if (pushMsg.sendEnum == SendEnum.主題路由模式)
            {
                //主題路由模式
                ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic);
            }
           await bus.Advanced.PublishAsync(ex, pushMsg.routeName.ToSafeString(""), false, message)
            .ContinueWith(task =>
             {
                 if (!task.IsCompleted && task.IsFaulted)//消息投遞失敗
                         {
                             //記錄投遞失敗的消息信息   
                 }
             });


        }

        public void SendMsg(PushMsg pushMsg, IBus bus)
        {
            //一對一推送

                var message = new Message<object>(pushMsg.sendMsg);
                IExchange ex = null;
                //判斷推送模式
                if (pushMsg.sendEnum == SendEnum.推送模式)
                {
                    ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct);
                }
                if (pushMsg.sendEnum == SendEnum.訂閱模式)
                {
                    //廣播訂閱模式
                    ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout);
                }
                if (pushMsg.sendEnum == SendEnum.主題路由模式)
                {
                    //主題路由模式
                    ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic);
                }
                bus.Advanced.Publish(ex, pushMsg.routeName.ToSafeString(""), false, message);
                
            
        }
    
}

在EasyNetQ中對於異步發送消息的時候,消息是否送達Broker只須要查看異步發送方法最終執行成功仍是失敗,成功就表示消息送達,若是失敗能夠將失敗後的消息存入數據庫中,而後用後臺線程輪詢

數據庫表,將失敗後的消息進行從新 發送。這種方式還能夠進一步變成消息表,就是先將要發送的消息存入消息表中,而後後臺線程輪詢消息表來進行消息發送。通常這種方式被普遍用於分佈式事務中,

將本地數據庫操做和消息表寫入放入同一個本地事務中,來保證消息發送和本地數據操做的同步成功,由於個人系統中,分佈式事務的涉及不多,因此就沒這樣去作,只是簡單的在異步發送的時候監控下

是否發送失敗,而後針對失敗的消息作一個從新發送的機制。這裏,推薦大佬的NetCore分佈式事務解決方案 CAP GitHub地址

 接着看一下消息訂閱接收涉及的代碼

 

public class MesArgs
    {
        /// <summary>
        /// 消息推送的模式
        /// 如今支持:訂閱模式,推送模式,主題路由模式
        /// </summary>
        public SendEnum sendEnum { get; set; }

        /// <summary>
        /// 管道名稱
        /// </summary>
        public string exchangeName { get; set; }

        /// <summary>
        /// 對列名稱
        /// </summary>
        public string rabbitQueeName { get; set; }

        /// <summary>
        /// 路由名稱
        /// </summary>
        public string routeName { get; set; }
    }
  public interface IMessageConsume
    {
        void Consume(string message);
    }

在訂閱中我定義了一個接口,最終業務代碼中,全部的消息訂閱類,都須要繼續此接口

最後,咱們來看下對外使用的操做類

 

public  class RabbitMQManage
    {

        private volatile static IBus bus = null;

        private static readonly object lockHelper = new object();

        /// <summary>
        /// 建立服務總線
        /// </summary>
        /// <param name="config"></param>
        /// <returns></returns>
        public static IBus CreateEventBus()
        {
            //獲取RabbitMq的鏈接地址
            //SystemJsonConfigManage 是我簡單封裝的一個json操做類,用於針對json文件的讀寫操做
            var config = SystemJsonConfigManage.GetInstance().AppSettings["MeessageService"];
            if (string.IsNullOrEmpty(config))
                throw new Exception("消息地址未配置");
            if (bus == null && !string.IsNullOrEmpty(config))
            {
                lock (lockHelper)
                {
                    if (bus == null)
                        bus = RabbitHutch.CreateBus(config);
                }
            }
            return bus;
        }
        /// <summary>
        /// 釋放服務總線
        /// </summary>
        public static void DisposeBus()
        {
            bus?.Dispose();
        }
        /// <summary>
        ///  消息同步投遞
        /// </summary>
        /// <param name="pushMsg"></param>
        /// <returns></returns>
        public static bool PushMessage(PushMsg pushMsg)
        {
            bool b = true;
            try
            {
                if (bus == null)
                    CreateEventBus();
                new SendMessageMange().SendMsg(pushMsg, bus);
                b = true;
            }
            catch (Exception ex)
            {
                
                b = false;
            }
            return b;
        }
        /// <summary>
        /// 消息異步投遞
        /// </summary>
        /// <param name="pushMsg"></param>
        public static async Task PushMessageAsync(PushMsg pushMsg)
        {
            try
            {
                if (bus == null)
                    CreateEventBus();
               await new SendMessageMange().SendMsgAsync(pushMsg, bus);
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
        /// <summary>
        /// 消息訂閱
        /// </summary>
        public static void Subscribe<TConsum>(MesArgs args)
            where TConsum : IMessageConsume,new()
        {
            if (bus == null)
                CreateEventBus();
            if (string.IsNullOrEmpty(args.exchangeName))
                return;
            Expression<Action<TConsum>> methodCall;
            IExchange ex = null;
            //判斷推送模式
            if (args.sendEnum == SendEnum.推送模式)
            {
                ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Direct);
                
            }
            if (args.sendEnum == SendEnum.訂閱模式)
            {
                //廣播訂閱模式
                ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Fanout);
            }
            if (args.sendEnum == SendEnum.主題路由模式)
            {
                //主題路由模式
                ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Topic);
            }
            IQueue qu;
            if (string.IsNullOrEmpty(args.rabbitQueeName))
            {
                qu = bus.Advanced.QueueDeclare();
            }
            else
                qu = bus.Advanced.QueueDeclare(args.rabbitQueeName);
            bus.Advanced.Bind(ex, qu, args.routeName.ToSafeString(""));
            bus.Advanced.Consume(qu, (body, properties, info) => Task.Factory.StartNew(() =>
            {
                try
                {
                    lock (lockHelper)
                    {

                        var message = Encoding.UTF8.GetString(body);
                        //處理消息
                        methodCall = job => job.Consume(message);
                        methodCall.Compile()(new TConsum());
                    }

                }
                catch (Exception e)
                {
                    
                    throw e;

                }


            }));
        }
    }

這裏面主要封裝了消息的發送和訂閱,以及IBus單例的建立。在後續的消息發送和訂閱主要就經過此處來實現。咱們看到一開始的類目結構中還有一個RaExMessageHandleJob類,這個類就是一個後臺

循環任務,用來監測數據庫中是否保存了發送失敗的消息,若是有,則將消息取出,嘗試從新發送。在此就不作多的介紹,你們能夠根據本身的實際需求來實現。

3.3 發佈者

 如今來看一下消息發佈者的代碼

 

 主要的發送代碼都在Send類中,其中appsettings.json裏面配置了Rabbitmq的鏈接地址,TestDto只是一個爲了方便演示的參數類。

 下面看一下Program裏面的代碼

 很簡單的一個發送消息調用。

 而後來看一下Send類中的代碼

 

public  class Send
    {
        /// <summary>
        /// 發送消息
        /// </summary>
        public static void SendMessage()
        {
            //須要注意一點兒,若是發送的時候,在該管道下找不到相匹配的隊列框架將默認丟棄該消息




            //推送模式
            //推送模式下,需指定管道名稱和路由鍵值名稱
            //消息只會被髮送到和指定路由鍵值徹底匹配的隊列中
            var directdto = new PushMsg()
            {
                sendMsg = new TestDto()
                {
                    Var1 = "這是推送模式"
                },
                exchangeName = "message.directdemo",
                routeName= "routekey",
                sendEnum =SendEnum.推送模式
            };
            //同步發送 ,返回true或fasle true 發送成功,消息已存儲到Rabbitmq中,false表示發送失敗
            var b= RabbitMQManage.PushMessage(directdto);
            //異步發送,若是失敗,失敗的消息會被寫入數據庫,會有後臺線程輪詢數據庫進行從新發送
            //RabbitMQManage.PushMessageAsync(directlist);


            //訂閱模式
            //訂閱模式只須要指定管道名稱
            //消息會被髮送到該管道下的全部隊列中
            var fanoutdto = new PushMsg()
            {
                sendMsg = new TestDto()
                {
                    Var1 = "這是訂閱模式"
                },
                exchangeName = "message.fanoutdemo",
                sendEnum = SendEnum.訂閱模式
            };
            //同步發送 
             var fb = RabbitMQManage.PushMessage(fanoutdto);
            //異步發送
            //RabbitMQManage.PushMessageAsync(fanoutdto);

            //主題路由模式
            //路由模式下需指定 管道名稱和路由值
            //消息會被髮送到該管道下,和路由值匹配的隊列中去
            var routedto = new PushMsg()
            {
                sendMsg = new TestDto()
                {
                    Var1 = "這是主題路由模式1",
                },
                exchangeName = "message.topicdemo",
                routeName="a.log",
                sendEnum=SendEnum.主題路由模式
                
            };
            var routedto2 = new PushMsg()
            {
                sendMsg = new TestDto()
                {
                    Var1 = "這是主題路由模式2",
                },
                exchangeName = "message.topicdemo",
                routeName = "a.log.a.b",
                sendEnum = SendEnum.主題路由模式

            };
            //同步發送 
            var rb = RabbitMQManage.PushMessage(routedto);
            var rb2 = RabbitMQManage.PushMessage(routedto2);
            //異步發送
            //RabbitMQManage.PushMessageAsync(routedto);
        }
    }

 

3.4 消費者

 首先來看下消費者端的目錄結構

 

其中appsettings.json中配置Rabbitmq的鏈接信息,Program中只是簡單調用消息訂閱

主要的消息訂閱代碼都在MessageManage文件夾下,MessageManService用於定義消息訂閱類型

 public class MessageManService
    {
        public static void Subsribe()
        {
            Task.Run(() =>
            {
                //概念  一個管道下面能夠綁定多個隊列。
                //發送消息 是指將消息發送到管道中,而後由rabbitmq根據發送規則在將消息具體的轉發到對應到管道下面的隊列中
                //消費消息 是指消費者(即服務)從管道下面的隊列中獲取消息
                //同一個隊列 能夠有多個消費者(即不一樣的服務,均可以鏈接到同一個隊列去獲取消息)
                //但注意 當一個隊列有多個消費者的時候,消息會被依次分發到不一樣的消費者中。好比第一條消息給第一個消費者,第二條消息給第二個消費者(框架內部有一個公平分發的機制)


                //推送模式時 需指定管道名稱和路由值
                //隊列名稱可本身指定
                //注意 ,管道名稱和路由名稱必定要和發送方的管道名稱和路由名稱一致
                //不管這個管道下面掛靠有多少個隊列,只有路由名稱和此處指定的路由名稱徹底一致的隊列,纔會收到這條消息。
                var dirarg = new MesArgs()
                {
                    sendEnum = SendEnum.推送模式,
                    exchangeName = "message.directdemo",
                    rabbitQueeName = "meesage.directmessagequene",
                    routeName = "routekey"
                };
                RabbitMQManage.Subscribe<DirectMessageConsume>(dirarg);

                //訂閱模式時需指定管道名稱,而且管道名稱要和發送方管道名稱一致
                //隊列名稱可本身指定
                //全部這個管道下面的隊列,都將收到該條消息
                var fanoutrg = new MesArgs()
                {
                    sendEnum = SendEnum.訂閱模式,
                    exchangeName = "message.fanoutdemo",
                    rabbitQueeName = "meesage.fanoutmessagequene"
                };
                RabbitMQManage.Subscribe<FanoutMessageConsume>(fanoutrg);

                //路由模式時需指定管道名稱,路由關鍵字而且管道名稱,路由關鍵字要和發送方的一致
                //隊列名稱可本身指定
                //消息將被髮送到管道下面的能匹配路由關鍵字的隊列中
                //也就是說 路由模式時,有多少隊列能收到消息,取決於該隊列的路由關鍵字是否匹配,只要匹配就能收到消息
                //符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞
                var topicrg = new MesArgs()
                {
                    sendEnum = SendEnum.主題路由模式,
                    exchangeName = "message.topicdemo",
                    rabbitQueeName = "message.topicmessagequene",
                    routeName = "#.log.#"
                };

                RabbitMQManage.Subscribe<TopicMessageConsume>(topicrg);
            });
        }
    }

Consume文件夾下主要定義了消息的業務處理

//推送模式過來的消息 
public class DirectMessageConsume : IMessageConsume
    {
        //消息的處理方法中最好不要進行try catch操做
        //若是發送異常,EasyNetQ會自動將消息放入錯誤隊列中
        //若是在Consume方法體中捕獲了異常而且沒有拋出,會默認消息處理成功
        //消息的冪等性需業務方自行處理,也就是說同一條消息可能會接收到兩次
        //(好比說第一次正在處理消息的時候服務掛掉,服務重啓後這條消息又會從新推送過來)
        public void Consume(string message)
        {
            var dto = JsonConvert.DeserializeObject<TestDto>(message);
            Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3);
        }
    }

//廣播模式過來的消息
 public class FanoutMessageConsume : IMessageConsume
    {
        //消息的處理方法中最好不要進行try catch操做
        //若是發送異常,EasyNetQ會自動將消息放入錯誤隊列中
        //若是在Consume方法體中捕獲了異常而且沒有拋出,會默認消息處理成功
        //消息的冪等性需業務方自行處理,也就是說同一條消息可能會接收到兩次
        //(好比說第一次正在處理消息的時候服務掛掉,服務重啓後這條消息又會從新推送過來)
        public void Consume(string message)
        {
            var dto = JsonConvert.DeserializeObject<TestDto>(message);
            Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3);
        }
    }

//主題路由模式過來的消息
 public class TopicMessageConsume : IMessageConsume
    {
        //消息的處理方法中最好不要進行try catch操做
        //若是發送異常,EasyNetQ會自動將消息放入錯誤隊列中
        //若是在Consume方法體中捕獲了異常而且沒有拋出,會默認消息處理成功
        //消息的冪等性需業務方自行處理,也就是說同一條消息可能會接收到兩次
        //(好比說第一次正在處理消息的時候服務掛掉,服務重啓後這條消息又會從新推送過來)
        public void Consume(string message)
        {
            var dto = JsonConvert.DeserializeObject<TestDto>(message);
            Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3);
        }
    }

能夠看到,全部的類都集成自咱們定義的接口IMessageConsume。

4、總結

在EasyNetQ中若是須要消費者確認功能,則須要在Rabbitmq的鏈接配置中設置publisherConfirms=true,這將會開啓自動確認。在使用高級api定義交換機和隊列時能夠本身定義多種參數,好比消息是否持久化,消息最大長度等等,具體你們能夠去看官方文檔,上面有詳細介紹。Easynetq會自動去捕獲消費異常的消息並將其放入到錯誤隊列中,並且官方提供了從新發送錯誤隊列中消息的方法,固然你也能夠本身去監視錯誤列隊,對異常消息進行處理。EasyNetQ裏面做者針對消息的發佈確認和消費確認都作了封裝。在EasyNetQ中發佈消息的時候若是選用的同步發送,只要沒有拋出異常,咱們就能夠認爲任務消息已經正確到達Broker,而異步發送的話須要咱們本身去監視Task是否成功 。若是開啓了自動確認,並不須要咱們在消息處理的方法體中手動返回ack信息,只要消息被 正確處理就會自動ack。雖然RabbitMq中也有事務消息,但因爲性能比較差,並不推薦使用。其實,只要咱們能明確消息是否發佈成功和消費成功,就將會很容易在這個基礎上擴展出分佈式事務的處理。

相關文章
相關標籤/搜索