.net平臺的rabbitmq使用封裝

.net平臺的rabbitmq使用封裝

 

前言

  RabbitMq你們再熟悉不過,這篇文章主要整對rabbitmq學習後封裝RabbitMQ.Client的一個分享。文章最後,我會把封裝組件和demo奉上。html

Rabbitmq的關鍵術語

  一、綁定器(Binding):根據路由規則綁定Queue和Exchange。git

  二、路由鍵(Routing Key):Exchange根據關鍵字進行消息投遞。github

  三、交換機(Exchange):指定消息按照路由規則進入指定隊列網絡

  四、消息隊列(Queue):消息的存儲載體框架

  五、生產者(Producer):消息發佈者。ide

  六、消費者(Consumer):消息接收者。post

Rabbitmq的運做

  從下圖能夠看出,發佈者(Publisher)是把消息先發送到交換器(Exchange),再從交換器發送到指定隊列(Queue),而先前已經聲明交換器與隊列綁定關係,最後消費者(Customer)經過訂閱或者主動取指定隊列消息進行消費。學習

  那麼剛剛提到的訂閱和主動取能夠理解成,推(被動),拉(主動)。測試

  推,只要隊列增長一條消息,就會通知空閒的消費者進行消費。(我不找你,就等你找我,觀察者模式)ui

  拉,不會通知消費者,而是由消費者主動輪循或者定時去取隊列消息。(我須要纔去找你)

  使用場景我舉個例子,假若有兩套系統 訂單系統和發貨系統,從訂單系統發起發貨消息指令,爲了及時發貨,發貨系統須要訂閱隊列,只要有指令就處理。

  但是程序偶爾會出異常,例如網絡或者DB超時了,把消息丟到失敗隊列,這個時候須要重發機制。可是我又不想while(IsPostSuccess == True),由於只要出異常了,會在某個時間段內都會有異常,這樣的重試是沒意義的。

  這個時候不須要及時的去處理消息,有個JOB定時或者每隔幾分鐘(失敗次數*間隔分鐘)去取失敗隊列消息,進行重發。

Publish(發佈)的封裝

  步驟:初始化連接->聲明交換器->聲明隊列->換機器與隊列綁定->發佈消息。注意的是,我將Model存到了ConcurrentDictionary裏面,由於聲明與綁定是很是耗時的,其次,往重複的隊列發送消息是不須要從新初始化的。

複製代碼
 1         /// <summary>
 2         /// 交換器聲明
 3         /// </summary>
 4         /// <param name="iModel"></param>
 5         /// <param name="exchange">交換器</param>
 6         /// <param name="type">交換器類型:
 7         /// 一、Direct Exchange – 處理路由鍵。須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底
 8         /// 匹配。這是一個完整的匹配。若是一個隊列綁定到該交換機上要求路由鍵 「dog」,則只有被標記爲「dog」的
 9         /// 消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog
10         /// 二、Fanout Exchange – 不處理路由鍵。你只須要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都
11         /// 會被轉發到與該交換機綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。Fanout
12         /// 交換機轉發消息是最快的。
13         /// 三、Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」匹配一個或多
14         /// 個詞,符號「*」匹配很少很多一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.*」
15         /// 只會匹配到「audit.irs」。</param>
16         /// <param name="durable">持久化</param>
17         /// <param name="autoDelete">自動刪除</param>
18         /// <param name="arguments">參數</param>
19         private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,
20             bool durable = true,
21             bool autoDelete = false, IDictionary<string, object> arguments = null)
22         {
23             exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();
24             iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
25         }
26 
27         /// <summary>
28         /// 隊列聲明
29         /// </summary>
30         /// <param name="channel"></param>
31         /// <param name="queue">隊列</param>
32         /// <param name="durable">持久化</param>
33         /// <param name="exclusive">排他隊列,若是一個隊列被聲明爲排他隊列,該隊列僅對首次聲明它的鏈接可見,
34         /// 並在鏈接斷開時自動刪除。這裏須要注意三點:其一,排他隊列是基於鏈接可見的,同一鏈接的不一樣信道是可
35         /// 以同時訪問同一個鏈接建立的排他隊列的。其二,「首次」,若是一個鏈接已經聲明瞭一個排他隊列,其餘連
36         /// 接是不容許創建同名的排他隊列的,這個與普通隊列不一樣。其三,即便該隊列是持久化的,一旦鏈接關閉或者
37         /// 客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。</param>
38         /// <param name="autoDelete">自動刪除</param>
39         /// <param name="arguments">參數</param>
40         private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,
41             bool autoDelete = false, IDictionary<string, object> arguments = null)
42         {
43             queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();
44             channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
45         }
46 
47         /// <summary>
48         /// 獲取Model
49         /// </summary>
50         /// <param name="exchange">交換機名稱</param>
51         /// <param name="queue">隊列名稱</param>
52         /// <param name="routingKey"></param>
53         /// <param name="isProperties">是否持久化</param>
54         /// <returns></returns>
55         private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)
56         {
57             return ModelDic.GetOrAdd(queue, key =>
58             {
59                 var model = _conn.CreateModel();
60                 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);
61                 QueueDeclare(model, queue, isProperties);
62                 model.QueueBind(queue, exchange, routingKey);
63                 ModelDic[queue] = model;
64                 return model;
65             });
66         }
67 
68         /// <summary>
69         /// 發佈消息
70         /// </summary>
71         /// <param name="routingKey">路由鍵</param>
72         /// <param name="body">隊列信息</param>
73         /// <param name="exchange">交換機名稱</param>
74         /// <param name="queue">隊列名</param>
75         /// <param name="isProperties">是否持久化</param>
76         /// <returns></returns>
77         public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)
78         {
79             var channel = GetModel(exchange, queue, routingKey, isProperties);
80 
81             try
82             {
83                 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());
84             }
85             catch (Exception ex)
86             {
87                 throw ex.GetInnestException();
88             }
89         }        
複製代碼

  下次是本機測試的發佈速度截圖:

  4.2W/S屬於穩定速度,把反序列化(ToJson)會稍微快一些。

 

Subscribe(訂閱)的封裝

  發佈的時候是申明瞭交換器和隊列並綁定,然而訂閱的時候只須要聲明隊列就可。從下面代碼能看到,捕獲到異常的時候,會把消息送到自定義的「死信隊列」裏,由另外的JOB進行定時重發,所以,finally是應答成功的。

複製代碼
        /// <summary>
        /// 獲取Model
        /// </summary>
        /// <param name="queue">隊列名稱</param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        private static IModel GetModel(string queue, bool isProperties = false)
        {
            return ModelDic.GetOrAdd(queue, value =>
             {
                 var model = _conn.CreateModel();
                 QueueDeclare(model, queue, isProperties);

                 //每次消費的消息數
                 model.BasicQos(0, 1, false);

                 ModelDic[queue] = model;

                 return model;
             });
        }    

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queue">隊列名稱</param>
        /// <param name="isProperties"></param>
        /// <param name="handler">消費處理</param>
        /// <param name="isDeadLetter"></param>
        public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
        {
            //隊列聲明
            var channel = GetModel(queue, isProperties);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var msgStr = body.DeserializeUtf8();
                var msg = msgStr.FromJson<T>();
                try
                {
                    handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("隊列接收消息", "RabbitMq");
                    if (!isDeadLetter)
                        PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
                }
                finally
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }        
複製代碼

  下次是本機測試的發佈速度截圖:

  快的時候有1.9K/S,慢的時候也有1.7K/S

 

Pull(拉)的封裝

  直接上代碼:

複製代碼
        /// <summary>
        /// 獲取消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="handler">消費處理</param>
        private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
        {
            var channel = GetModel(exchange, queue, routingKey);

            var result = channel.BasicGet(queue, false);
            if (result.IsNull())
                return;

            var msg = result.Body.DeserializeUtf8().FromJson<T>();
            try
            {
                handler(msg);
            }
            catch (Exception ex)
            {
                ex.GetInnestException().WriteToFile("隊列接收消息", "RabbitMq");
            }
            finally
            {
                channel.BasicAck(result.DeliveryTag, false);
            }
        }    
複製代碼

  快的時候有1.8K/s,穩定是1.5K/S

 

Rpc(遠程調用)的封裝

  首先說明下,RabbitMq只是提供了這個RPC的功能,可是並非真正的RPC,爲何這麼說:

  一、傳統Rpc隱藏了調用細節,像調用本地方法同樣傳參、拋出異常

  二、RabbitMq的Rpc是基於消息的,消費者消費後,經過新隊列返回響應結果。

複製代碼
        /// <summary>
        /// RPC客戶端
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {
            var channel = GetModel(exchange, queue, routingKey, isProperties);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);

            try
            {
                var correlationId = Guid.NewGuid().ToString();
                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;

                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());

                var sw = Stopwatch.StartNew();
                while (true)
                {
                    var ea = consumer.Queue.Dequeue();
                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {
                        return ea.Body.DeserializeUtf8();
                    }

                    if (sw.ElapsedMilliseconds > 30000)
                        throw new Exception("等待響應超時");
                }
            }
            catch (Exception ex)
            {
                throw ex.GetInnestException();
            }
        }    

        /// <summary>
        /// RPC服務端
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {
            //隊列聲明
            var channel = GetModel(queue, isProperties);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var msgStr = body.DeserializeUtf8();
                var msg = msgStr.FromJson<T>();

                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    msg = handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("隊列接收消息", "RabbitMq");
                }
                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }
複製代碼

   能夠用,但不建議去用。能夠考慮其餘的RPC框架。grpc、thrift等。

 結尾

  本篇文章,沒有過多的寫RabbitMq的知識點,由於園子的學習筆記實在太多了。下面把個人代碼奉上 https://github.com/SkyChenSky/RabbitMq 。若是有發現寫得不對的地方麻煩在評論指出,我會及時修改以避免誤導別人。

相關文章
相關標籤/搜索