RabbitMQ實戰經驗分享

前言

最近在忙一個高考項目,看着系統順利完成了此次高考,終於能夠鬆口氣了。看到那些即將參加高考的學生,也想起當年高三的本身。html

下面分享下RabbitMQ實戰經驗,但願對你們有所幫助:git


 

 

1、生產消息

關於RabbitMQ的基礎使用,這裏再也不介紹了,項目中使用的是Exchange中的topic模式。github

先上發消息的代碼性能

private bool MarkErrorSend(string[] lstMsg)
        {
            try
            {
                var factory = new ConnectionFactory()
                {
                    UserName = "guest",//用戶名
                    Password = "guest",//密碼
                    HostName = "localhost",//ConfigurationManager.AppSettings["sHostName"],
                };
                //建立鏈接
                var connection = factory.CreateConnection();
                //建立通道
                var channel = connection.CreateModel();
                try
                {
                    //定義一個Direct類型交換機
                    channel.ExchangeDeclare(
                        exchange: "TestTopicChange", //exchange名稱
                        type: ExchangeType.Topic, //Topic模式,採用路由匹配
                        durable: true,//exchange持久化
                        autoDelete: false,//是否自動刪除,通常設成false
                        arguments: null//一些結構化參數,好比:alternate-exchange
                        );

                    //定義測試隊列
                    channel.QueueDeclare(
                        queue: "Test_Queue", //隊列名稱
                        durable: true, //隊列磁盤持久化(要和消息持久化一塊兒使用纔有效)
                        exclusive: false,//是否排他的,false。若是一個隊列聲明爲排他隊列,該隊列首次聲明它的鏈接可見,並在鏈接斷開時自動刪除
                        autoDelete: false,//是否自動刪除,通常設成false
                        arguments: null
                        );

                    //將隊列綁定到交換機
                    string routeKey = "TestRouteKey.*";//*匹配一個單詞
                    channel.QueueBind(
                        queue: "Test_Queue",
                        exchange: "TestTopicChange",
                        routingKey: routeKey,
                        arguments: null
                        );

                    //消息磁盤持久化,把DeliveryMode設成2(要和隊列持久化一塊兒使用纔有效)
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;

                    channel.ConfirmSelect();//發送確認機制
                    foreach (var itemMsg in lstMsg)
                    {
                        byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);
                        //發佈消息
                        channel.BasicPublish(
                            exchange: "TestTopicChange",
                            routingKey: "TestRouteKey.one",
                            basicProperties: properties,
                            body: sendBytes
                            );
                    }
                    bool isAllPublished = channel.WaitForConfirms();//通道(channel)裏全部消息均發送才返回true
                    return isAllPublished;
                }
                catch (Exception ex)
                {
                    //寫錯誤日誌
                    return false;
                }
                finally
                {
                    channel.Close();
                    connection.Close();
                }
            }
            catch
            {
                //RabbitMQ.Client.Exceptions.BrokerUnreachableException:
                //When the configured hostname was not reachable.
                return false;
            }
        }

        發消息沒啥特別的。關於消息持久化的介紹這裏也再也不介紹,不懂的能夠看上篇文章。發消息須要注意的地方是,能夠選擇多條消息一塊兒發送,最後才肯定消息發送成功,這樣效率比較高;此外,須要儘可能精簡每條消息的長度(樓主在這裏吃過虧),否則會因消息過長從而增長髮送時間。在實際項目中一次發了4萬多條數據沒有出現問題。測試

 


 

2、接收消息

       接下來講下消費消息的過程,我使用的是單個鏈接多個channel,每一個channel每次只取一條消息方法。有人會問單個TCP鏈接,多個channel會不會影響通訊效率。這個理論上確定會有影響的,看影響大不大而已。我開的channel數通常去到30左右,並無以爲影響效率,有多是由於我每一個channel是拿一條消息的緣由。經過單個鏈接多個channel的方法,能夠少開了不少鏈接。至於我爲何選每一個channel每次只取一條消息,這是外界因素限制了,具體看本身需求。spa

       接下接收消息的過程,首先定義一個RabbitMQHelper類,裏面有個全局的conn鏈接變量,此外還有建立鏈接、關閉鏈接和驗證鏈接是否打開等方法。程序運行一個定時器,當線程

檢測到鏈接未打開的狀況下,主動建立鏈接處理消息。日誌

 public class RabbitMQHelper
    {
        public IConnection conn = null;

        /// <summary>
        /// 建立RabbitMQ消息中間件鏈接
        /// </summary>
        /// <returns>返回鏈接對象</returns>
        public IConnection RabbitConnection(string sHostName, ushort nChannelMax)
        {
            try
            {
                if (conn == null)
                {
                    var factory = new ConnectionFactory()
                    {
                        UserName = "guest",//用戶名
                        Password = "guest",//密碼
                        HostName = sHostName,//ConfigurationManager.AppSettings["MQIP"],
                        AutomaticRecoveryEnabled = false,//取消自動重連,改用定時器定時檢測鏈接是否存在
                        RequestedConnectionTimeout = 10000,//請求超時時間設成10秒,默認的爲30秒
                        RequestedChannelMax = nChannelMax//與開的線程數保持一致
                    };
                    //建立鏈接
                    conn = factory.CreateConnection();
                    Console.WriteLine("RabbitMQ鏈接已建立!");
                }

                return conn;
            }
            catch
            {
                Console.WriteLine("建立鏈接失敗,請檢查RabbitMQ是否正常運行!");
                return null;
            }
        }

        /// <summary>
        /// 關閉RabbitMQ鏈接
        /// </summary>
        public void Close()
        {
            try
            {
                if (conn != null)
                {
                    if (conn.IsOpen)
                        conn.Close();
                    conn = null;
                    Console.WriteLine("RabbitMQ鏈接已關閉!");
                }
            }
            catch { }
        }

        /// <summary>
        /// 判斷RabbitMQ鏈接是否打開
        /// </summary>
        /// <returns></returns>
        public bool IsOpen()
        {
            try
            {
                if (conn != null)
                {
                    if (conn.IsOpen)
                        return true;
                }
                return false;
            }
            catch
            {
                return false;
            }
        }
    }

 

       接下來咱們看具體如何接收消息。code

private static AutoResetEvent myEvent = new AutoResetEvent(false);
private RabbitMQHelper rabbit = new RabbitMQHelper();
private ushort nChannel = 10;//一個鏈接的最大通道數和所開的線程數一致

       首先初始化一個rabbit實例,而後經過RabbitConnection方法建立RabbitMQ鏈接。htm

       當鏈接打開時候,用線程池運行接收消息的方法。注意了,這裏開的線程必須和開的channel數量一致,否則會有問題(具體問題是,設了RabbitMQ鏈接超時時間爲10秒,有時候無論用,緣由未查明。RabbitMQ建立鏈接默認超時時間爲30秒,假如在這個時間內再去調用建立的話,就有可能獲得兩倍的channel;)

/// <summary>
        /// 單個RabbitMQ鏈接開多個線程,每一個線程開一個channel接受消息
        /// </summary>
        private void CreateConnecttion()
        {
            try
            {
                rabbit.RabbitConnection("localhost", nChannel);
                if (rabbit.conn != null)
                {
                    ThreadPool.SetMinThreads(1, 1);
                    ThreadPool.SetMaxThreads(100, 100);
                    for (int i = 1; i <= nChannel; i++)
                    {
                        ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), "");
                    }
                    myEvent.WaitOne();//等待全部線程工做完成後,才能關閉鏈接
                    rabbit.Close();
                }
            }
            catch (Exception ex)
            {
                rabbit.Close();
                Console.WriteLine(ex.Message);
            }
        }

 

       接着就是接收消息的方法,處理消息的過程省略了。

  /// <summary>
        /// 接收並處理消息,在一個鏈接中建立多個通道(channel),避免建立多個鏈接
        /// </summary>
        /// <param name="con">RabbitMQ鏈接</param>
        private void ReceiveMsg(object obj)
        {
            IModel channel = null;
            try
            {
                #region 建立通道,定義中轉站和隊列
                channel = rabbit.conn.CreateModel();
                channel.ExchangeDeclare(
                    exchange: "TestTopicChange", //exchange名稱
                    type: ExchangeType.Topic, //Topic模式,採用路由匹配
                    durable: true,//exchange持久化
                    autoDelete: false,//是否自動刪除,通常設成false
                    arguments: null//一些結構化參數,好比:alternate-exchange
                    );

                //定義閱卷隊列
                channel.QueueDeclare(
                    queue: "Test_Queue", //隊列名稱
                    durable: true, //隊列磁盤持久化(要和消息持久化一塊兒使用纔有效)
                    exclusive: false,//是否排他的,false。若是一個隊列聲明爲排他隊列,該隊列首次聲明它的鏈接可見,並在鏈接斷開時自動刪除
                    autoDelete: false,
                    arguments: null
                    );
                #endregion
                channel.BasicQos(0, 1, false);//每次只接收一條消息

                channel.QueueBind(queue: "Test_Queue",
                                      exchange: "TestTopicChange",
                                      routingKey: "TestRouteKey.*");
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    //處理消息方法
                    try
                    {
                        bool isMark = AutoMark(message);
                        if (isMark)
                        {
                            //Function.writeMarkLog(message);
                            //確認該消息已被消費,發消息給RabbitMQ隊列
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                        else
                        {
                            if (MarkErrorSend(message))//把錯誤消息推到錯誤消息隊列
                                channel.BasicReject(ea.DeliveryTag, false);
                            else
                                //消費消息失敗,拒絕此消息,重回隊列,讓它能夠繼續發送到其餘消費者 
                                channel.BasicReject(ea.DeliveryTag, true);
                        }
                    }
                    catch (Exception ex)
                    {
                        try
                        {
                            Console.WriteLine(ex.Message);
                            if (channel != null && channel.IsOpen)//處理RabbitMQ中止重啓而自動評閱崩潰的問題
                            {
                                //消費消息失敗,拒絕此消息,重回隊列,讓它能夠繼續發送到其餘消費者 
                                channel.BasicReject(ea.DeliveryTag, true);
                            }
                        }
                        catch { }
                    }
                };
                //手動確認消息
                channel.BasicConsume(queue: "Test_Queue",
                                     autoAck: false,
                                     consumer: consumer);
            }
            catch (Exception ex)
            {
                try
                {
                    Console.WriteLine("接收消息方法出錯:" + ex.Message);
                    if (channel != null && channel.IsOpen)//關閉通道
                        channel.Close();
                    if (rabbit.conn != null)//處理RabbitMQ忽然中止的問題
                        rabbit.Close();
                }
                catch { }
            }
        }

 

 

3、處理錯誤消息

       把處理失敗的消息放到「錯誤隊列」,而後把原隊列的消息刪除(這裏主要解決問題是,存在多個處理失敗或處理不了的消息時,若是把這些消息都放回原隊列,它們會繼續分發到其餘線程的channel,但結果仍是處理不了,就會形成一個死循環,致使後面的消息沒法處理)。把第一次處理不了的消息放到「錯誤隊列」後,從新再開一個新的鏈接去處理「錯誤隊列」的消息。

/// <summary>
        /// 把處理錯誤的消息發送到「錯誤消息隊列」
        /// </summary>
        /// <param name="msg"></param>
        /// <returns></returns>
        private bool MarkErrorSend(string msg)
        {
            RabbitMQHelper MQ = new RabbitMQHelper();
            MQ.RabbitConnection("localhost",1);
            //建立通道
            var channel = MQ.conn.CreateModel();
            try
            {
                //定義一個Direct類型交換機
                channel.ExchangeDeclare(
                    exchange: "ErrorTopicChange", //exchange名稱
                    type: ExchangeType.Topic, //Topic模式,採用路由匹配
                    durable: true,//exchange持久化
                    autoDelete: false,//是否自動刪除,通常設成false
                    arguments: null//一些結構化參數,好比:alternate-exchange
                    );

                //定義閱卷隊列
                channel.QueueDeclare(
                    queue: "Error_Queue", //隊列名稱
                    durable: true, //隊列磁盤持久化(要和消息持久化一塊兒使用纔有效)
                    exclusive: false,//是否排他的,false。若是一個隊列聲明爲排他隊列,該隊列首次聲明它的鏈接可見,並在鏈接斷開時自動刪除
                    autoDelete: false,//是否自動刪除,通常設成false
                    arguments: null
                    );

                //將隊列綁定到交換機
                string routeKey = "ErrorRouteKey.*";//*匹配一個單詞
                channel.QueueBind(
                    queue: "Error_Queue",
                    exchange: "ErrorTopicChange",
                    routingKey: routeKey,
                    arguments: null
                    );

                //消息磁盤持久化,把DeliveryMode設成2(要和隊列持久化一塊兒使用纔有效)
                IBasicProperties properties = channel.CreateBasicProperties();
                properties.DeliveryMode = 2;

                channel.ConfirmSelect();//發送確認機制
                byte[] sendBytes = Encoding.UTF8.GetBytes(msg);
                //發佈消息
                channel.BasicPublish(
                    exchange: "ErrorTopicChange",
                    routingKey: "ErrorRouteKey.one",
                    basicProperties: properties,
                    body: sendBytes
                    );

                bool isAllPublished = channel.WaitForConfirms();//通道(channel)裏全部消息均發送才返回true
                return isAllPublished;
            }
            catch (Exception ex)
            {
                //寫錯誤日誌
                return false;
            }
            finally
            {
                channel.Close();
                MQ.conn.Close();
            }
        }

 

總結:RabbitMQ自己已經很穩定了,並且性能也很好,全部不穩定的因素都在咱們處理消息的過程,因此能夠放心使用。

Demo源碼地址:https://github.com/Bingjian-Zhu/RabbitMQHelper

相關文章
相關標籤/搜索