使用VMware橋接模式組建局域網測試MSMQ(二)

上一篇講了搭建VMware虛擬機實現與宿主機相互通訊,環境已經就緒,如今就能夠作MSMQ的分佈式開發了。編程

本篇準備分四點介紹MSMQ:安全

1.MSMQ簡介服務器

2.MSMQ的安裝網絡

3.MSMQ編程開發app

4.Demo下載框架

 

1、MSMQ簡介dom

    MSMQ本質上是一種消息傳遞協議,它容許在單獨的服務端/客戶端運行的應用程序間已可靠的方式通訊。隊列用來臨時存儲消息,服務器端向隊列發送消息,客戶端從隊列讀取消息。這就使得即便服務器端和客戶端不在同一網絡中,不能直接訪問彼此,也能經過隊列進行通訊。相比之下,sockets和其餘網絡協議就不能作到這一點,它們只能在服務器端和客戶端可以直接相互通訊的狀況下才有效。socket

    MSMQ誕生於1997年,它已經被普遍使用在企業級軟件中。Microsoft已經把MSMQ合併到它的消息技術框架WCF當中。在WCF中,MSMQ被用於提供安全,可靠的傳輸和一個統一的編程模型,同時還兼容通訊標準。分佈式

    MSMQ負責企業內部和外部應用程序之間可靠地傳遞信息。MSMQ會把發送失敗的消息放置在一個隊列裏面,一旦目標能夠訪問,MSMQ會從新發送消息到目標中。它也支持消息的安全性和優先級機制。若是要消息超時或者由於其餘緣由失敗,能夠建立死信隊列進行查看。工具

    MSMQ也支持事務。它容許對多個隊列的多個操做在一個事務中進行,從而保證要麼全不執行,要麼所有不執行。微軟分佈式事務處理協調器(MSDTC)支持MSMQ和其餘資源的事務性訪問。

微軟消息隊列使用如下端口:

  • TCP: 1801
  • RPC: 135, 2101*, 2103*, 2105*
  • UDP: 3527, 1801
  • 消息隊列初始化的時候,若是RPC端口正在使用,那麼這些端口號有可能被增長11。查詢端口135發現該2XXX端口。

各個操做系統中使用的MSMQ版本以下:

  • Version 1.0 (May 1997). Supports Windows 95, Windows NT 4.0 SP3, Windows 98 and Windows Me.
  • Version 2.0, included with Windows 2000.
  • Version 3.0, included with Windows XP (Professional, not Home Edition) and Windows Server 2003.
  • Version 4.0, part of Windows Vista and Windows Server 2008.
  • Version 5.0, part of Windows 7 and Windows Server 2008 R2.
  • Version 6.0, part of Windows 8 and Windows Server 2012.
  • Version 6.3, part of Windows Server 2012 R2.

 

2、MSMQ的安裝

    本篇MSMQ部署在Windows Server 2008 R2中。win7/Windows Server 2008 R2 支持的MSMQ的Feature看錶格:

Feature (Windows 7 / Windows Server 2008 R2) Description
Microsoft Message Queue (MSMQ) Server Core / MSMQ Services This is the core service used for sending and receiving messages.
MSMQ Active Directory Domain Services Integration / Directory Service Integration This feature enables publishing of queue properties to Active Directory Domain Services (AD DS), default authentication and encryption of messages using certificates registered in AD DS, and routing of messages across Windows sites. MSMQ Active Directory Integration requires the computer to be joined to a domain.
MSMQ HTTP Support / HTTP Support This feature enables the sending of messages over Hypertext Transfer Protocol (HTTP). MSMQ HTTP support requires that Internet Information Services (IIS) be installed on the local computer.
MSMQ Triggers / Message Queuing Triggers This feature enables the invocation of a Component Object Model (COM) component or an executable file, depending on the filters that you define for the incoming messages in a given queue.
MSMQ DCOM Proxy / Message Queuing DCOM Proxy This feature enables Message Queuing applications to use a MSMQ COM application programming interface (API) to connect to a remote Message Queuing server.
Multicasting Support / Multicasting Support This feature supports multicasting messages to a multicast IP address and associating a queue with a multicast IP address.
Routing Service

This feature routes messages between different sites and within a site.

(note:The Routing Service feature is only available on Windows Server 2008 R2.)

    MSMQ做爲功能組件被集成在系統當中,在Window Server 2008 R2 安裝MSMQ很是簡單,點擊【開始\控制面板\管理工具\服務器管理器\功能\添加功能\依次展開MSM\MSMQ服務】。注意:要使用【路由服務】,服務器必須加入域。如圖:

 

3、MSMQ編程開發

    在講MSMQ編程開發以前,有必要先了解一下MSMQ的隊列。在MSMQ中,隊列用來臨時存儲不一樣類型的消息。隊列在邏輯上被劃分爲兩組:應用程序隊列和系統隊列。應用程序隊列能夠被應用程序建立,系統隊列由MSMQ建立。

    應用程序隊列包括消息隊列,管理隊列,響應隊列和報告隊列。

    消息隊列,應用程序間能夠經過消息隊列裏面的消息進行通訊,應用程序能夠經過消息隊列發送和接收消息。注意:應用程序建立隊列後,在計算機管理單元中的消息隊列中老是顯示小寫。然而,在MSMQ中,消息隊列的名稱是大小寫敏感的,因此在代碼中使用隊列名稱的時候要十分注意。例如,應用程序建立了一個名爲MYQUEUE的隊列,在計算機的管理單元中顯示爲myqueue。在應用程序中訪問這個隊列時,名稱必定要使用大寫的MYQUEUE,若是使用小寫myqueue,就會拋出一個異常。

    管理隊列,在發送消息的應用程序中能夠指定管理隊列,管理隊列存儲MSMQ發送的確認消息,確認消息標識應用程序發送的消息是否成功。

    響應隊列,發送消息的應用程序能夠指定響應隊列,接收消息的應用程序發送響應消息到響應隊列。

    報告隊列,消息每次經過MSMQ路由服務器傳遞,MSMQ會發送一條報告消息進行跟蹤,報告消息存儲在報告隊列中。報告隊列由發送程序指定和啓用。

    系統隊列由MSMQ或者MSMQ管理員建立,包括日誌隊列和死信隊列。

    日誌隊列,當應用程序隊列被建立後,MSMQ自動建立一條日誌消息跟蹤被讀取的消息。

    死信隊列,保存未能被正確發送的消息。MSMQ提供兩種死信隊列,一種是非事務死信隊列,一種是事務性死信隊列。

    隊列的基本概念講完了,下面就開始講講具體的使用。園子裏已經有不少很好的文章介紹MSMQ消息隊列的建立,消息的發送和消息的接收,我就再也不重複造輪子了。可是在真實使用場景中,若是消息發送不成功,而消息又很是重要每一個消息都必須處理,這種狀況該怎麼使用MSMQ,園子裏介紹的文章倒不是不少,因此這裏我就重點介紹一下管理隊列和死信隊列的用法。

    1. 新建消息發送控制檯應用程序TestAck,黏貼一下代碼:

public class MyNewQueue
    {
        static void Main(string[] args)
        {
            // Create a new instance of the class.
            MyNewQueue myNewQueue = new MyNewQueue();

            // Create new queues.
            CreateQueue(".\\private$\\myQueue");
            CreateQueue(".\\private$\\myAdministrationQueue");

            // Send messages to a queue.
            myNewQueue.SendMessage();
            Console.ReadLine();
        }

        //**************************************************
        // Creates a new queue.
        //**************************************************

        public static void CreateQueue(string queuePath)
        {
            try
            {
                if (!MessageQueue.Exists(queuePath))
                {
                    MessageQueue.Create(queuePath);
                }
                else
                {
                    Console.WriteLine(queuePath + " already exists.");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }

        }

        //**************************************************
        // Sends a string message to a queue.
        //**************************************************

        public void SendMessage()
        {

            // Connect to a queue on the local computer.
            MessageQueue myQueue = new MessageQueue($".\\private$\\myQueue");;
            myQueue.Label = "label1";
            // Create a new message.
            Message myMessage = new Message("Original Message");

            myMessage.AdministrationQueue = new MessageQueue(".\\private$\\myAdministrationQueue");
            myMessage.AcknowledgeType = AcknowledgeTypes.NegativeReceive;
            myMessage.UseDeadLetterQueue = true;
            myMessage.TimeToBeReceived = TimeSpan.FromSeconds(2);
            myMessage.Label = "label1";

            // Send the Order to the queue.
            Thread.Sleep(TimeSpan.FromSeconds(3));
            myQueue.Send(myMessage);

            return;
        }
    }

 

myMessage.AdministrationQueue = new MessageQueue(".\\private$\\myAdministrationQueue");

指定消息的管理隊列,管理隊列的建立很普通消息隊列是同樣的。

myMessage.UseDeadLetterQueue = true;

指定消息使用死信隊列,消息發送不成功時,系統會發送消息的副本到死信隊列。

myMessage.AcknowledgeType = AcknowledgeTypes.NegativeReceive;

指定當消息隊列未能接收消息時,發送確認消息到管理隊列。AcknowledgeTypes是一個枚舉類型,枚舉值:

PositiveArrival

一個掩碼,用於在原始消息到達隊列時請求確定確認。

PositiveReceive

一個掩碼,用於在成功從隊列檢索到原始消息時請求確定確認。

NegativeReceive

一個掩碼,用於當未能從隊列接收原始消息時請求否認確認。

None

一個掩碼,用於請求不發送任何確認消息(不管是確定的仍是否認的)。

NotAcknowledgeReachQueue

一個掩碼,用於在原始消息不能到達隊列時請求否認確認。當到達隊列時間計時器過時時或不能對消息進行身份驗證時,可能請求否認確認。

NotAcknowledgeReceive

一個掩碼,用於當發生錯誤時請求否認確認,防止在其接收時間計時器過時前從隊列接收原始消息。

FullReachQueue

一個掩碼,用於在原始消息到達隊列時請求確定確認,或者用於到達隊列時間計時器過時後請求否認確認,或者用於不能對原始消息進行身份驗證時請求否認確認。

FullReceive

一個掩碼,用於在接收時間計時器過時前從隊列收到原始消息時請求確定確認,不然請求否認確認。

 備註:確認消息是系統生成的非事務性消息,它們標識由應用程序發送的消息是否成功發送到目標隊列,也能夠標識是消息否被應用程序成功讀取。

 myMessage.TimeToBeReceived = TimeSpan.FromSeconds(2);

 Thread.Sleep(TimeSpan.FromSeconds(3));

 myQueue.Send(myMessage);

 模擬消息發送超時,消息隊列未能接收消息的狀況。消息的TimeToBeReceived屬性指定隊列接收消息的超時時間。

 

代碼執行後,查看計算機管理單元中的消息隊列狀況,因爲發送消息超時,因此消息隊列myQueue中沒有接收到消息;管理隊列被指定爲當消息發送不成功時接收確認消息,因此管理隊列中由系統自動生成了一條確認消;消息指定使用死信隊列,因此係統同時發送消息的副本到死信隊列。以下圖:

 

 

2.新建接收消息控制檯應用程序AckClient,黏貼如下代碼

class ReceiveClient
    {
        static void Main(string[] args)
        {
            new ReceiveClient().ReceiveAckMessage();
            new ReceiveClient().ReceiveDeadLetterMessage();
            Console.ReadLine();
        }

        private void CreateQueue(string path)
        {
            try
            {
                if (!MessageQueue.Exists(path))
                {
                    MessageQueue.Create(path);
                }
                else
                {
                    Console.WriteLine("該隊列已存在!");
                }
            }
            catch (MessageQueueException ex)
            {
                throw;
            }
        }

        private void ReceiveAckMessage()
        {

            MessageQueue myQueue = new MessageQueue(".\\private$\\myAdministrationQueue");
            myQueue.MessageReadPropertyFilter.CorrelationId = true;
            myQueue.MessageReadPropertyFilter.Acknowledgment = true;
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });


            MessageQueue mySendQueue = new MessageQueue(".\\private$\\myQueue");
            mySendQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
            try
            {

                Message myMessage = myQueue.Receive(TimeSpan.FromSeconds(3));
                Console.WriteLine("_______________________________");
                Console.WriteLine("Ack Message body: " + myMessage.Body.ToString());
                Console.WriteLine("Ack Message Id: " + myMessage.Id);
                Console.WriteLine("Correlation Id: " + myMessage.CorrelationId);
                Console.WriteLine("Acknowledgment Type: " + myMessage.Acknowledgment.ToString());
                Console.WriteLine("_______________________________");

            }
            catch (MessageQueueException ex)
            {
                throw;
            }
        }

        private void ReceiveDeadLetterMessage()
        {
            MessageQueue myQueue = new MessageQueue("FormatName:Direct=os:.\\System$;DEADLETTER");
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
            myQueue.MessageReadPropertyFilter.CorrelationId = true;
            try
            {
                Message myMessage = myQueue.Receive(TimeSpan.FromSeconds(3));
                Console.WriteLine("__________________________________");
                Console.WriteLine("Dead Letter Id: " + myMessage.Id);
                Console.WriteLine("Dead Letter Body: " + myMessage.Body.ToString());
                Console.WriteLine("__________________________________");
            }
            catch (MessageQueueException ex)
            {
                throw;
            }
        }
    }


ReceiveAckMessage()方法從管理隊列接收確認消息。
myMessage.CorrelationId: 確認消息的CorrelationId屬性標識原始消息的消息ID
myMessage.Acknowledgment: 確認消息的Acknowledgment屬性標識確認消息的類型。

ReceiveDeadLetterMessage()方法從死信隊列接收死信消息。
myMessage.Body: 死信消息時原始消息的副本,死信消息的body屬性值與原始消息的body屬性值相同。
myMessage.Id: 死信消息的消息ID與原始消息的消息ID相同。

執行效果如圖:


4、Demo下載
TestAck.zip

總結:有了上面的知識,咱們就能夠處理未發送成功的消息。解決方案是在發送消息的程序中使用死信隊列;在讀取消息的程序中,讀取死信隊列的死信消息,而後從新處理死信消息。

結尾:示例代碼是在本機建立專用隊列,能夠正常執行。若是在遠程計算機中,不能直接使用Create方法建立隊列,也不能使用Exsits方法判斷隊列是否存在,個人作法是在遠程計算機中手動建立隊列,而後配置隊列的讀取權限。注意,還要把【消息隊列】\【屬性】\【服務器安全性】\【禁用未經身份驗證的RPC調用】取消。感謝園友【一我的的長征】的提醒。

結束語:關於MSMQ的知識點,感受有好多要講,好比隊列的訪問方式,事務隊列的使用,日誌隊列的使用。若是之後有時間,我計劃從新整理本篇,作成一個系列,把每一個知識點都講一講。好了,就像寫程序同樣,不可能初版就作的很完美,須要一個迭代的過程,不斷的去重構,不斷的去優化

相關文章
相關標籤/搜索