基礎拾遺----RabbitMQ

基礎拾遺

基礎拾遺------特性詳解html

基礎拾遺------webservice詳解java

基礎拾遺------redis詳解git

基礎拾遺------反射詳解github

基礎拾遺------委託詳解web

基礎拾遺------接口詳解redis

基礎拾遺------泛型詳解sql

基礎拾遺-----依賴注入數據庫

基礎拾遺-----數據註解與驗證c#

基礎拾遺-----mongoDB操做安全

基礎拾遺----RabbitMQ

前言

  消息隊列,在高併發環境下,因爲來不及同步處理,請求每每會發生堵塞,好比說雙十一不少人進行下單,購買物品這是對於數據的操做是很是之大的,不論是是insert仍是update是否是都有及時操做數據庫,那麼就有可能形成數據庫思索移除什麼堆積阻塞。那麼咱們這時是否是加入異步,nosql是否是能減輕其壓力,那麼這中間劍氣的橋樑就是mq了,固然她的使用場景有不少,咱們接下來把社麼是消息隊列了解清楚它是怎麼一回事以後,但願你們能在本身的項目中靈活應用便可。

消息隊列(MQ)

咱們先從圖文上說一下它的使用場景,異步處理,應用解耦,流量削鋒消息通信四個場景。由於之前開發過商城因此就如下載訂單來敘述一下,他的適應場景吧。

異步處理

好比咱們下載訂單後發送郵件與短信給使用者(簡單舉例通常不會哈)。那麼咱們在寫程序通常怎麼處理呢?(1)把下單信息存入數據庫中,調用郵件,短信接口,發送(並行發送或者一個一個發送),返回界面。可是咱們計算一下若是每一個操做時間爲30ms那麼最少也須要60ms,多的狀況是90ms,

那麼若是咱們加入消息隊列將是一個怎樣的狀況呢,咱們先把下單信息存入數據庫,同時把信息放到消息隊列。而後就不用管它了。這樣的話所用時間就是30ms+1ms(存消息隊列)。其實放消息隊列中仍是要管的的,但那是消費者的事和下單這個生產這無關。

應用解耦

  仍是商城下載訂單的問題,當咱們商城下載訂單,而後公司內部erp中庫存管理相應庫存進行同步。通常咱們怎麼處理,下載完訂單,調用erp系統,而後處理erp數據,接着把erp數據庫中的信息進行同步到商城,這個時候處理上面提到的效率,還有一個問題,須要解決:若是兩個系統不能同時訪問,你會怎麼作。那麼咱們就要對兩個系統進行解耦了對不對。這個時候消息隊列就有了用武之地。以下圖:其實消息隊列在這個功能下,咱們的erp系統也有寫入的時候,在這再也不累述業務,你們瞭解消息隊列的用途便可。

流量削鋒

作過商城的應該都會遇到這個問題,當舉行活動是擁擠大量的用戶,可能會是系統崩潰,這時候流量控制,和異常處理是一件特別重要的工做。固然請不要說在這其餘方法,咱們不對其進行討論,咱們盡對消息隊列的使用作簡單介紹。

消息通信

消息通信是指,消息隊列通常都內置了高效的通訊機制,所以也能夠用在純的消息通信。好比實現點對點消息隊列,或者聊天室等

以上咱們大體說了一下他的使用場景,那麼不知道你們有沒有了解到它究竟是個什麼東西?

其實吧消息隊列就是一個生產者,把相應消息(對象)放到消息隊列(中間件中),而後它就什麼都不用管了,接下來消費者(或者叫訂閱者)去消息隊列中間件中去獲取訂閱的信息,它本身再去處理。能解決的問題我們從上面的場景應該已經瞭解到了,解耦,提升效率。那麼重點來了消息隊列中間件又是什麼呢?它都有哪些,又是怎麼實現的呢?下面咱們就來了解其中的一箇中間件RabbitMq。

RabbitMq

  你們大體知道什麼是消息隊列了,那麼它的實現是什麼樣的呢?如今基本上也知道它實現重要的一環是消息對立中間件,rabbitmq,就是其中之一,其中還包括:Active MQ,Rocket Mq,Kafka,Zero MQ甚至也有人用redis來實現。

從個人角度來講我去了解了兩個AcctionMQ與RabbitMq這兩種最終選擇了它,也簡單作了相應的封裝,來我先來介紹一下RabbitMq.

  RabbitMQ是一個消息代理 - 一個消息系統的媒介。它能夠爲你的應用提供一個通用的消息發送和接收平臺,而且保證消息在傳輸過程當中的安全。它提供的內部機制包括持久性機制、投遞確認、發佈者證明和高可用性機制,多協議,集羣,聯合咱們能夠在實現的過程當中針對於性能與可靠性進行相應權衡。

  看一下:rabbitmq可視化工具以下(此可視化web的操做請你們自行查詢):

其實消息隊列的協議是AMQP,有不少對此的介紹在這再也不累述。結果上面的瞭解咱們大體知道它是個什麼東西,不過咱們也要在此提一下,幾個概念。消息、隊列、路由(包括點對點和發佈/訂閱),生產者,消費者,具體解釋我以爲不須要了,就是你理解的字面意思。

  其中隊列咱們通常用P來表示,消費者通常用C,隊列(存消息的集合)用q。路由是R.多個消費者能夠訪問多個q。接下來開始咱們的實現了。

RabbitMq的代碼實現

RabbitMq鏈接

首先看一下配置文件信息:

  <appSettings>
         <!--rabbitMQ-->
    <add key="serveraddress" value="amqp://192.168.0.76:5672/"/>
    <add key="virtualhost" value="erpadminvirtualhost"/>
    <add key="username" value="tx_junpin"/>
    <add key="password" value="abc.1234%"/>

  以上分別是訪問服務地址,虛擬地址(可在可視化上手動添加,記得要加一條數據進去,而後刪除,比如初始haunted同樣),用戶,密碼。其中web訪問地址通常爲端口後改成「15672」.

鏈接關鍵數據準備好以後就是c# 中代碼的實現了

 private RabbitConsumerConfig RBGetinfo;
        private ConnectionFactory cf = new ConnectionFactory();
        private IConnection conn; //創建聯接

        /// <summary>
        /// 初始化Rabbit鏈接
        /// </summary>
        /// <param name="rbinfo"></param>
        public RabbitConsumer(RabbitConsumerConfig rbinfo)
        {
            RBGetinfo = rbinfo;
            cf = new ConnectionFactory()
            {
                UserName = RBGetinfo.UserName,
                Password = RBGetinfo.Password,
                VirtualHost = RBGetinfo.VirtualHost,
                RequestedHeartbeat = 0,
                Uri = RBGetinfo.ServerAddress
            };
            conn = cf.CreateConnection();

        }

以上ConnectionFactory 內部爲中間件提供的鏈接工廠。方便與AMQP代理相關聯的Connection。用興趣的小夥伴請F12去看代碼吧。

調用代碼封裝

   /// <summary>
        /// 隊列出列的方法,傳入處理隊列中body的方法,並傳入隊列名稱
        /// </summary>
        /// <param name="messageProcessAction">要執行的方法(委託)</param>
        /// <param name="queuename">隊列名稱</param>
     /// <param name="count">獲取數據條數</param>
        public void ConsumeMessage(Action<string> messageProcessAction, string queuename, ushort count)
        {
            if (string.IsNullOrEmpty(queuename))
            {
                throw new ArgumentNullException("queuename");
            }
            CheckConn();
            using (IModel ch = conn.CreateModel())
            {
                //第二種取法QueueingBasicConsumer基於訂閱模式
                QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
                ch.BasicQos(0, count, true);
                ch.BasicConsume(queuename, false, consumer);
                while (true)
                {
                    string message = "";
                    try
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        IBasicProperties props = e.BasicProperties;
                        byte[] body = e.Body;
                        message = System.Text.Encoding.UTF8.GetString(body);
                        messageProcessAction.Invoke(System.Text.Encoding.UTF8.GetString(body).Replace("\0\0\0body\0\n", "").Replace("\0", "").ToString());
                        ch.BasicAck(e.DeliveryTag, false);
                    }
                    catch (Exception ex)
                    {
                        throw new RabbitException() { InternalException = ex, QueueName = queuename, RabbitInfo = RBGetinfo.ToString(), CurrentMessage = message };
                    }
                }
            }
        }
View Code

其中 CheckConn()判斷是否鏈接若是沒鏈接繼續鏈接誒:

      private void CheckConn()
        {
            if (RBGetinfo != null && !IsOpen)
            {
                cf = new ConnectionFactory()
                {
                    UserName = RBGetinfo.UserName,
                    Password = RBGetinfo.Password,
                    VirtualHost = RBGetinfo.VirtualHost,
                    RequestedHeartbeat = 0,
                    Uri = RBGetinfo.ServerAddress
                };
                conn = cf.CreateConnection();
            }
        }
View Code

可能你們看到註釋了,是的,RabbitMQ Consumer 獲取消息有兩種方式(poll、subscribe) 。-----訂閱與輪詢。咱們用的是訂閱模式。寫到者忽然件想有時間仍是要把上面提到的那個幾個概念再梳理一下吧。

其中委託調用的方法:

    public void StockTBCExecute(string body)
        {
            logger.Error("StockTBCExecute" + body);
        }

你有可能會問。我可不能夠定義委託方法爲多個參數?我只能說,你看一下代碼:

 byte[] body = e.Body;
  message = System.Text.Encoding.UTF8.GetString(body);
   messageProcessAction.Invoke(System.Text.Encoding.UTF8.GetString(body).Replace("\0\0\0body\0\n", "").Replace("\0", "").ToString());

至於可否擴展大家本身去研究吧。

向中間件插入數據

  public class RabbitMQManager
    {
        private static readonly string _serverAddress;
        private static readonly string _virtualHost;
        private static readonly string _userName;
        private static readonly string _password;
        private static readonly ILog _logger = LogManager.GetLogger(typeof(RabbitMQManager));
        private static RabbitProducer _rabbitProducer;

        static RabbitMQManager()
        {
            _serverAddress = ConfigurationManager.AppSettings["serveraddress"];
            _virtualHost = ConfigurationManager.AppSettings["virtualhost"];
            _userName = ConfigurationManager.AppSettings["username"];
            _password = ConfigurationManager.AppSettings["password"];

        }

        /// <summary>
        /// 交換連接信息
        /// </summary>
        /// <param name="routingKey">路由關鍵字</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="message">消息內容</param>
        public static void SendRabbitMQ(string routingKey, string queueName, string message)
        {
            RabbitProducerConfig _rabbitConfig = new RabbitProducerConfig()
            {
                ServerAddress = _serverAddress,
                VirtualHost = _virtualHost,
                UserName = _userName,
                Password = _password,
                Exchange = "erp.service",
                ExchangeType = "direct",
                RoutingKey = routingKey
            };
            if (_rabbitProducer == null || !_rabbitProducer.IsOpen)
            {
                _rabbitProducer = new RabbitProducer(_rabbitConfig);
            }
            try
            {
                _rabbitProducer.ProduceMessage(message, queueName);
            }
            catch (Exception ex)
            {
                _logger.Error(ex);
            }
            finally
            {
                _rabbitProducer.Close();
            }
        }

      
    }
View Code

以上代碼好像也沒有什麼好解釋的,這裏面用到的路由與於隊列參數,基本上我使用的一個隊列會對應一個路由,可是 rabbitmq並不是只有這種方式。

那麼就在這多說一點吧。

RabbitMQ三種路由方式

Direct Exchange(直接路由)

任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue.(我封裝的方法是這種)

1 .通常狀況可使用rabbitMQ自帶的Exchange:"(該Exchange的名字爲空字符串,下文稱其爲 default Exchange)。
2 .這種模式下不須要將Exchange進行任何綁定(binding)操做
3 .消息傳遞時須要一個「RouteKey」,能夠簡單的理解爲要發送到的隊列名字。
4 .若是vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。

Fanout Exchange(廣播路由)

任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上。

1 .能夠理解爲路由表的模式
2 .這種模式不須要RouteKey
3 .這種模式須要提早將Exchange與Queue進行綁定,一個Exchange能夠綁定多個Queue,一個Queue能夠同多個Exchange進行綁定。
4 .若是接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄

Topic Exchange(主題訂閱模式路由)

任何發送到Topic Exchange的消息都會被轉發到全部關心RouteKey中指定話題的Queue上

1 .這種模式較爲複雜,簡單來講,就是每一個隊列都有其關心的主題,全部的消息都帶有一個「標題」(RouteKey),Exchange會將消息轉發到全部關注主題能與RouteKey模糊匹配的隊列。
2 .這種模式須要RouteKey,也許要提早綁定Exchange與Queue。
3 .在進行綁定時,要提供一個該隊列關心的主題,如「#.log.#」表示該隊列關心全部涉及log的消息(一個RouteKey爲」MQ.log.error」的消息會被轉發到該隊列)。
4 .「#」表示 0 個或若干個關鍵字,「*」表示一個關鍵字。如「log.*」能與「log.warn」匹配,沒法與「log.warn.timeout」匹配;可是「log.#」能與上述二者匹配。
5 .一樣,若是Exchange沒有發現可以與RouteKey匹配的Queue,則會拋棄此消息。

最後的最後源碼

源碼:https://github.com/kmonkey9006/RabbitMQ

無論你是否瞭解上面我說的,你能夠直接用下面的方法來使用我封裝的這個類庫:

插入指定隊列一條數據:

   RabbitMQManager.SendRabbitMQ(RoutKey.RoutKey_stock_eshop, Queuen.Queuen_Stock_Eshop, "0108ZLY036");

獲取隊列中的數據(先進先出):

 public void Execute1()
        {
            while (true)
            {
                try
                {
                    if (rc1 == null || !rc1.IsOpen)
                    {
                        rc1 = new RabbitConsumer(rcc);
                    }
                    rc1.ConsumeMessage(StockEshopExecute, RabbitMQ.RabbitMqConst.Queuen.Queuen_Stock_Eshop, 1);
                }
                catch (Exception ex)
                {
                    logger.ErrorFormat("Execute1,異常:{0}", ex.Message);
                }
            }
        }

總結:

大體講的是怎麼在項目中使用,其中有不少細節與須要注意的東西詳細闡述,你們可本身詳細的去了解,針對於代碼,我已經傳到github上去,若是有什麼問題你們能夠給我提出來,有什麼須要討論的,請在公告來中找到QQ與我聯繫。

相關文章
相關標籤/搜索