在一個大型的分佈式系統中,消息隊列是不可缺乏的中間件,能很好的解決異步消息、應用解耦、均衡併發等問題。在.net中,偶然發現一個效率不錯、安全可靠、功能齊全的消息組件,忍不住翻譯過來,供你們快速預覽。java
注:原做者用windows服務啓動消息隊列服務,可是本人在win10上測試出錯,可自行改爲控制檯啓動消息隊列服務,而後用第三方工具註冊服務(如:SrvanyUI)web
原文:http://www.codeproject.com/Articles/193611/DotNetMQ-A-Complete-Message-Queue-System-for-NET算法
正文: 數據庫
一個新的、獨立的、開源的,徹底基於C#和.NET Framework3.5的消息隊列系統apache
下載二進制文件 - 933 KBwindows
文章概要安全
介紹服務器
在這篇文章中,我將介紹一個新的、獨立的、開源的,徹底基於C#和.NET Framework3.5的消息隊列系統,DotNetMQ是一個消息代理,它包括確保傳輸,路由,負載均衡,服務器圖等等多項功能。我將從解釋消息的概念和消息代理的必要性講起,而後,我會說明什麼是DotNetMQ,以及如何使用它。
什麼是消息傳遞
消息傳遞是一種異步通訊方式,具體就是在同一個或不一樣的機器上運行的多個應用程序之間可靠的消息傳遞。應用程序經過發送一種叫消息的數據包和其餘應用程序通訊。
一個消息能夠是一個字符串,一個字節數組,一個對象等等。一般狀況下,一個發送者(生產者)程序建立一個消息,並將其推送到一個消息隊列,而後一個接受者(消費者)程序從隊列中獲取這個消息並處理它。發送程序和接受程序不須要同時運行,由於消息傳遞是一個異步過程。這就是所謂的鬆耦合通訊。
另外一方面,Web服務方法調用(遠程方法調用)是一種緊耦合的同步通訊(這兩個應用程序在整個通訊的過程當中都必須是運行着而且可用,若是Web服務脫機或在方法調用期間發生錯誤,那麼客戶端應用程序將獲得一個異常)。
圖 - 1:兩個應用程序間最簡單的消息傳遞。
在上圖中,兩個應用程序經過消息隊列進行鬆散耦合方式通訊。若是接受者處理消息的速度慢於發送者產生消息的速度,那麼隊列裏的消息數就會增長。此外,在發送者發送消息的過程當中,接受者多是離線的。在這種狀況下,當接收者上線後,它會從隊列中獲得消息(當它開始並加入這個隊列時)。
消息隊列一般由消息代理提供。消息代理是一個獨立的應用程序(一個服務),其餘應用程序經過鏈接它發送、接收消息。在消息被接收者接收以前,消息代理負責存儲消息。消息代理能夠經過路由多臺機器把消息傳送給目標應用程序,在消息被接收者正確處理以前,消息代理會一直嘗試傳送它。有時候消息代理也被稱爲面向消息的中間件(Message-Oriented-Middleware MOM)或者簡單的叫消息隊列(Message Queue MQ).
什麼是DotNetMQ?
DotNetMQ是一個開源的消息代理,它有如下幾個特色:
在開始建立它的時候,我更喜歡叫它爲MDS(消息傳送系統 Message Delivery System)。由於它不只是一個消息隊列,並且仍是一個直接傳送消息到應用程序的系統和一個提供了創建應用服務框架的環境。我把它叫作DotNetMQ,是由於它徹底由.NET開發,並且這個名字也更好記。因此它原來的名字是MDS,以致於源碼裏有許多以MDS爲前綴的類。
爲何要一個新的消息代理?
消息代理的必要性
首先,我將演示一個須要消息代理的簡單狀況。
在個人業務經歷中,我見到過一些很是糟糕且不尋常的異步企業應用集成解決方案。一般是運行在一臺服務器上的一個程序執行一些任務,而且產生一些數據,而後將結果數據發送到另外一臺服務器上的另外一個程序。第二個應用在數據上執行其餘任務或計算結果(這臺服務器在同一網絡中或是經過互聯網鏈接)。另外,消息數據必須是持久的。即便遠程程序沒有工做或網絡不可用,消息必須第一時間發送過去。
讓咱們來看看下面的設計圖:
圖 - 2:一個糟糕的集成應用程序解決方案。
Application -1 和Application -2是可執行程序(或是Windows服務),Sender Service是一個Windows服務。Application -1執行一些任務,產生數據,並調用Server-B服務器上的Remote Web Service方法來傳輸數據。這個web服務將數據插入到數據表。Application -2按期檢查數據表來得到新的數據行並處理它們(而後從表中刪除它們,或將其標記爲已處理,避免處理重複數據)。
若是在調用Web服務時或Web服務處理數據時出錯,數據不能丟失,而且稍後必須重發。可是,Application -1有其餘任務要作,因此它不能一次又一次的嘗試重發數據。它只是將數據插入到數據表。另外一個Windows服務(若是Application -1是一直運行的,也可使裏的一個線程)按期檢查這個表,並嘗試將數據發送到Web服務,直到數據成功發送。
這個解決方案的確是可靠的(消息確保傳送了),但它不是兩個應用程序之間通訊的有效方式。該解決方案有一些很是關鍵的問題:
如今用消息代理來作這全部的事情,用最有效的方式負責將消息傳送給遠程應用。同一應用程序集成用上DotNetMQ展現於下圖。
圖 - 3:使用DotNetMQ的簡單消息傳遞。
DotNetMQ是一個獨立的Windows服務,分別運行在Server-A和Server-B服務器上。所以,你只需編寫代碼和DotNetMQ通訊。使用DotNetMQ客戶端類庫,和DotNetMQ服務發送、接收信息是很是容易和快速的。Application -1準備消息,設置目標,並將消息傳遞給DotNetMQ代理。DotNetMQ代理將以最有效和最快的方式傳遞給Application -2。
現有的消息代理
很顯然,在集成應用程序中消息代理是有必要的。我網上搜索,查找書籍,想找一個免費的(最好也是開源的)並且是.Net用起來很容易的消息代理。讓咱們看看我找到了什麼:
如你所見,在上面的列表中沒有哪個消息代理是徹底由.NET開發的。
從用戶角度來看,我只是想經過「消息數據,目標服務器和應用程序名稱」來定位個人代理。其餘的我都不關心。他將會根據須要在網絡上屢次路由一個消息,最後發送到目標服務器的目標程序上。個人消息傳送系統必須爲我提供這個便利。這是個人出發點。我根據這一點大概設計了消息代理的結構。下圖顯示了我想要的。
圖 - 4:自動路由消息的消息代理服務器圖。
Application -1 傳遞一個消息到本地服務器(Server-A)上的消息代理:
Server-A沒有直接和Server-D鏈接。所以,消息代理在服務器間轉發消息(這個消息依次經過Server-A,Server-B,Server-C,Server-D),消息最後到達Server-D上的消息代理,而後傳遞給Application -2。注意在Server-E上也有一個Application-2在運行,可是它不會收到這個消息,由於消息的目標服務器是Server-D。
DotNetMQ提供了這種功能和便利。它在服務器圖上找到最佳的(最短的)路徑把消息從原服務器轉發到目標服務器。
通過這種全面的介紹會,讓咱們看看若是在實踐中使用DotNetMQ。
安裝、運行DotNetMQ
如今尚未實現自動安裝,不過安裝DotNetMQ是很是容易的。下載並解壓文章開始提供的二進制文件。只需將全部的東西複製到C:\Progame Files\DotNetMQ\下,而後運行INSTALL_x86.bat(若是你用的是64位系統,那麼將執行INSTALL_x64)。
你能夠檢查Windows服務,看看DotNetMQ是否已經安裝並正常工做。
第一個DotNetMQ程序
讓咱們看看實際中的DotNetMQ。爲了使第一個程序足夠簡單,我假設是同一臺機器上的兩個控制檯應用程序(實際上,就像咱們待會在文章中看到的那個,和在兩臺機器上的兩個應用程序是沒什麼顯著差別的,只是須要設置一下消息的目標服務器名字而已)。
註冊應用程序到DotNetMQ
咱們的應用程序爲了使用DotNetMQ,要先註冊一下,只需操做一次,是一個很是簡單的過程。運行DotNetMQ管理器(DotNETMQ文件夾下的MDSManager.exe,如上所訴,默認是在C:\Programe Files\DotNetMQ\文件夾下),並在Applications菜單中打開Application類表。點擊Add New Appliction按鈕,輸入應用程序名稱。
如上所述,添加Application1和Application2到DotNetMQ。最後,你的應用程序列表應該像下面這樣。
圖 - 5:DotNetMQ管理工具的應用程序列表界面。
開發Application1
在Visual Studio中建立一個名稱爲Application1的控制檯應用程序,並添加MDSCommonLib.dll引用,這個dll文件裏提供了鏈接到DotNetMQ必需的一些類。而後在Program.cs文件中寫上下面的代碼:
using System; using System.Text; using MDS.Client; namespace Application1 { class Program { static void Main(string[] args) { //Create MDSClient object to connect to DotNetMQ //Name of this application: Application1 var mdsClient = new MDSClient("Application1"); //Connect to DotNetMQ server mdsClient.Connect(); Console.WriteLine("Write a text and press enter to send " + "to Application2. Write 'exit' to stop application."); while (true) { //Get a message from user var messageText = Console.ReadLine(); if (string.IsNullOrEmpty(messageText) || messageText == "exit") { break; } //Create a DotNetMQ Message to send to Application2 var message = mdsClient.CreateMessage(); //Set destination application name message.DestinationApplicationName = "Application2"; //Set message data message.MessageData = Encoding.UTF8.GetBytes(messageText); //Send message message.Send(); } //Disconnect from DotNetMQ server mdsClient.Disconnect(); } } }
在建立MDSClient對象時,咱們把要鏈接的應用程序名稱傳給構造函數,用這個構造函數,咱們將用默認端口(10905)鏈接本地服務器(127.0.0.1)上的DotNetMQ。重載的構造函數能夠用於鏈接其餘服務器和端口。
MDSClient的CreateMessage方法返回一個IOutgoingMessage的對象。對象的MessageData屬性是實際發送給目標應用程序的數據,它是一個字節數組。咱們使用UTF8編碼把用戶輸入的文本轉換成字節數組。對象的DestinationApplicationName和DestinationServerName屬性是用於設置消息的目標地址。若是咱們沒有指定目標服務器,默認就是本地服務器。最後,咱們發送這個消息對象。
開發Application2
在Visual Studio裏建立一個新的控制檯應用程序,命名爲Application2,添加MDSCommonLib.dll並寫下如下代碼:
using System; using System.Text; using MDS.Client; namespace Application2 { class Program { static void Main(string[] args) { //Create MDSClient object to connect to DotNetMQ //Name of this application: Application2 var mdsClient = new MDSClient("Application2"); //Register to MessageReceived event to get messages. mdsClient.MessageReceived += MDSClient_MessageReceived; //Connect to DotNetMQ server mdsClient.Connect(); //Wait user to press enter to terminate application Console.WriteLine("Press enter to exit..."); Console.ReadLine(); //Disconnect from DotNetMQ server mdsClient.Disconnect(); } /// <summary> /// This method handles received messages from other applications via DotNetMQ. /// </summary> /// <param name="sender"></param> /// <param name="e">Message parameters</param> static void MDSClient_MessageReceived(object sender, MessageReceivedEventArgs e) { //Get message var messageText = Encoding.UTF8.GetString(e.Message.MessageData); //Process message Console.WriteLine(); Console.WriteLine("Text message received : " + messageText); Console.WriteLine("Source application : " + e.Message.SourceApplicationName); //Acknowledge that message is properly handled //and processed. So, it will be deleted from queue. e.Message.Acknowledge(); } } }
咱們用和Application1類似的方法建立一個MDSClient對象,不一樣的就是鏈接應用程序的名稱是Application2。爲了接收消息,須要給MDSClient對象註冊MessageReceived事件。而後咱們鏈接DotNetMQ,直到用戶輸入Enter才斷開。
當一個消息發送給Application2是,MDSClient_MessageReceived方法就會被調用來處理消息。咱們從MessageReceivedEventArgs參數對象的Message屬性能夠獲得發送過來的消息。這個消息的類型是IIncomingMessage。IIncomingMessage對象的MessageData屬性實際包含了由Application1發送的消息數據。因爲它是一個字節數組,咱們用UTF8編碼把它轉換成字符串。而後把文本消息打印到控制檯上。
圖 - 6:Application1經過DotNetMQ發送兩個消息到Application2。
處理傳入消息以後,還須要來確認這個消息。這表示消息已經正確接收並處理。而後DotNetMQ將從消息隊列中把消息刪除。咱們也能夠用Reject方法拒絕一個消息(若是在出錯的狀況下咱們不能處理這個消息)。在這種狀況下,該消息將回到消息隊列,稍後再試着發到目標應用程序(若是在同一個服務器上存在另外一個Application2的實體,也可能發到另外一個上)。這是DotNetMQ系統的一個強大機制。所以,能夠確保消息不會丟失並絕對能夠被處理。若是你不確認或拒絕一個消息,系統假設是被拒絕的。因此,即便你的應用程序崩潰了,在你的應用程序正常運行後,仍是會收到消息的。
若是你在同一臺服務器上運行多個Application2的實例,哪個會收到消息呢?在這種狀況下,DotNetMQ會把消息順序地發給這多個實例。因此你能夠建立多發送/接收的系統。一個消息只能被一個實例接收(實例接收相互不一樣的消息)。DotNetMQ提供這全部功能和同步。
消息屬性:傳送規則(Transmit Rule)
在發送一個消息以前,你能夠像這樣設置一個消息的Transmit Rule屬性:
message.TransmitRule = MessageTransmitRules.NonPersistent;
傳送規則有三種類型:
因爲默認的傳送規則是StoreAndForward,讓咱們試試下面這些:
即便在Application1發送過消息後,你中止了DotNetMQ服務,你的消息也是不會丟失的,這就叫持久化。
客戶端屬性:通信方式(CommunicationWay)
默認狀況下,一個應用程序能夠經過MDSClient發送和接收消息(CommunicationWays.SendAndReceive)。若是一個應用程序不須要接收消息,能夠設置MDSClient的CommunicationWay爲CommunicationWays.Send。這個屬性在鏈接DotNetMQ以前或在和DotNetMQ通訊中均可以改變。
客戶端屬性:出錯時從新鏈接服務器(ReConnectServerOnError)
默認狀況下,MDSClient因爲某種緣由斷開DotNetMQ時會自動重連。因此,即便你重啓DotNetMQ服務,也不用重啓你的應用程序。你能夠把ReconnectServerOnError設置爲false來禁用自動重連。
客戶端屬性:自動確認消息(AutoAcknowledgeMessages)
默認狀況下,你必須在MessageReceived事件中顯式的確認消息。不然,系統將認爲消息是被拒絕了。若是你想改變這種行爲,你須要把AutoAcknowledgeMessages屬性設爲true。在這種狀況下,若是你的MessageReceived事件處理程序沒有拋出異常,你也沒有顯式確認和拒絕一個消息,系統將自動確認該消息(若是拋出異常,該消息將被拒絕)。
配置DotNetMQ
有兩種方式能夠配置DotNetMQ:經過XML配置文件或用DotNetMQ管理工具(一個Windows Forms程序),這裏我分別演示這兩種方法,有些配置是及時生效的,而有些則須要重啓DotNetMQ。
服務端
你能夠只在一臺服務器上運行DotNetMQ,在這種狀況下,是不須要爲服務器配置任何東西的。但若是你想在多臺服務器上運行DotNetMQ並使它們相互通訊,你就須要定義服務器圖了。
一個服務器圖包含兩個或更多個節點,每個節點都是一個具備IP地址和TCP端口(被DotNetMQ用的那個)的服務器。你能夠用DotNetMQ管理器配置/設計一個服務器圖。
圖 - 8:DotNetMQ服務器圖管理。
在上圖中,你看到了一個包含5個節點的服務器圖。紅色節點表示當前服務器(當前服務器就是你用DotNetMQ管理器鏈接的那個)。直線表示兩個節點(它們互爲相鄰節點)是可鏈接的(它們能夠發送/接收消息)。服務器/節點圖形中的名稱是很重要的,它被用來向該服務器發送消息。
你能夠雙擊圖形中的一個服務器來編輯它的屬性。爲了鏈接兩個服務器,你要按住Ctrl鍵,點擊第一個再點擊第二個(斷開鏈接也是相同的操做)。你能夠經過點擊右鍵,選擇Set as this server來設置管理器鏈接該服務器。你能夠從圖中刪除一個服務器或經過右鍵菜單添加一個新的服務器。最後,你能夠經過拖拽添加或移除服務器。
當你設計好服務器圖以後,你必須點擊Save & Update Graph按鈕來保存這些修改。這些修改將保存在DotNetMQ安裝目錄的MDSSettings.xml文件裏。你必須重啓DotNetMQ才能應用這些修改。
對於上面的服務器圖,對應的MDSSettings.xml設置以下:
<?xml version="1.0" encoding="utf-8"?> <MDSConfiguration> <Settings> ... </Settings> <Servers> <Server Name="halil_pc" IpAddress="192.168.10.105" Port="10099" Adjacents="emre_pc" /> <Server Name="emre_pc" IpAddress="192.168.10.244" Port="10099" Adjacents="halil_pc,out_server,webserver1,webserver2" /> <Server Name="out_server" IpAddress="85.19.100.185" Port="10099" Adjacents="emre_pc" /> <Server Name="webserver1" IpAddress="192.168.10.263" Port="10099" Adjacents="emre_pc,webserver2" /> <Server Name="webserver2" IpAddress="192.168.10.44" Port="10099" Adjacents="emre_pc,webserver1" /> </Servers> <Applications> ... </Applications> <Routes> ... </Routes> </MDSConfiguration>
固然,這個配置是要根據你實際的網絡進行的。你必須在圖中全部服務器上安裝DotNetMQ。此外,還必須在全部服務器上配置相同的服務器圖(你能夠很容易地從XML文件複製服務器節點到其餘服務器上)。
DotNetMQ採用段路徑算法發送消息(沒有在XML配置文件裏手動定義路由的狀況下)。考慮這個情景,運行在halil_pc的Application A發送一個消息到webserver2上的Application B,路徑是很簡單的:Application A -> halil_pc -> emre_pc -> webserver2 -> Application B。halil_pc經過服務器圖定義知道下一個要轉發到的服務器(emre_pc)。
最後,MDSSettings.design.xml包含了服務器圖的設計信息(節點在屏幕上的位置)。這個文件只是用於DotNetMQ管理器的服務器圖窗體,運行時的DotNetMQ服務是不須要的。
應用程序
就像圖 - 5顯示的那樣,你能夠把和DotNetMQ關聯的應用程序做爲消息代理來添加/刪除。對於這些修改是不須要重啓DotNetMQ的。應用程序的配置也保存在MDSSettings.xml文件裏,就像下面這樣:
<?xml version="1.0" encoding="utf-8"?> <MDSConfiguration> ... <Applications> <Application Name="Application1" /> <Application Name="Application2" /> </Applications> ... </MDSConfiguration>
一個應用程序必須在這個列表裏才能和DotNetMQ鏈接。若是你直接修改xml文件,你必須重啓DotNetMQ服務才能生效。
路由/負載均衡
DotNetMQ的有一個路由功能。如今路由設置只能經過MDSSettings.xml設置。你能夠看到下面文件裏有兩種路由設置:
<?xml version="1.0" encoding="utf-8" ?> <MDSConfiguration> ... <Routes> <Route Name="Route-App2" DistributionType="Sequential" > <Filters> <Filter DestinationServer="this" DestinationApplication="Application1" /> </Filters> <Destinations> <Destination Server="Server-A" Application="Application1" RouteFactor="1" /> <Destination Server="Server-B" Application="Application1" RouteFactor="1" /> <Destination Server="Server-C" Application="Application1" RouteFactor="1" /> </Destinations> </Route> <Route Name="Route-App2" DistributionType="Random" > <Filters> <Filter DestinationServer="this" DestinationApplication="Application2" /> <Filter SourceApplication="Application2" TransmitRule="StoreAndForward" /> </Filters> <Destinations> <Destination Server="Server-A" Application="Application2" RouteFactor="1" /> <Destination Server="Server-B" Application="Application2" RouteFactor="3" /> </Destinations> </Route> </Routes> ... </MDSConfiguration>
每一個路由節點有兩個屬性:Name屬性是對用戶友好的顯示(不影響路由功能),DistributionType是路由的策略。這裏有兩種類型的路由策略:
Filters用於決定消息使用哪一個路由。若是一個消息的屬性和其中一個過濾器匹配,該消息就會被路由。這有5個條件(XML的5個屬性)來定義一個過濾器:
過濾消息時,不會考慮沒有定義的條件。因此,若是全部的條件都是空的(或直接沒定義),那麼全部的消息都適合這個過濾器。只有全部的條件都匹配時,一個過濾器才適合這個消息。若是一個消息正確匹配(至少是過濾器定義的都匹配)一個路由中的一個過濾器,那麼這個路由將被選擇並使用。
Destinations是用來將消息路由到其餘服務器用的。一個目標服務器被選中是根據Route節點的DistributionType屬性(前面解釋過)決定的。一個destination節點必須定義三個屬性:
修改路由配置,必須重啓DotNetMQ纔會生效。
其餘設置
目前DotNetMQ支持3中存儲類型:SQLite(默認),MySQL和內存(譯者注:根據下面內容,還支持MSSQL)。你能夠在MDSSettings.xml修改存儲類型。
下面是一個使用MySQL-ODBC做爲存儲的簡單配置:
<Settings> <Setting Key="ThisServerName" Value="halil_pc" /> <Setting Key="StorageType" Value="MySQL-ODBC" /> <Setting Key="ConnectionString" Value="uid=root;server=localhost;driver={MySQL ODBC 3.51 Driver};database=mds" /> </Settings>
你能夠在Setup\Databases文件夾(這個文件夾在DotNetMQ的安裝目錄)找到所需的文件,而後建立數據庫和數據表,以供DotNetMQ使用。若是你有什麼問題,能夠隨時問我。
還有一個設置是定義"current/this"這個名稱表明哪臺服務器的,這個值必須是Servers節點裏的一個服務器名。若是你用DotNetMQ管理器編輯服務器圖,這個值是自動設置的。
網絡傳輸消息
向一個網絡服務器的應用程序發消息是和向同一個服務器的應用程序發消息同樣簡單的。
一個簡單的應用程序
讓咱們考慮下面這個網絡:
圖 - 8:兩個應用程序經過DotNetMQ在網絡上通訊。
運行在ServerA上的Application1想發消息到ServerC上的Application2,因爲防火牆的規則,ServerA和ServerC不能直接鏈接。讓咱們修改一下在第一個DotNetMQ程序裏開發的程序。
Application2甚至一點有不用修改,只要把Application2上ServerC上運行並等待傳入的消息便可。
Application1只是在如何發消息的地方稍微改動一點,就是設置DestinationServerName(目標服務器名)爲ServerC。
var message = mdsClient.CreateMessage(); message.DestinationServerName = "ServerC"; //Set destination server name here! message.DestinationApplicationName = "Application2"; message.MessageData = Encoding.UTF8.GetBytes(messageText); message.Send();
就這樣,就完事兒了。你不須要知道ServerC在哪裏,也不須要直接鏈接ServerC...這些所有定義在DotNetMQ設置裏。注意:若是你不給一個消息設置DestinationServerName,系統假設目標服務器就是"current/this"指定的那臺服務器,DotNetMQ也將把消息發送到同一臺服務器上的應用程序。另外,若是你定義了必要的路由,你就沒必要設置目標服務器了,DotNetMQ會自動地路由消息。
固然,DotNetMQ的設置必須根據服務器間的鏈接(服務器圖)來設置,而且Application1和Application2必須像配置DotNetMQ部分說的那樣註冊到DotNetMQ服務器。
一個真實的案例:分佈式短信處理器(Distributed SMS Processor)
正如你已看到的那樣,DotNetMQ能夠用於構建分佈式,負載均衡應用系統。在本節中,我將討論一個生活中真實的場景:一個分佈式消息處理系統。
假定有一個用於音樂比賽投票的短消息(MSM)服務。全部競賽者唱過他們的歌曲後,觀衆給他們最喜歡的歌手投票,會發一條像"VOTE 103"這樣的短信到咱們的短息服務器。並假定此次投票會在短短的30分鐘完成,大約有五百萬人發短息到咱們的服務。
咱們將會接收每一條短息,處理它(格式化短息文本,修改數據庫,以便增長選手的票數),並要發送確認消息給發送者。咱們從兩臺服務器接收消息,在四臺服務器上處理消息,而後從兩臺服務器上發送確認消息。咱們總共有八臺服務器。讓咱們看看完整的系統示意圖:
圖 - 9:分佈式短信處理系統
這裏有三種類型的應用:接受者,處理器,和發送者。在這種狀況下,你就可使用DotNetMQ做爲消息隊列和負載均衡器,經過配置服務器圖和路由(就像配置DotNetMQ小節中描述的那樣),來構建一個分佈式的,可擴展的消息處理系統。
請求/應答式通訊
在許多狀況下,一個應用發一個消息到另外一個應用,而後獲得一個應答消息。DotNetMQ對這種通訊方式有內置的支持。考慮這樣一個服務:用於查詢庫存的狀態。這裏有兩種消息類型:
[Serializable] public class StockQueryMessage { public string StockCode { get; set; } } [Serializable] public class StockQueryResultMessage { public string StockCode { get; set; } public int ReservedStockCount { get; set; } public int TotalStockCount { get; set; } }
下面展現了一個簡單的庫存服務。
using System; using MDS; using MDS.Client; using StockCommonLib; namespace StockServer { class Program { static void Main(string[] args) { var mdsClient = new MDSClient("StockServer"); mdsClient.MessageReceived += MDSClient_MessageReceived; mdsClient.Connect(); Console.WriteLine("Press enter to exit..."); Console.ReadLine(); mdsClient.Disconnect(); } static void MDSClient_MessageReceived(object sender, MessageReceivedEventArgs e) { //Get message var stockQueryMessage = GeneralHelper.DeserializeObject(e.Message.MessageData) as StockQueryMessage; if (stockQueryMessage == null) { return; } //Write message content Console.WriteLine("Stock Query Message for: " + stockQueryMessage.StockCode); //Get stock counts from a database... int reservedStockCount; int totalStockCount; switch (stockQueryMessage.StockCode) { case "S01": reservedStockCount = 14; totalStockCount = 80; break; case "S02": reservedStockCount = 0; totalStockCount = 25; break; default: //Stock does not exists! reservedStockCount = -1; totalStockCount = -1; break; } //Create a reply message for stock query var stockQueryResult = new StockQueryResultMessage { StockCode = stockQueryMessage.StockCode, ReservedStockCount = reservedStockCount, TotalStockCount = totalStockCount }; //Create a MDS response message to send to client var responseMessage = e.Message.CreateResponseMessage(); responseMessage.MessageData = GeneralHelper.SerializeObject(stockQueryResult); //Send message responseMessage.Send(); //Acknowledge the original request message. //So, it will be deleted from queue. e.Message.Acknowledge(); } } }
這個庫存服務監聽進來的StockQueryMessage消息對象,而後把StockQueryResultMessage消息對象發送給查詢者。爲了簡單起見,我沒有從數據庫查詢庫存。應答消息對象是由傳入消息對象的CreateResponseMessage()方法建立的。最後,發出迴應消息後要確認進入的消息。如今,我展現一個簡單的庫存客戶端從服務器查詢庫存的示例:
using System; using MDS; using MDS.Client; using MDS.Communication.Messages; using StockCommonLib; namespace StockApplication { class Program { static void Main(string[] args) { Console.WriteLine("Press enter to query a stock status"); Console.ReadLine(); //Connect to DotNetMQ var mdsClient = new MDSClient("StockClient"); mdsClient.MessageReceived += mdsClient_MessageReceived; mdsClient.Connect(); //Create a stock request message var stockQueryMessage = new StockQueryMessage { StockCode = "S01" }; //Create a MDS message var requestMessage = mdsClient.CreateMessage(); requestMessage.DestinationApplicationName = "StockServer"; requestMessage.TransmitRule = MessageTransmitRules.NonPersistent; requestMessage.MessageData = GeneralHelper.SerializeObject(stockQueryMessage); //Send message and get response var responseMessage = requestMessage.SendAndGetResponse(); //Get stock query result message from response message var stockResult = (StockQueryResultMessage) GeneralHelper.DeserializeObject(responseMessage.MessageData); //Write stock query result Console.WriteLine("StockCode = " + stockResult.StockCode); Console.WriteLine("ReservedStockCount = " + stockResult.ReservedStockCount); Console.WriteLine("TotalStockCount = " + stockResult.TotalStockCount); //Acknowledge received message responseMessage.Acknowledge(); Console.ReadLine(); //Disconnect from DotNetMQ server. mdsClient.Disconnect(); } static void mdsClient_MessageReceived(object sender, MessageReceivedEventArgs e) { //Simply acknowledge other received messages e.Message.Acknowledge(); } } }
在上面的示例中,爲了演示目的TransmitRule設置成了NonPersistent(非持久)。固然,你能夠發送StoreAndForward(持久性)消息。這個是程序運行的截圖:
圖 - 10:請求/應答式的通訊應用。
面向服務架構的DotNetMQ
SOA(面向服務的架構)是以個流行多年的概念了。Web服務和WCF是兩個主要的SOA解決方案。通常狀況下,一個消息隊列系統是不會預期支持SOA的。同時,消息通訊是異步的,鬆耦合的過程,而Web服務方法調用則一般是同步的,緊耦合的。即便(正如你在前面示例程序中看到的那樣)消息通訊並不如調用一個遠程方法同樣簡單,可是當你的消息數增長,你的應用變複雜以致於難以維護時就不同了。DotNetMQ支持持久性和非持久性的遠程調用機制,全部你能夠異步地調用一個遠程方法,DotNetMQ會確保調用成功。
簡單應用程序:短息/郵件發送器
在這裏咱們將開發一個簡單的服務,可用於發送短信和郵件。也許沒有必要專門寫一個服務來發送短信和郵件,這些功能均可以在應用自身實現,可是想象一下你有不少應用都要發郵件,在發送時若是郵件服務出問題了怎麼辦?在能夠成功發送郵件以前,應用程序必須一直嘗試。因此你必須在你的應用程序中創建一個郵件隊列機制,用於一次又一次的嘗試發送。在最壞的狀況下,你的應用程序可能只運行很短的時間(如Web服務)或者必須在發送完郵件前關閉。可是在郵件服務器上線後,你還必須發送,不容許郵件丟失。
在這種狀況下,你能夠開發一個單獨的郵件/短信服務,它將嘗試發送直到成功。你能夠經過DotNetMQ開發一個郵件服務,僅當郵件發送成功時確認請求,若是發送失敗,只要不確認(或拒絕)消息就好了,它稍後會重試。
服務端
首先,咱們開發短信/郵件的服務部分。爲了實現這個,咱們必須定義一個派生自MDSService的類型:
using System; using MDS.Client.MDSServices; namespace SmsMailServer { [MDSService(Description = "This service is a " + "sample mail/sms service.", Version = "1.0.0.0")] public class MyMailSmsService : MDSService { //All parameters and return values can be defined. [MDSServiceMethod(Description = "This method is used send an SMS.")] public void SendSms( [MDSServiceMethodParameter("Phone number to send SMS.")] string phone, [MDSServiceMethodParameter("SMS text to be sent.")] string smsText) { //Process SMS Console.WriteLine("Sending SMS to phone: " + phone); Console.WriteLine("Sms Text: " + smsText); //Acknowledge the message IncomingMessage.Acknowledge(); } //You do not have to define any parameters [MDSServiceMethod] public void SendEmail(string emailAddress, string header, string body) { //Process email Console.WriteLine("Sending an email to " + emailAddress); Console.WriteLine("Header: " + header); Console.WriteLine("Body : " + body); //Acknowledge the message IncomingMessage.Acknowledge(); } // A simple method just to show return values. [MDSServiceMethod] [return: MDSServiceMethodParameter("True, if phone number is valid.")] public bool IsValidPhone([MDSServiceMethodParameter( "Phone number to send SMS.")] string phone) { //Acknowledge the message IncomingMessage.Acknowledge(); //Return result return (phone.Length == 10); } } }
如你所見,它只是一個帶有特性(Attribute)的一個常規C#類。MDSService和MDSServiceMethod兩個特性是必須的,其餘的特性是可選的(不過寫上去是最好了,你將很快會看到什麼會用這些特性)。你提供服務的方法必須有MDSServiceMehod特性,若是你不想公開一些方法,只要不加MDSServiceMethod特性就好了。
你還必須在你的服務方法中確認消息,不然,這個消息(引發這個服務方法調用的那個)就不會從消息隊列中刪除,而咱們的服務方法將會被再次調用。若是咱們不能處理這個消息(好比,若是郵件服務沒有工做,咱們沒辦法發送時)咱們也能夠拒絕它。若是咱們拒絕了這個消息,它稍後還會發送給咱們(很可靠)。你能夠經過MDSService類的IncomingMessage屬性獲得原消息,另外,你也能夠經過RemoteApplication屬性獲得遠程應用程序的信息。
建立了正確的服務類後,咱們必須建立一個應用來運行它,下面是用一個簡單的控制檯程序運行咱們的MyMailSmsService服務:
using System; using MDS.Client.MDSServices; namespace SmsMailServer { class Program { static void Main(string[] args) { using (var service = new MDSServiceApplication("MyMailSmsService")) { service.AddService(new MyMailSmsService()); service.Connect(); Console.WriteLine("Press any key to stop service"); Console.ReadLine(); } } } }
如你所見,只須要3行代碼就能夠建立並運行服務,因爲MDSService是可銷燬的,因此你能夠uing語句,另外,你也可使用MDSServiceApplication的Disconnect方法手動關閉服務。你能夠經過AddService方法在一個MDSServiceApplication中運行多個服務。
客戶端
爲了開發一個使用DotNetMQ服務的應用,你必須建立一個服務代理(就像Web服務和WCF那樣)。爲了建立代理,你能夠用MDSServiceProxyGenerator工具。首先,編譯你的服務項目,而後運行MDSServiceProxyGenerator.exe(在DotNetMQ安裝目錄).
圖 - 11:爲DotNetMQ服務生成代理類。
選擇你的服務程序集(在這個簡單的例子中是指SmsMailServer.exe)。你能夠選擇服務類或生成這個程序集裏全部服務的代理。輸入一個命名空間和一個目標文件夾,而後生成代理類。生成玩後,你就能夠把它加到你的項目裏了。
我就不展現這個代理類了,但你必須瞭解它(你能夠看源碼,它是一個很簡單的類)。你方法/參數上的特性用來生成這個代理類的註釋。
在咱們的項目裏添加這個代理類後,咱們就能夠想簡單方法調用那樣向服務發消息了。
using System; using MDS.Client; using MDS.Client.MDSServices; using SampleService; namespace SmsMailClient { class Program { static void Main(string[] args) { Console.WriteLine("Press enter to test SendSms method"); Console.ReadLine(); //Application3 is name of an application that sends sms/email. using (var serviceConsumer = new MDSServiceConsumer("Application3")) { //Connect to DotNetMQ server serviceConsumer.Connect(); //Create service proxy to call remote methods var service = new MyMailSmsServiceProxy(serviceConsumer, new MDSRemoteAppEndPoint("MyMailSmsService")); //Call SendSms method service.SendSms("3221234567", "Hello service!"); } } } }
你也能夠調用服務的其餘方法,會獲得像常規方法那樣的返回值。實際上,你的方法調用被轉換成了可靠的消息,好比,即便你的遠程應用程序(MyMailSmsService)在方法調用時沒有運行,在服務啓動後也會被調用,因此你的方法調用是必定會被調用的。
你能夠經過改變服務代理的TransmitRule屬性來改變消息傳輸的規則。若是服務方法返回void,那麼他的默認傳輸規則是StoreAndForward。若是服務方法有個一返回值,那麼方法調用將會不可靠(由於方法調用時同步的,要等待一個結果的),它的規則是DiretlySend。你能夠選擇任何類型做爲方法的參數,若是參數類型是基元類型(string,int,byte...),就不須要附加的設置,可是若是你想用你自定義的類型做爲方法參數,這個類型必須標記爲Serializable,由於DotNetMQ會用二進制序列化參數。
注意:你在運行這個例子前必須在DotNetMQ裏註冊MyMailSmsService和Application3。
Web服務支持
固然,你能夠在Web服務裏鏈接DotNetMQ,由於把自己仍是一個.Net應用程序。可是,爲何你要寫一個ASP.NET Web方法爲應用程序處理消息(並且能夠在同一個上下文中回覆消息)呢?Web服務更適合這樣請求/應答式的方法調用。
DotNetMQ支持ASP.NET web服務並能夠傳遞消息到web服務。這裏有個web服務的模板樣品(在下載文件中)來實現這一目標。它的定義以下:
using System; using System.Web.Services; using MDS.Client.WebServices; [WebService(Namespace = "http://www.dotnetmq.com/mds")] [WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)] public class MDSAppService : WebService { /// <summary> /// MDS server sends messages to this method. /// </summary> /// <param name="bytesOfMessage">Byte array form of message</param> /// <returns>Response message to incoming message</returns> [WebMethod(Description = "Receives incoming messages to this web service.")] public byte[] ReceiveMDSMessage(byte[] bytesOfMessage) { var message = WebServiceHelper.DeserializeMessage(bytesOfMessage); try { var response = ProcessMDSMessage(message); return WebServiceHelper.SerializeMessage(response); } catch (Exception ex) { var response = message.CreateResponseMessage(); response.Result.Success = false; response.Result.ResultText = "Error in ProcessMDSMessage method: " + ex.Message; return WebServiceHelper.SerializeMessage(response); } } /// <summary> /// Processes incoming messages to this web service. /// </summary> /// <param name="message">Message to process</param> /// <returns>Response Message</returns> private IWebServiceResponseMessage ProcessMDSMessage(IWebServiceIncomingMessage message) { //Process message //Send response/result var response = message.CreateResponseMessage(); response.Result.Success = true; return response; } }
如上所述,你不須要改變ReceiveMDSMessage方法,並且必須在ProcessMDSMessage方法裏處理消息。另外,你須要向下面這樣在MDSSettings.xml裏定義你的web服務地址,你也能夠用DotNetMQ管理工具添加web服務。
... <Applications> <Application Name="SampleWebServiceApp"> <Communication Type="WebService" Url="http://localhost/SampleWebApplication/SampleService.asmx" /> </Application> </Applications> ...
DotNetMQ的性能
這是一些經過DotNetMQ傳送消息的測試結果:
消息傳送:
方法調用(在DotNetMQ服務裏)
測試平臺:Intel Core 2 Duo 3,00 GHZ CPU.2 GB RAM PC。消息傳送和方法調用是在同一臺電腦上的兩個應用程序之間進行的。
引用
書籍:Enterprise Integration Patterns: Designing,Building,and Deploying Messaging Solutions .做者 Gregor Hohpe,Bobby Woolf(艾迪生韋斯利出版,2003年)。
歷史