Msmq設計文檔html
文件狀態:編程 [√] 草稿安全 [ ] 正式發佈服務器 [ ] 正在修改網絡 |
文件標識:多線程 |
ECI-MSMQ v01架構 |
當前版本:異步 |
0.5async |
|
做 者:分佈式 |
阿新 |
|
完成日期: |
2005-8-18 |
1.0文檔說明:
1.1文檔目的
介紹了MSMQ的基本編程(如存儲和接收消息)和基本的管理功能(如建立和刪除隊列)。雖然使用.Net API來是很是方便和簡單的,可是在實際的MSMQ項目中,須要瞭解消息隊列做爲架構的概念。經過使用MSMQ,系統會更加鬆散耦合,所以更加自治(autonomous)。須要注意的是:消息僅僅是消息,而不是內部的業務對象。所以,在設計新的分佈式應用程序時,建議遵照面向服務架構(Service-Oriented Architecture)的基本思想:經過顯式定義邊界、建立自治服務,讓MSMQ來負責交互部分。
1.2文檔範圍
涉及居於MSMQ基礎的傳輸信息的交互開發
1.3讀者對象
系統設計人員,開發人員,測試人員
1.4參考文獻
Msdn Cnblogs
1.5專業術語
1、「消息」是在兩臺計算機間傳送的數據單位。消息能夠很是簡單,例如只包含文本字符串;也能夠更復雜,可能包含嵌入對象。消息被髮送到隊列中。「消息隊列(MSMQ)」是在消息的傳輸過程當中保存消息的容器。消息隊列管理器在將消息從它的源中繼到它的目標時充當中間人。隊列的主要目的是提供路由並保證消息的傳遞;若是發送消息時接收者不可用,消息隊列會保留消息,直到能夠成功地傳遞它。
2、隊列類型(Queue Type)
有兩種主要的隊列類型:由您或網絡中的其餘用戶建立的隊列和系統隊列。用戶建立的隊列多是如下任何一種隊列:
「公共隊列」在整個「消息隊列」網絡中複製,而且有可能由網絡鏈接的全部站點訪問。
「專用隊列」不在整個網絡中發佈。相反,它們僅在所駐留的本地計算機上可用。專用隊列只能由知道隊列的完整路徑名或標籤的應用程序訪問。
「管理隊列」包含確認在給定「消息隊列」網絡中發送的消息回執的消息。指定但願 MessageQueue 組件使用的管理隊列(若是有的話)。
「響應隊列」包含目標應用程序接收到消息時返回給發送應用程序的響應消息。指定但願 MessageQueue 組件使用的響應隊列(若是有的話)。
說明:咱們這裏用到專用隊列;
3、同步和異步通訊(Synchronous VS. Asynchronous Communication)
隊列通訊天生就是異步的,由於將消息發送到隊列和從隊列中接收消息是在不一樣的進程中完成的。另外,能夠異步執行接收操做,由於要接收消息的人能夠對任何給定的隊列調用 BeginReceive 方法,而後當即繼續其餘任務而不用等待答覆。這與人們所瞭解的「同步通訊」大相徑庭。 在同步通訊中,請求的發送方在執行其餘任務前,必須等待來自預約接收方的響應。發送方等待的時間徹底取決於接收方處理請求和發送響應所用的時間
4、同消息隊列交互(Interacting with Message Queues)
消息處理和消息爲基於服務器的應用程序組件之間的進程間通訊提供了強大靈活的機制。同組件間的直接調用相比,它們具備若干優勢,其中包括:
穩定性 — 組件失敗對消息的影響程度遠遠小於組件間的直接調用,由於消息存儲在隊列中並一直留在那裏,直到被適當地處理。消息處理同事務處理類似,由於消息處理是有保證的。
消息優先級 — 更緊急或更重要的消息可在相對不重要的消息以前接收,所以能夠爲關鍵的應用程序保證足夠的響應時間。
脫機能力 — 發送消息時,它們可被髮送到臨時隊列中並一直留在那裏,直到被成功地傳遞。當因任何緣由對所需隊列的訪問不可用時,用戶能夠繼續執行操做。同時,其餘操做能夠繼續進行,如同消息已經獲得了處理同樣,這是由於網絡鏈接恢復時消息傳遞是有保證的。
事務性消息處理 — 將多個相關消息耦合爲單個事務,確保消息按順序傳遞、只傳遞一次而且能夠從它們的目標隊列中被成功地檢索。若是出現任何錯誤,將取消整個事務。
安全性 — MessageQueue 組件基於的消息隊列技術使用 Windows 安全來保護訪問控制,提供審覈,並對組件發送和接收的消息進行加密和驗證
2類庫功能說明
完成將報文以string,stream,xmldocument,dataset的形式發送到指定的消息隊列,並接收這些消息還原成報文格式;支持多線程異步方式接收消息;
2.1類庫結構
類名稱 |
方法/類型 |
參數 |
說明 |
ECI.MSMQLib |
|
|
|
FormatterType |
枚舉類型 |
Xml,binary,stream |
定義消息序列化類型 |
MQProfile |
類 |
Path,transactional… |
定義消息隊列的基本參數 |
MsgType |
枚舉類型 |
String,stream,dataset,docment |
定義消息的類型 |
MyConvert |
ToXmlDoc |
dataset |
Dataset轉化成xmldocuemnt |
|
ToDataSet |
xmldocuemnt |
xmldocuemnt轉化成Dataset |
|
ToStream |
Dataset,xmldocument |
把DataSet,xmldocuemnt轉化成stream |
|
ToString |
Stream |
將stream轉成String |
|
ToString |
XmlDocument |
將xmldocment轉成String |
|
ToString |
dataset |
將xmldocment轉成String |
|
ToBig |
Big5 |
繁體轉簡體 |
|
ToBig5 |
Big |
簡體轉繁體 |
MySteam |
Read |
|
讀取stream中的消息 |
WriteLog |
Save |
|
當msmq發送錯誤將記錄錯誤信息同時備份下消息內容,發送Mail通知處理人員 |
MQReceiveDelegate |
委託 |
|
在採用異步接受時會用到 |
Sender |
Sender |
MQProfile |
構造函數,初始化消息隊列 |
|
Send |
|
根據MQProflie的定義發送消息 |
|
|
Lable,Context |
發送String;定義消息標籤和內容 |
|
|
DataSet,Lable |
發送DataSet |
|
|
Xmldocuemnt,Lable |
發送XmlDocuemnt |
|
|
Stream,Lable |
發送Stream |
Receiver |
Receiver |
MQProfile |
構造函數,初始化消息隊列 |
|
Receive |
|
同步接收String |
|
ReceiveXmlDoc |
|
同步接收xmlDcoument |
|
ReceiveDataset |
|
同步接收DataSet |
|
AsynReceive |
|
異步接收消息經過MQReceiveDelegate Receiving獲取消息體和標籤 |
|
AsynReceiveCallBak |
|
異步回調接收消息經過MQReceiveDelegate Receiving獲取消息和標籤 |
2.2消息隊列的建立
///MQPath = FormatName:DIRECT=TCP:172.20.30.36\Private$\PathName 經過ip方式調用隊列
///MQPath = FormatName:Direct=http://localhost/msmq/Private$/PathName 經過http方式調用隊列優勢能夠穿越防火牆的限制;
///MQPath = FormatName:Direct=OS:JACK-FC0647E1EE\\private$\\demo1 本測試在域服務器上,測試經過,網上說此格式只能用於在域中的計算機;
///MQPath = MachineName\Private$\PathName 經過主機名方式調用隊列
///建立隊列
System.Messaging.MessageQueue mq = new System.Messaging.MessageQueue(this.MQPath);
///建立消息
System.Messaging.Message msg=new System.Messaging.Message();
//msg.Recoverable=true;
/*
Recoverable 屬性指示是否保證消息的傳遞,即便計算機在消息傳遞到目標隊列的途中崩潰。 若是保證消息的傳遞,則在途中的每一步都將本地存儲消息,直到消息被成功地轉發到下一臺計算機。將 Recoverable 屬性設置爲 true 可能會影響吞吐量。 若是消息是事務性的,消息隊列會自動將消息視爲可恢復的,而與 Recoverable 屬性的值無關。
*/
///定義消息的序列化類型
mq.Formatter=new System.Messaging.XmlMessageFormatter(new Type[] {typeof(string)}) ;
msg.Formatter=new System.Messaging.XmlMessageFormatter(new Type[] {typeof(string)}) ;
mq.Formatter=new System.Messaging.XmlMessageFormatter(new Type[] {typeof(DataSet)}) ;
msg.Formatter=new System.Messaging.XmlMessageFormatter(new Type[] {typeof(DataSet)}) ;
///建立一個事務用於對事務性隊列的控制
MessageQueueTransaction mqt=new MessageQueueTransaction();
///對消息標籤賦值 string
msg.Label=MQlable;
///對消息體賦值 object
msg.Body=Context;
///對以流形式的消息體賦值 stream
msg.BodyStream =stream
try
{
///開始一個發送消息事務
mqt.Begin();
///發送消
mq.Send(msg,mqt);
///成功發送
mqt.Commit();
}
catch(MessageQueueException e)
{
//Log.Save(e.Message);
///回滾整個事物
mqt.Abort();
///將錯誤信息寫入日至
//WriteLog.Save(this.MQPath,"Sender.SendStream(System.IO.Stream stream,string Lable).Transaction",e.Message,MyReadStream.Read(stream));
throw;
}
2.3 讀/顯示消息
當消息接受後,消息將從隊列中刪除。能夠經過使用MessageQueue.Peek方法來檢索消息隊列中的第一個消息的複製,保留消息在隊列中。不過,這樣只能獲取的相同的消息。更好的辦法是經過foreach來讀消息隊列中的消息,但不刪除隊列中的消息。
foreach(System.Messaging.Message message in queue)
{
txtResults.Text += message.Label + Environment.NewLine;
}
2.4 異步接受消息
public void AsynReceiveCallBak()
{
try
{
//this.MessageType=msgType;
MessageQueueTransaction myTransaction = new MessageQueueTransaction();
anycMQ = new MessageQueue(MQpath);
AsyncCallback cb = new AsyncCallback (callback);
IAsyncResult asyncResult=anycMQ.BeginReceive(new TimeSpan(0,0,5,0),DateTime.Now,cb);
//return MQ_OutBox;
}
catch(Exception e)
{
WriteLog.Save(this.MQPath,"Receiver.AsynReceiveCallBak()",e.Message);
throw;
}
}
private void callback(IAsyncResult handle)
{
Message msg=anycMQ.EndReceive(handle);
try
{
MQcontext=msg.BodyStream;
MQlable=msg.Label;
this.Receiving(this.MQLable,this.MQContext);
this.AsynReceiveCallBak();
}catch
{
……..
}
}
源代碼:
http://files.cnblogs.com/neozhu/MSMQLib.rar
出處:http://www.cnblogs.com/neozhu/archive/2005/09/08/232634.html