NET下RabbitMQ實踐[實戰篇]

  以前的文章中,介紹瞭如何將RabbitMQ以WCF方式進行發佈。今天就介紹一下咱們產品中如何使用RabbitMQ的!
     
     在Discuz!NT企業版中,提供了對HTTP錯誤日誌的記錄功能,這一點對企業版很是重要,另外存儲錯誤日誌使用了MongoDB,理由很簡單,MongoDB的添加操做飛快,即便數量過億以後插入速度依舊不減。
     
     在開始正文以前,先說明一下本文的代碼分析順序,即:程序入口==》RabbitMQ客戶端===>RabbitMQ服務端。好了,閒話少說,開始正文!     
     
     首先是程序入口,也就是WCF+RabbitMQ客戶端實現:     
     
     由於Discuz!NT使用了HttpModule方式來接管HTTP連接請求,而在.NET的HttpModule模板中,能夠經過以下方法來接管程序運行時發生的ERROR,以下:     
     html

context.Error += new EventHandler(Application_OnError);

     
     而「記錄錯誤日誌"的功能入口就在這裏:     
    java

複製代碼
public void Application_OnError(Object sender, EventArgs e)
    {
        string requestUrl = DNTRequest.GetUrl();
        HttpApplication application = (HttpApplication)sender;
        HttpContext context = application.Context;

#if EntLib
        if (RabbitMQConfigs.GetConfig() != null && RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)//當開啓errlog錯誤日誌記錄功能時
        {
            RabbitMQClientHelper.GetHttpModuleErrLogClient().AsyncAddLog(new HttpModuleErrLogData(LogLevel.High, context.Server.GetLastError().ToString()));//異步方式
            //RabbitMQHelper.GetHttpModuleErrLogClient().AddLog(new HttpModuleErrLogData(LogLevel.High, "wrong message infomation!"));//同步方式
            return;
        }
#endif
        ...
     }
複製代碼

 

     
     固然從代碼能夠看出,記錄日誌的工做基本是經過配置文件控制的,即「HttpModuleErrLog.Enable」。
     
     而RabbitMQClientHelper是一個封裝類,主要用於反射生成IHttpModuleErrlogClient接口實例,該實例就是「基於WCF發佈的RabbitMQ」的客戶端訪問對象。
     mongodb

複製代碼
/// <summary>
/// RabbitMQ
/// </summary>
public class RabbitMQClientHelper
{
    static IHttpModuleErrlogClient ihttpModuleErrLogClient;

    private static object lockHelper = new object();

    public static IHttpModuleErrlogClient GetHttpModuleErrLogClient()
    {
        if (ihttpModuleErrLogClient == null)
        {
            lock (lockHelper)
            {
                if (ihttpModuleErrLogClient == null)
                {
                    try
                    {
                        if (RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)
                        {
                            ihttpModuleErrLogClient = (IHttpModuleErrlogClient)Activator.CreateInstance(Type.GetType(
                                  "Discuz.EntLib.RabbitMQ.Client.HttpModuleErrLogClient, Discuz.EntLib.RabbitMQ.Client", false, true));
                        }
                    }
                    catch
                    {
                        throw new Exception("請檢查 Discuz.EntLib.RabbitMQ.dll 文件是否被放置到了bin目錄下!");
                    }
                }
            }
        }
        return ihttpModuleErrLogClient;
    }
}
複製代碼

     
    能夠看出它反射的是Discuz.EntLib.RabbitMQ.dll文件的HttpModuleErrLogClient對象(注:使用反射的緣由主要是解決企業版代碼與廣泛版代碼在項目引用上的相互依賴),下面就是其接口和具體要求實現:        
   數據庫

複製代碼
    /// <summary>
    /// IHttpModuleErrlogClient客戶端接口類,用於反射實例化綁定
    /// </summary>
    public interface IHttpModuleErrlogClient
    {
        void AddLog(HttpModuleErrLogData httpModuleErrLogData);

        void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData);
    }
    
    public class HttpModuleErrLogClient : IHttpModuleErrlogClient
    {
        public void AddLog(HttpModuleErrLogData httpModuleErrLogData)
        {
            try
            {
                //((RabbitMQBinding)binding).OneWayOnly = true;
                ChannelFactory<IHttpModuleErrLogService> m_factory = new ChannelFactory<IHttpModuleErrLogService>(GetBinding(), "soap.amqp:///HttpModuleErrLogService");
                m_factory.Open();
                IHttpModuleErrLogService m_client = m_factory.CreateChannel();
                m_client.AddLog(httpModuleErrLogData);
                ((IClientChannel)m_client).Close();
                m_factory.Close();
            }
            catch (System.Exception e)
            {
                string msg = e.Message;
            }
        }

        private delegate void delegateAddLog(HttpModuleErrLogData httpModuleErrLogData);

        public void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData)
        {
            delegateAddLog AddLog_aysncallback = new delegateAddLog(AddLog);
            AddLog_aysncallback.BeginInvoke(httpModuleErrLogData, null, null);
        }

        public Binding GetBinding()
        {
            return new RabbitMQBinding(RabbitMQConfigs.GetConfig().HttpModuleErrLog.RabbitMQAddress);
        }
    }
複製代碼

    
    能夠看出,AddLog方法與上一篇中的客戶端內容基本上沒什麼太大差異,只不過它提供了同步和異步訪問兩種方式,這樣作的目的主要是用戶可根據生產環境來靈活配置。 
    
    下面就來看一下RabbitMQ的服務端實現,首先看一下其運行效果,以下圖:
    
    
    
    接着看一下啓動rabbitmq服務的代碼:    
    app

複製代碼
public void StartService(System.ServiceModel.Channels.Binding binding)
    {
        m_host = new ServiceHost(typeof(HttpModuleErrLogService), new Uri("soap.amqp:///"));
        //((RabbitMQBinding)binding).OneWayOnly = true;
        m_host.AddServiceEndpoint(typeof(IHttpModuleErrLogService), binding, "HttpModuleErrLogService");
        m_host.Open();
        m_serviceStarted = true;            
    }    
複製代碼

 

    
    上面代碼會添加IHttpModuleErrLogService接口實現類HttpModuleErrLogService 的Endpoint,並啓動它,下面就是該接口聲明:
    異步

複製代碼
    /// <summary>
    /// IHttpModuleErrLogService接口類
    /// </summary>  
    [ServiceContract]
    public interface IHttpModuleErrLogService
    {
        /// <summary>
        /// 添加httpModuleErrLogData日誌信息
        /// </summary>
        /// <param name="httpModuleErrLogData"></param>
        [OperationContract]
        void AddLog(HttpModuleErrLogData httpModuleErrLogData);
    }
複製代碼

 

    
    
    代碼很簡單,就是定義了一個添加日誌的方法:void AddLog(HttpModuleErrLogData httpModuleErrLogData)
        
    下面就是接口的具體實現,首先是類聲明及初始化代碼:    
    ui

複製代碼
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] //Single - 爲全部客戶端調用分配一個服務實例。
    public class HttpModuleErrLogService : IHttpModuleErrLogService
    {    
        /// <summary>
        /// 獲取HttpModuleErrLogInfo配置文件對象實例
        /// </summary>
        private static HttpModuleErrLogInfo httpModuleErrorLogInfo = RabbitMQConfigs.GetConfig().HttpModuleErrLog;
        /// <summary>
        /// 定時器對象
        /// </summary>
        private static System.Timers.Timer _timer;
        /// <summary>
        /// 定時器的時間
        /// </summary>
        private static int _elapsed = 0;

        public static void Initial(System.Windows.Forms.RichTextBox msgBox, int elapsed)
        {
            _msgBox = msgBox;
            _elapsed = elapsed;

            //初始定時器
            if (_elapsed > 0)
            {
                _timer = new System.Timers.Timer() { Interval = elapsed * 1000,  Enabled = true, AutoReset = true };            
                _timer.Elapsed += new System.Timers.ElapsedEventHandler(Timer_Elapsed);
                _timer.Start();
            }
        }

        /// <summary>
        /// 時間到時執行出隊操做
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {    
            Dequeue();    
        }
複製代碼

 

        
        能夠看出,這裏使用了靜態定時器對象,來進行定時訪問隊列信息功能(「非同步出隊」操做),這樣設計的緣由主要是爲用戶提供適合的配置方式,即若是不使用定時器(爲0時),則系統會在日誌入隊後,就當即啓動出隊(「同步出隊」)操做獲取日誌信息並插入到MongoDB數據庫中。
      下面介紹一下入隊操做實現:       
       spa

複製代碼
        /// <summary>
        /// 添加httpModuleErrLogData日誌信息
        /// </summary>
        /// <param name="httpModuleErrLogData"></param>
        public void AddLog(HttpModuleErrLogData httpModuleErrLogData)
        {
            Enqueue(httpModuleErrLogData);

            if (_elapsed <=0) //若是使用定時器(爲0時),則當即執行出隊操做
                Dequeue();
        }   

        /// <summary>
        /// 交換機名稱
        /// </summary>
        private const string EXCHANGE = "ex1";
        /// <summary>
        /// 交換方法,更多內容參見:http://melin.javaeye.com/blog/691265
        /// </summary>
        private const string EXCHANGE_TYPE = "direct";
        /// <summary>
        /// 路由key,更多內容參見:http://sunjun041640.blog.163.com/blog/static/256268322010328102029919/
        /// </summary>
        private const string ROUTING_KEY = "m1";

        /// <summary>
        /// 日誌入隊
        /// </summary>
        /// <param name="httpModuleErrLogData"></param>
        public static void Enqueue(HttpModuleErrLogData httpModuleErrLogData)
        {
            Uri uri = new Uri(httpModuleErrorLogInfo.RabbitMQAddress);         
            ConnectionFactory cf = new ConnectionFactory()
            {
                UserName = httpModuleErrorLogInfo.UserName,
                Password = httpModuleErrorLogInfo.PassWord,
                VirtualHost = "dnt_mq",
                RequestedHeartbeat = 0,
                Endpoint = new AmqpTcpEndpoint(uri)
            };
            using (IConnection conn = cf.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    if (EXCHANGE_TYPE != null)
                    {
                        ch.ExchangeDeclare(EXCHANGE, EXCHANGE_TYPE);//,true,true,false,false, true,null);
                        ch.QueueDeclare(httpModuleErrorLogInfo.QueueName, true);//true, true, true, false, false, null);
                        ch.QueueBind(httpModuleErrorLogInfo.QueueName, EXCHANGE, ROUTING_KEY, false, null);
                    }
                    IMapMessageBuilder b = new MapMessageBuilder(ch);
                    IDictionary target = b.Headers;
                    target["header"] = "HttpErrLog";
                    IDictionary targetBody = b.Body;
                    targetBody["body"] = SerializationHelper.Serialize(httpModuleErrLogData);
                    ((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;//persistMode                   
                    ch.BasicPublish(EXCHANGE, ROUTING_KEY,
                                               (IBasicProperties)b.GetContentHeader(),
                                               b.GetContentBody());
                }
            }
        }
複製代碼

        
        代碼很簡單,主要構造rabbitmq連接(ConnectionFactory)並初始化相應參數如用戶名,密碼,ROUTING_KEY等。
        
        而後將傳入的日誌對象序列化成字符串對象,賦值給targetBody["body"],這樣作主要是由於我沒找到更好的方法來賦值(以前嘗試直接綁定httpModuleErrLogData到targetBody["body"],但在出隊操做中找不到合適方法將httpModuleErrLogData對象解析出來)。
        
        下面就是出隊操做:       
                設計

複製代碼
        /// <summary>
        /// 日誌出隊
        /// </summary>
        public static void Dequeue()
        {       
            string serverAddress = httpModuleErrorLogInfo.RabbitMQAddress.Replace("amqp://", "").TrimEnd('/'); //"10.0.4.85:5672";
            ConnectionFactory cf = new ConnectionFactory()
            {
                UserName = httpModuleErrorLogInfo.UserName,
                Password = httpModuleErrorLogInfo.PassWord,
                VirtualHost = "dnt_mq",
                RequestedHeartbeat = 0,
                Address = serverAddress
            };
      
            using (IConnection conn = cf.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    while (true)
                    {
                        BasicGetResult res = ch.BasicGet(httpModuleErrorLogInfo.QueueName, false);
                        if (res != null)
                        {
                            try
                            {
                                string objstr = System.Text.UTF8Encoding.UTF8.GetString(res.Body).Replace("\0\0\0body\0\n", "");//去掉頭部信息
                                object obj = SerializationHelper.DeSerialize(typeof(HttpModuleErrLogData), objstr);
                                HttpModuleErrLogData httpModuleErrLogData = obj as HttpModuleErrLogData;
                                if (httpModuleErrLogData != null)
                                {
                                    MongoDbHelper.Insert(new Mongo(httpModuleErrorLogInfo.MongoDB), "dnt_httpmoduleerrlog", LoadAttachment(httpModuleErrLogData));
                                    _msgBox.BeginInvoke(new ShowMsg(SetMsgRichBox), "\r發生時間:" + httpModuleErrLogData.TimeStamp + "\r錯誤等級:" + httpModuleErrLogData.Level + "\r詳細信息:" + httpModuleErrLogData.Message);
                                    ch.BasicAck(res.DeliveryTag, false);
                                }
                            }
                            catch { }
                        }
                        else
                            break;
                    }
                }
            }           
        } 
複製代碼

 

        
        出隊操做也是先實例化連接到rabbitmq 的實例,並循環使用BasicGet方法來單條獲取隊列信息,並最終將res.Body的數據序列化成HttpModuleErrLogData對象,並最終插入到mongodb數據庫中。同時將獲取的隊列消息顯示出來:
        3d

    _msgBox.BeginInvoke(new ShowMsg(SetMsgRichBox), "\r發生時間:" + httpModuleErrLogData.TimeStamp + "\r錯誤等級:" + httpModuleErrLogData.Level + "\r詳細信息:" + httpModuleErrLogData.Message);

         
        這裏使用異步方式顯示出隊的日誌信息,其聲明的delegate 方法「ShowMsg」以下: 
       

複製代碼
        /// <summary>
        /// 聲明委託
        /// </summary>
        /// <param name="message"></param>
        public delegate void ShowMsg(string message);
        /// <summary>
        /// 綁定到上面delegate的方法
        /// </summary>
        /// <param name="outPut"></param>
        public static void SetMsgRichBox(string outPut)
        {
            _msgBox.Text += "\r==================================\r下列錯誤信息出隊時間=>" + DateTime.Now + outPut + "\r";
        }
複製代碼

 

        
        同時使用LoadAttachment方法來實現HttpModuleErrLogData到mongodb的Document類型的轉換:        
       

複製代碼
        /// <summary>
        /// 將HttpModuleErrLogData轉換成Document類型
        /// </summary>
        /// <param name="httpModuleErrLogData"></param>
        /// <returns></returns>
        public static Document LoadAttachment(HttpModuleErrLogData httpModuleErrLogData)
        {
           Document doc = new Document();
            doc["_id"] = httpModuleErrLogData.Oid;
            doc["level"] = httpModuleErrLogData.Level;
            doc["message"] = httpModuleErrLogData.Message;
            doc["timestamp"] = httpModuleErrLogData.TimeStamp;
            return doc;
        }       
複製代碼

              到這裏,主要的功能介紹就差很少了。固然本文所闡述的只是一個原型,相信會隨着對rabbitmq的理解深刻而不斷完善,感興趣的朋友歡迎討論交流,以糾正我認識上的誤差,呵呵。

相關文章
相關標籤/搜索