C#實戰Microsoft Messaging Queue(MSMQ)消息隊列(乾貨)

前言

           在使用MSMQ以前,咱們須要自行安裝消息隊列組件!(具體安裝方法你們本身搜一下吧)html

           採用MSMQ帶來的好處是:因爲是異步通訊,不管是發送方仍是接收方都不用等待對方返回成功消息,就能夠執行餘下的代碼,於是大大地提升了事物處理的能力;當信息傳送過程當中,信息發送機制具備必定功能的故障恢復能力;MSMQ的消息傳遞機制使得消息通訊的雙方具備不一樣的物理平臺成爲可能。數據庫

       在微軟的.net平臺上利用其提供的MSMQ功能,能夠輕鬆建立或者刪除消息隊列、發送或者接收消息、甚至於對消息隊列進行管理
 
       MSMQ隊列是一個可持久的隊列,所以沒必要擔憂不間斷地插入隊列會致使數據的丟失,在網站系統中不用擔憂網站掛掉後數據丟失問題
 
       MSMQ缺點:不適合於Client須要Server端實時交互狀況.大量請求時候,響應延遲,另外這是微軟的東西,你懂得........
 
       下面主要講一講我在實戰MSMQ的一些心得和體會跟你們分享一下:主要是在個人完整的站內搜索Demo(Lucene.Net+盤古分詞)項目的基礎上進行改造,而後使用了MSMQ,也讓你們可以在真正的項目中感覺到MSMQ該如何使用和起到的做用!

MSMQ的基本使用

           參考了PetShop裏MSMQ的代碼,爲了考慮到在擴展中會有其餘的數據數據對象會使用到MSMQ,所以定義了一個DTcmsQueue的基類,實現消息Receive和Send的基本操做,使用到MSMQ的數據對象須要繼承DTcmsQueue基類,須要注意的是:在MSMQ中使用事務的話,須要建立事務性的專用消息隊列,代碼以下:數組

using System;
using System.Messaging;
using log4net;

namespace DTcms.Web.UI
{
    /// <summary>
    /// 該類實現從消息對列中發送和接收消息的主要功能 
    /// </summary>
    public class DTcmsQueue : IDisposable {

        private static ILog logger = LogManager.GetLogger(typeof(DTcmsQueue));
        //指定消息隊列事務的類型,Automatic 枚舉值容許發送外部事務和從處部事務接收
        protected MessageQueueTransactionType transactionType = MessageQueueTransactionType.Automatic;
        protected MessageQueue queue;
        protected TimeSpan timeout;
        //實現構造函數
        public DTcmsQueue(string queuePath, int timeoutSeconds) {
            Createqueue(queuePath);
            queue = new MessageQueue(queuePath);
            timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeoutSeconds));

            //設置當應用程序向消息對列發送消息時默認狀況下使用的消息屬性值
            queue.DefaultPropertiesToSend.AttachSenderId = false;
            queue.DefaultPropertiesToSend.UseAuthentication = false;
            queue.DefaultPropertiesToSend.UseEncryption = false;
            queue.DefaultPropertiesToSend.AcknowledgeType = AcknowledgeTypes.None;
            queue.DefaultPropertiesToSend.UseJournalQueue = false;
        }

        /// <summary>
        /// 繼承類將從自身的Receive方法中調用如下方法,該方法用於實現消息接收
        /// </summary>
        public virtual object Receive() {
            try
            {
                using (Message message = queue.Receive(timeout, transactionType))
                    return message;
            }
            catch (MessageQueueException mqex)
            {
                if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                    throw new TimeoutException();

                throw;
            }
        }

        /// <summary>
        /// 繼承類將從自身的Send方法中調用如下方法,該方法用於實現消息發送
        /// </summary>
        public virtual void Send(object msg) {
            queue.Send(msg, transactionType);
        }

        /// <summary>
        /// 經過Create方法建立使用指定路徑的新消息隊列
        /// </summary>
        /// <param name="queuePath"></param>
        public static void Createqueue(string queuePath)
        {
            try
            {
                if (!MessageQueue.Exists(queuePath))
                {
                    MessageQueue.Create(queuePath, true);  //建立事務性的專用消息隊列
                    logger.Debug("建立隊列成功!");
                }
            }
            catch (MessageQueueException e)
            {
                logger.Error(e.Message);
            }
        }

        #region 實現 IDisposable 接口成員
        public void Dispose() {
            queue.Dispose();
        }
        #endregion
    }
}

 MSMQ的具體實現方式

       上面咱們已經建立了DTcmsQueue基類,咱們具體實現的時候須要繼承此基類,使用消息隊列的時候,傳遞的是一個對象,因此咱們首先要建立這個對象,可是須要注意的一點:此對象是必須可序列化的,不然不能被插入到消息隊列裏,代碼以下:多線程

    /// <summary>
    /// 枚舉,操做類型是增長仍是刪除
    /// </summary>
    public enum JobType { Add, Remove }
    /// <summary>
    /// 任務類,包括任務的Id ,操做的類型
    /// </summary>
    [Serializable]
    public class IndexJob
    {
        public int Id { get; set; }
        public JobType JobType { get; set; }
    }

      在具體的實現類裏面,咱們只須要繼承此基類,而後重寫基類的方法,具體代碼以下:異步

using System;
using System.Configuration;
using System.Messaging;

namespace DTcms.Web.UI
{

    /// <summary>
    /// 該類實現從消息隊列中發送和接收訂單消息
    /// </summary>
    public class OrderJob : DTcmsQueue {

        // 獲取配置文件中有關消息隊列路徑的參數
        private static readonly string queuePath = ConfigurationManager.AppSettings["OrderQueuePath"];
        private static int queueTimeout = 20;
        //實現構造函數
        public OrderJob()
            : base(queuePath, queueTimeout)
        {
            // 設置消息的序列化採用二進制方式 
            queue.Formatter = new BinaryMessageFormatter();
        }

        /// <summary>
        /// 調用PetShopQueue基類方法,實現從消息隊列中接收訂單消息
        /// </summary>
        /// <returns>訂單對象 OrderInfo</returns>
        public new IndexJob Receive()
        {
            // 指定消息隊列事務的類型,Automatic枚舉值容許發送發部事務和從外部事務接收
            base.transactionType = MessageQueueTransactionType.Automatic;
            return (IndexJob)((Message)base.Receive()).Body;
        }
        //該方法實現從消息隊列中接收訂單消息
        public IndexJob Receive(int timeout)
        {
            base.timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeout));
            return Receive();
        }

        /// <summary>
        /// 調用PetShopQueue基類方法,實現從消息隊列中發送訂單消息
        /// </summary>
        /// <param name="orderMessage">訂單對象 OrderInfo</param>
        public void Send(IndexJob orderMessage)
        {
            // 指定消息隊列事務的類型,Single枚舉值用於單個內部事務的事務類型
            base.transactionType = MessageQueueTransactionType.Single;
            base.Send(orderMessage);
        }
    }
}

 項目中MSMQ的具體應用

     將任務添加到消息隊列代碼就很簡單了,沒啥好說的,直接上代碼:分佈式

        #region 任務添加
        public void AddArticle(int artId)
        {
            OrderJob orderJob = new OrderJob();
            IndexJob job = new IndexJob();
            job.Id = artId;
            job.JobType = JobType.Add;
            logger.Debug(artId + "加入任務列表");
            orderJob.Send(job);//把任務加入消息隊列
        }

        public void RemoveArticle(int artId)
        {
            OrderJob orderJob = new OrderJob();
            IndexJob job = new IndexJob();
            job.JobType = JobType.Remove;
            job.Id = artId;
            logger.Debug(artId + "加入刪除任務列表");
            orderJob.Send(job);//把任務加入消息隊列
        }
        #endregion

      接下來就是以下獲得消息隊列的任務,並將任務完成,由於消息隊列是系統的一個組件跟咱們的項目是徹底分開的,咱們能夠徹底獨立的完成接收消息隊列的任務並處理後來的動做,這樣就作到了異步處理,例如作一個Windows Service,更重要的是MSMQ仍是一種分佈式處理技術,在本項目中,咱們主要是開闢了多線程來接收消息隊列的任務並處理後來的動做,具體代碼以下:函數

        public void CustomerStart()
        {
            log4net.Config.XmlConfigurator.Configure();

            PanGu.Segment.Init(PanGuPath);

            //聲明線程
            Thread workTicketThread;
            Thread[] workerThreads = new Thread[threadCount];

            for (int i = 0; i < threadCount; i++)
            {
                //建立 Thread 實例 
                workTicketThread = new Thread(new ThreadStart(ProcessOrders));

                // 設置線程在後臺工做和線程啓動前的單元狀態(STA表示將建立並進入一個單線程單元 )
                workTicketThread.IsBackground = true;
                workTicketThread.SetApartmentState(ApartmentState.STA);

                //啓動線程,將調用ThreadStart委託
                workTicketThread.Start();
                workerThreads[i] = workTicketThread;
            }

            logger.Debug("進程已經開始啓動. 按回車鍵中止.");
        }
        private static void ProcessOrders()
        {

            // 總事務處理時間(tsTimeout )就該超過批處理任務消息的總時間 
            TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize));

            OrderJob orderJob = new OrderJob();
            while (true)
            {

                // 消息隊列花費時間
                TimeSpan datetimeStarting = new TimeSpan(DateTime.Now.Ticks);
                double elapsedTime = 0;

                int processedItems = 0;

                ArrayList queueOrders = new ArrayList();

                using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout))
                {
                    // 接收來自消息隊列的任務消息
                    for (int j = 0; j < batchSize; j++)
                    {

                        try
                        {
                            //若是有足夠的時間,那麼接收任務,並將任務存儲在數組中 
                            if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds)
                            {
                                queueOrders.Add(orderJob.Receive(queueTimeout));
                            }
                            else
                            {
                                j = batchSize;   // 結束循環
                            }

                            //更新已佔用時間
                            elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datetimeStarting.TotalSeconds;
                        }
                        catch (TimeoutException)
                        {

                            //結束循環由於沒有可等待的任務消息
                            j = batchSize;
                        }
                    }

                    //從數組中循環取出任務對象,並將任務插入到數據庫中
                    for (int k = 0; k < queueOrders.Count; k++)
                    {
                        SearchHelper sh = new SearchHelper();
                        sh.IndexOn((IndexJob)queueOrders[k]);
                        processedItems++;
                        totalOrdersProcessed++;
                    }

                    //指示範圍中的全部操做都已成功完成
                    ts.Complete();
                }
                //完成後顯示處理信息
                logger.Debug("(線程 Id " + Thread.CurrentThread.ManagedThreadId + ") 批處理完成, " + processedItems + " 任務, 處理花費時間: " + elapsedTime.ToString() + " 秒.");
            }
        }

 結束語

            以上就是我在實際項目中使用MSMQ的一些心得,但願對各位看官有所幫助,MSMQ具體實現代碼上面已經貼出來了,因爲本項目是《動力起航》的源代碼,一方面因爲文件過大,另外一方面不知道是否是會侵權,全部沒有提供下載.若是有須要的朋友能夠留下郵箱我將發給你,但僅供學習交流之用,誤用作商業用途,以上若是有侵權等問題還請及時告知我,以便我及時更正!post

           另外此項目源代碼已上傳至搜索技術交流羣:77570783,源代碼已上傳至羣共享,須要的朋友,請自行下載!學習

           若是以爲好的話請給個推薦哦~~~~親網站

相關文章
相關標籤/搜索