MSMQ消息隊列

消息隊列(MSMQ)技術使得運行於不一樣時間的應用程序可以在各類各樣的網絡和可能暫時脫機的系統之間進行通訊。html

應用程序將消息發送到隊列,並從隊列中讀取消息。數據庫

消息便是信息的載體。爲了讓消息發送者和消息接收者都可以明白消息所承載的信息(消息發送者須要知道如何構造消息;消息接收者須要知道如何解析消息)數組

下圖演示了消息隊列如何保存由多個發送應用程序生成的消息,並被多個接收應用程序讀取。緩存

消息隊列的安裝能夠服務器

一、安裝消息隊列
   開始—》控制面板—》管理工具—》服務器管理器—》功能—》添加功能—》依次展開MSM、MSMQ服務—》肯定。
二、管理消息隊列
   計算機—》右鍵—》管理—》功能—》消息隊列。網絡

參考 點擊異步

我理解的MSMQ

      MSMQ能夠被當作一個數據儲存裝置,就如同數據庫,只不過數據存儲的是一條一條的記錄,而MSMQ存儲的是一個一個的消息(messsge)。函數

Message能夠被理解爲一種數據容器,咱們在稍後會講到。MSMQ一個重要的應用場景就是離線信息交互,例如,咱們在給朋友發送郵件,而此時朋友並未登入郵箱,工具

這個時候咱們的郵件就能夠發到郵件服務器的MSMQ隊列中,當朋友登入郵箱的時候,系統在從服務器的MSMQ隊列中取出U件。固然MSMQ的用途遠不止這些,學習

例如,充當數據緩存,實現異步操做等等,這裏就不在一一舉例了。

建立、刪除和管理隊列
   要開發MSMQ程序就必須學習一個很重要的類(MessageQueue),該類位於名稱空間System.Messageing下。
經常使用方法:
   --Create()            方法:建立使用指定路徑的新消息隊列。
   --Delete() 方法:        刪除現有的消息隊列。
   --Existe() 方法:       查看指定消息隊列是否存在。
   --GetAllMessages() 方法 :    獲得隊列中的全部消息。
   --GetPublicQueues() 方法:   在「消息隊列」網絡中定位消息隊列。
   --Peek()/BeginPeek() 方法:  查看某個特定隊列中的消息隊列,但不從該隊列中移出消息。
   --Receive()/BeginReceive()    方法:檢索指定消息隊列中最前面的消息並將其從該隊列中移除。
   --Send()           方法:發送消息到指定的消息隊列。
   --Purge()            方法:清空指定隊列的消息。
經常使用屬性:
   Priority 設置消息優先級,MessagePriority 枚舉裏所有進行了封裝,MessagePriority.High()
   AboveNormal:hight Normal 消息優先級之間;
   High:高級消息優先級;
   Highest:最高消息優先級;
   Low:低消息優先級;
   Lowest:最低消息優先級;
   Normal:普通消息優先級;
   VeryHigh:Highest High 消息優先級之間;
   VeryLow:Low Lowest 消息優先級之間;
發送和序列化消息
   MSMQ消息隊列中定義的消息由一個主體(body)和若干屬性構成。消息的主體能夠由文本、二進制構成,根據須要還能夠被加密。
   在MSMQ中消息的大小不可以超過4MB。發送消息是經過Send方法來完成的,須要一個Message參數。
一、發送消息:
   步驟:鏈接隊列-->指定消息格式-->提供要發送的數據(主體)-->調用Send()方法將消息發送出去。詳細見後面的示例程序。
二、序列化消息:
   消息序列化能夠經過.NET Framework附帶的三個預約義格式化程序來完成:
   --  XMLMessageFormatter對象----MessageQueue組件的默認格式化程序設置。
   --  BinaryMessageFormatter對象;
   --  ActiveXMessageFormatter對象; 
   因爲後二者格式化後的消息一般不能爲人閱讀,因此咱們常常用到的是XMLMessageFormatter對象。該對象構造方法有三種重載:
   一、public XmlMessageFormatter();
   二、public XmlMessageFormatter(string[] targetTypeNames);
   三、public XmlMessageFormatter(Type[] targetTypes);
   如咱們後面的示例程序中用到的序列化語句:
   //序列化爲字符串
   XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
讀取和接收消息
一、讀取消息:
   也就是從指定隊列中獲取消息。
二、接收消息有兩種方式:
   --> 經過 Receive() 方法。獲取移除消息。
   --> 經過 Peek() 方法。 獲取但不移除消息。

  路徑名和格式名能夠用於標識隊列,要查找隊列,必須區分公共隊列私有隊列

  公共隊列在Active Directory中發佈,對於這些隊列,無需知道他們所在的系統,私有隊列只有在已知隊列所在的系統名時才能找到。
  在Active Directory域中收索隊列的標籤,類別或格式名,就能夠找到公共隊列,還能夠得到機器上的全部隊列,

  MessageQueue類的靜態方法 GetPublicQueuesByLabel()GetPublicQueuesByCategory() GetPublicQueuesByMachine() 

  能夠收索隊列,GetPublicQueues() 方法返回包含域中全部公共隊列的數組。

如:

static void Main(string[] args)
{
 foreach(MessageQueue queue in MessageQueue.GetPublicQueues())
 {
   Console.WriteLine(queue.Path);
 }
}

消息隊列分爲如下幾種,每種隊列的路徑表示形式以下:

這裏的  MachineName  能夠用 「."代替,表明當前計算機

須要先引用System.Messaging.dll

MSMQ 支持兩種類型的隊列,

一、事務性隊列(transactional queue)會將消息持久(persiste)存儲到磁盤中,即使服務器當機(shutdown)、重啓(reboot)或崩潰(crash),

消息依然 能夠在系統恢復後被讀取。同時,消息發佈、獲取和刪除都在環境事務範圍內,從而確保消息的可靠性。

咱們還可使用 TransactionScope 將環境事務傳遞給隊列,不然隊列會自動建立一個內部事務。

二、非事務性隊列(nontransactional volatile queues)只是將消息存在內存,不會使用磁盤進行持久存儲,且不會使用事務來保護對消息的操做。

一但服務器發生問題,或者調用方出現異常,消息都會丟 失。

// 建立事務性隊列
1 MessageQueue.Create(@"./private$/myqueue", true);
// 建立非事務性隊列
1 MessageQueue.Create(@"./private$/myqueue");

三、消息隊列的優先級

  在MSMQ中消息在隊列裏傳輸是分有優先級的,  

  優先級一共有七種,MessagePriority枚舉裏所有進行了封裝。因這裏只做程序演示就不一一列舉出,僅用了HighestNormal兩種類型,

  關於消息隊列上進行消息傳輸的七種優先級你們能夠參考我下面提供的 MessagePriority 枚舉源代碼定義。

  那麼在發送消息的時候怎麼指定消息的優先級呢?在Message對象裏封裝有一個屬性Priority,

  接受一個枚舉MessagePriority類型的值來設置消息傳輸的優先級。以下:

1 Message message = new Message();
2 message.Priority = MessagePriority.Lowest;  //定義消息的優先級

   消息的優先級不能用於事務性消息發送。由於事務性消息是按照順序發生和接收消息的。

四、消息隊列超時設置和最大長度設置

 

1、非事務性消息發生接收

發送:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string msg = Console.ReadLine();
 6             //發生消息
 7             new SendQueueMessage().SendMessageQueue(msg);
 8             Console.ReadLine();
 9         }
10    }    
11 
12     /// <summary>
13     /// 消息隊列發生消息
14     /// </summary>
15     public class SendQueueMessage
16     {
17         /// <summary>
18         /// 消息隊列
19         /// </summary>
20         private MessageQueue messageQueue;
21 
22         public SendQueueMessage()
23         {
24             //路徑
25             string path = ".\\private$\\temp";
26             if (MessageQueue.Exists(path))
27             {
28                 //若是存在指定路徑的消息隊列,則獲取
29                 messageQueue = new MessageQueue(path);
30             }
31             else
32             {
33                 //不存在,則建立新的
34                 messageQueue = MessageQueue.Create(path);
35             }
36         }
37 
38         /// <summary>
39         /// 發生消息
40         /// </summary>
41         /// <param name="msg"></param>
42         public void SendMessageQueue(string msg)
43         {
44             Message message = new Message();
45             message.Body = msg;
46             //指定格式
47             message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
48             messageQueue.Send(message);
49         }
50     }

接收:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             //接收
 6             new ReciveQueue().ReciveMessage();
 7             Console.ReadLine();
 8         }
 9     }
10 
11     /// <summary>
12     /// 消息隊列接收
13     /// </summary>
14     public class ReciveQueue
15     {
16         /// <summary>
17         /// 消息隊列
18         /// </summary>
19         private MessageQueue messageQueue;
20 
21         public ReciveQueue()
22         {
23             //路徑
24             string path = ".\\private$\\temp";
25             if (MessageQueue.Exists(path))
26             {
27                 //若是存在指定路徑的消息隊列,則獲取
28                 messageQueue = new MessageQueue(path);
29             }
30             else
31             {
32                 //不存在,則建立新的
33                 messageQueue = MessageQueue.Create(path);
34             }
35         }
36 
37         public void ReciveMessage()
38         {
39             while (true)
40             {
41                 //Message.Receive()是同步進行的,若是隊列中沒有消息,會阻塞當前線程
42                 Message message = messageQueue.Receive();
43                 message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
44                 string msg = message.Body.ToString();
45                 Console.WriteLine(msg);
46             }
47         }
48     }

注意:messageQueue.Receive()  若是沒有接收到消息,則會阻塞當前被調用線程,直到接收到消息爲止。

 這樣就在數據轉發服務器端建立了一個名爲  temp 的消息隊列;從客戶端要發送的消息就保存在這個隊列裏,

你能夠經過計算機管理->服務和應用下的消息隊列中看到你建立的 temp 隊列,private$關鍵字是說明隊列爲專用隊列,

前面的「.」表明建立的隊列目錄是本機,這個隊列一旦建立成功,就是系統的事了,接下來要作的就是你怎麼去把消息寫進這個隊列,或者讀取隊列的值 

這裏要特別注意不要將queuepath路徑字符串寫成 

 1         /// <summary>
 2         /// 獲取專用消息隊列
 3         /// </summary>
 4         /// <returns></returns>
 5         public static MessageQueue GetMessageQueue()
 6         {
 7             string publicMQPath = "FormatName:Direct=TCP:192.168.0.110\\private$\\DESKTOP-2AEJ0GU";
 8             //若是存在,獲取消息隊列
 9             if (MessageQueue.Exists(publicMQPath))
10             {
11                 queue = new MessageQueue(publicMQPath);
12             }
13             else
14             {
15                 //不存在,建立一個新的消息隊列
16                 queue = MessageQueue.Create(publicMQPath);
17             }
18             return queue;
19         }

這樣寫的話是用於遠程計算機對這個隊列進行訪問的,

由於MessageQueueCreate() 和 Exisit() 方法是沒辦法去識別上述FormatName格式的,

還有要確保 Create() 函數要被執行了以後再用 MessageQueue 實例去引用;這樣服務器端隊列的建立就完成了;

2、事務性消息隊列

  事務我想你們對這個詞應該都不會陌生,在操做數據庫的時候常常都會用到事務,確保操做成功,要麼所有完成(成功) 
,要麼所有不完成(失敗)。在MSMQ中利用事務性處理,能夠確保事務中的消息按照順序傳送,只傳送一次,而且從目的隊列成 
功地被檢索。
  那麼,在MSMQ上使用事務性處理怎麼實現呢?能夠經過建立 MessageQueueTransation 類的實例並將其關聯到 MessageQueue 
組件的實例來執行,執行事務的 Begin 方法,並將其實例傳遞到收發方法。而後,調用 Commit 以將事務的更改保存到目的隊列。
建立事務性消息和普通的消息有一點小小的區別,你們可從下圖上體會到:

 發送:

 1     /// <summary>
 2     /// 事務性消息發送
 3     /// </summary>
 4     public class MQTranSendTest
 5     {
 6         /// <summary>
 7         /// 執行發送
 8         /// </summary>
 9         public void Execute()
10         {
11             Console.WriteLine("請輸入發送消息:");
12             string flag = "N";
13             while (flag != "Y")
14             {
15                 string msg = Console.ReadLine();
16                 SendMessage(msg);
17                 Console.WriteLine("按回車鍵繼續,或者輸入 Y 退出程序!");
18                 flag = Console.ReadLine();
19             }
20         }
21 
22         /// <summary>
23         /// 格式化發生內容,執行發送
24         /// </summary>
25         /// <param name="msg">發送的消息</param>
26         private static void SendMessage(string msg)
27         {
28             Message message = new Message();
29             MessageQueue queue = GetMessageQueue();
30             message.Body = msg;
31             //指定格式
32             queue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
33             MessageQueueTransaction queueTransaction = new MessageQueueTransaction();
34             //啓動事務
35             queueTransaction.Begin();
36             queue.Send(message, queueTransaction);
37             //提交事務
38             queueTransaction.Commit();
39         }
40 
41         /// <summary>
42         /// 根據路徑獲取消息隊列
43         /// </summary>
44         /// <returns></returns>
45         private static MessageQueue GetMessageQueue()
46         {
47             string path = ".\\private$\\TranTemp";
48             if (MessageQueue.Exists(path))
49             {
50                 //若是存在指定路徑的消息隊列,則獲取
51                 return new MessageQueue(path);
52             }
53             else
54             {
55                 //不存在,則建立新的
56                 return MessageQueue.Create(path,true);
57             }
58         }
59     }

接收:

 1  /// <summary>
 2     /// 事務性接收消息
 3     /// </summary>
 4     public class MQTranReciveTest
 5     {
 6         /// <summary>
 7         /// 消息隊列
 8         /// </summary>
 9         private MessageQueue messageQueue;
10 
11         public MQTranReciveTest()
12         {
13             //路徑
14             string path = ".\\private$\\TranTemp";
15             if (MessageQueue.Exists(path))
16             {
17                 //若是存在指定路徑的消息隊列,則獲取
18                 messageQueue = new MessageQueue(path);
19             }
20             else
21             {
22                 //不存在,則建立新的
23                 messageQueue = MessageQueue.Create(path);
24             }
25         }
26 
27         public void ReciveMessage()
28         {
29             string msg = string.Empty;
30             while (true)
31             {
32                 if (messageQueue.Transactional)
33                 {
34                     MessageQueueTransaction queueTransaction = new MessageQueueTransaction();
35                     //啓動事務
36                     queueTransaction.Begin();
37                     //Message.Receive()是同步進行的,若是隊列中沒有消息,會阻塞當前線程
38                     Message message = messageQueue.Receive(queueTransaction);
39                     message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
40                     msg = message.Body.ToString();
41                     queueTransaction.Commit();
42                     Console.WriteLine(msg);
43                     msg = string.Empty;
44                 }
45             }
46         }
47     }

 

2、死信隊列

 死信隊列:因爲某種緣由沒法傳遞的消息都放置在死信隊列上(如:超時,超出消息隊列最大長度等)

 超時、消息隊列長度設置代碼:

     /// <summary>
        /// 格式化發生內容,執行發送
        /// </summary>
        /// <param name="msg">發送的消息</param>
        public static void SendMessage(string msg)
        {
            Message message = new Message(msg);
            MessageQueue queue = GetMainQueue();
            message.Body = msg;
            //指定格式
            queue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
            //設置日誌隊列最大個數
            //queue.MaximumJournalSize = 1;
            //設置消息隊列最大存放的個數
            //queue.MaximumQueueSize = 1;
            //發送消息的時間開始算起,消息到達目標隊列的時間
            //message.TimeToReachQueue = new TimeSpan(0, 0, 1);
            //從發送消息到接收消息的總時間
            //message.TimeToBeReceived = new TimeSpan(0, 0, 30);
            //設置爲true,發送失敗後進入死信隊列
            //message.UseDeadLetterQueue = true;
            queue.Send(message);
        }

如何訪問遠程的私有隊列?
如何修改消息隊列的最大存儲限制?
如何修改消息隊列的默認存儲位置?
如何保證計算機重啓以後隊列中的消息還在?

文檔:https://files.cnblogs.com/files/wwj1992/MSMQ_FAQ.rar

    https://files.cnblogs.com/files/wwj1992/MSMQ_Doc.rar

參考:http://www.javashuo.com/article/p-pcmfcteg-bd.html

Demo 下載

相關文章
相關標籤/搜索