【Microsoft Azure學習之旅】消息服務Service Bus的學習筆記及Demo示例

  今年項目組作的是Cloud產品,有幸接觸到了雲計算的知識,也瞭解並使用了當今流行的雲計算平臺Amazon AWS與Microsoft Azure。咱們的產品最初只部署在AWS平臺上,如今產品決定同時支持Azure,因此有幸學習下Azure,並在查看文檔資料以及寫Demo過程當中發現了其中的一些不一樣。雖然AWS與Azure是兩款旗鼓至關的競爭產品,可是仍是有不少區別。html

  本文主要是本身學習Service Bus中的學習筆記,本身有些結論也都跟微軟技術支持確認過。我的觀點,拋磚引玉:-)java

  消息服務對於雲計算產品是很重要的。例如SQS,咱們可使用持久化隊列來儲存消息,這樣同一產品的不一樣Module在通訊過程當中利用隊列存儲消息,可避免消息的丟失(由於消息會被分佈式冗餘的存儲,屬於隊列的內部實現)。針對隊列服務,AWS提供了SQS,Azure則提供了Service Bus(固然除了Service Bus,微軟還提供了Storage部分中的Queue功能,但我的認爲這個Queue偏重消息存儲,而Service Bus中的Queue偏重通訊,具體區別可參照此文章http://msdn.microsoft.com/library/azure/hh767287)。例外AWS裏面,SNS能夠和SQS一塊兒使用。SNS中就提供了對SQS的支持,若是有消息產生須要通知Module,SNS能夠直接將消息存儲在SQS中。編程

  而微軟的Azure提供的Service Bus功能,主要有五部分子功能組成:windows

  1. Queue 隊列 分佈式

    提供的功能與Amazon SQS相似函數

  2. Topic/Subscription 主題/訂閱post

    更高級的Queue,相似與一個虛擬隊列,可接收發送到主題的消息副本,從訂閱接收消息的方式與隊列接收消息的方式相同。性能

  3. Relay 中繼學習

    建立Hybrid Application時使用,我沒有使用過此功能。this

  4. Notification Hub 通知中心

    主要是向Device移動設備推送通知,例如註冊一個APP在通知中心,通知中心可向運行此APP的全部設備發送推進通知,支持幾乎全部的手機平臺。

  5. Event Hub 事件中心

  可用於存儲產品運行時產生或收集的大量事件。

  這裏咱們的產品主要使用的將是第二項功能,主題/訂閱。這樣咱們產品中的一個Module只需將消息發送至主題中,凡是訂閱了該主題的訂閱(隊列)都是有一個該消息的副本。這樣咱們的另外一個Module只需關注訂閱中的消息,取出消息,處理消息,達成通訊的目的。

  在Azure中,要使用主題/訂閱,首先要在Service Bus中新建一個Namespace,而後新建主題與訂閱便可。

  值得注意的是,以前Service Bus提供的認證方式都是ACS,前一段時間都改成了SAS(共享訪問簽名)。而這一改動須要SDK的支持,據我所知,Azure Java SDK上週纔剛剛在新版本中支持了這一認證,可謂效率之低。具體詳情可參見我上一篇文章:【Microsoft Azure學習之旅】Azure Java SDK - Service Bus的認證問題http://www.cnblogs.com/KevinSong/p/4146811.html。

  下面的代碼是我根據微軟文檔寫的一個Demo。使用Java SDK實現,做用是向主題發送消息,而後從訂閱中取出消息。

  1. 建立一個主題,並向主題發送消息

 1 //create a topic in name space
 2 public void Create(TopicInfo topic){
 3     System.out.println("=>CreateTopic");
 4     try{
 5         if(!IsAlreadyExist(topic)){
 6             CreateTopicResult result = service.createTopic(topic);
 7             System.out.println("Create Topic Result: " + result.toString());
 8         }
 9         else{
10             System.out.println("This Topic already exist, doesn't need to create");
11         }
12     }
13     catch (ServiceException e){
14         System.out.println("ServiceException encountered: " + e.getMessage());
15     }
16     System.out.println("<=CreateTopic");
17 }
18 
19 //send message to a topic
20 public void SendMessage(TopicInfo topic){
21     System.out.println("=>SendMessage");
22     try{
23         BrokeredMessage msg = new BrokeredMessage("testMsg");
24         
25         //set one property to this msg
26         msg.setProperty("testProperty", "kevin01");
27         
28         service.sendTopicMessage(topic.getPath(), msg);
29     }
30     catch (ServiceException e){
31         System.out.println("ServiceException encountered: " + e.getMessage());
32     }
33     System.out.println("<=SendMessage");
34 }

  2. 建立一個關注該主題的訂閱,並從訂閱中取出消息

 1 //create a subscription in name space
 2 public void Create(TopicInfo topic, SubscriptionInfo sub){
 3        System. out.println("=>CreateSubscription" );
 4        try{
 5           System. out.println("Topic Info: " + topic .getPath().toString());
 6          
 7           if(!IsAlreadyExist(topic , sub )){
 8              CreateSubscriptionResult result = service.createSubscription(topic .getPath(), sub);
 9               //and we can create a rule for this topic/subscription
10               //RuleInfo rule = new RuleInfo();
11              System. out.println("Create Subscription Result: " + result.toString());
12           }
13           else{
14              System. out.println("This subscription already exist. No need to create");
15           }
16       }
17        catch (ServiceException e ){
18           System. out.println("ServiceException encountered: " + e.getMessage());
19       }
20       System. out.println("<=CreateSubscription" );
21 }
22 
23 
24 //receive message from subscription
25 public void ReceiveMessage(TopicInfo topic, SubscriptionInfo sub){
26       System. out.println("=>ReceiveMessage" );
27       ReceiveMessageOptions opts = ReceiveMessageOptions. DEFAULT;
28        opts.setReceiveMode(ReceiveMode. PEEK_LOCK);
29        try{
30              ReceiveSubscriptionMessageResult result = service.receiveSubscriptionMessage(topic .getPath(), sub.getName(), opts);
31              
32              BrokeredMessage msg = result.getValue();
33              
34               if(msg != null && msg.getMessageId() != null){
35                     //print the message info
36                    System. out.println("Body: " + msg .toString());
37                    System. out.println("Message ID: " + msg.getMessageId());
38                    
39                    System. out.println("Custom Property Value: " + msg.getProperty("testProperty" ));
40                    
41                     //delete this message
42                     service.deleteMessage( msg);
43              }
44               else{
45                    System. out.println("There's no message in this subscription. Topic: " + topic.getPath() + ", Subscription: " + sub .getName());
46              }
47       }
48        catch (ServiceException e ){
49           System. out.println("ServiceException encountered: " + e.getMessage());
50       }
51       System. out.println("<=ReceiveMessage" );
52 }

  3. Main函數

 1 //test topic/subscription
 2 public static void main(String[] args){
 3     //create a topic, and send a message to this topic
 4     ServiceBusTopicHandler topicHandler = new ServiceBusTopicHandler();
 5     TopicInfo topic = new TopicInfo("testtopic");
 6     topicHandler.Create(topic);
 7     topicHandler.SendMessage(topic);
 8     
 9     //create a subscription about this topic, and receive message from this subscription
10     ServiceBusSubscriptionHandler subHandler = new ServiceBusSubscriptionHandler();
11     SubscriptionInfo sub = new SubscriptionInfo("testsubscription");
12     subHandler.Create(topic, sub);
13     
14     while(true){
15         subHandler.ReceiveMessage(topic, sub);
16     }
17 }

 

  得益於微軟提供的文檔與SDK,代碼實現很簡單。可是有個地方值得注意的是,

  如何從訂閱/隊列中取出消息?

  這涉及到Poll和Push技術的區別。

  最簡單的是咱們可使用Poll技術,就是例如在while(true)循環中一遍遍去查詢,若是有新消息就取出並處理,可是這樣會大大影響性能。在AWS中,咱們能夠利用SNS技術中的一項,註冊http endpoint到SNS,若是有新消息來了,能夠經過SNS調用咱們本身的Web API來通知咱們要去處理新消息,這樣就是Push的效果。可是在Azure中,有相似SNS的功能嗎?沒有。

  這樣SNS的技術相似於Push技術,若是有新消息,它會主動調用你的API通知你。而非Poll那樣低效率的一遍遍查詢。可是微軟Azure在去年提出了一項Long Polling的新技術,它的名字叫作Event-Driven Message Programing Model,事件驅動消息編程模型,該技術的發佈信息可參考http://msdn.microsoft.com/en-us/library/azure/dn198643.aspx。這其實本質是一項Long Polling長輪詢技術。

  在Azure .Net SDK中,你可使用OnMessage方法,來註冊一個回調Call back函數,若是有新消息來到,你的Call back會被調用到,來實現對消息的處理。本質是有不一樣的子線程,在執行long polling查詢。這跟Push技術仍是有很大的區別。

  但如今問題是,Event-Driven Message Programming Model僅僅在.Net SDK中提供,在其餘語言的Azure SDK版本中並不提供此項功能。曾經今年有人在MSDN問過這個問題,爲何不一樣的SDK區別對待,微軟的回覆請參照https://social.msdn.microsoft.com/forums/windows/en-us/7d0d4733-a696-4b72-927c-004caae029f7/event-driven-consumer-model-not-available-for-microsoft-azure-java-sdk?forum=windowsazuredevelopment。

  因而可知,微軟對於.NET SDK的重視,而對於相似Java的不重視啊,畢竟是自家的語言。.Net SDK已經目前發佈了數百個Release,而Java SDK還僅僅是v0.7.0版本。。。固然經過詢問微軟,Java SDK在未來有可能也支持此項功能(Event-Driven Message),但具體什麼時間會支持一切未可而知。

  

  不管是AWS,仍是Azure,都有完整詳細的文檔提供,這裏只是我在學習過程的一些總結。個人代碼示例(Java實現,採用Azure Java SDK)也附上以下。

  Service Bus Queue/Subject/Subscription Demo下載地址:http://files.cnblogs.com/KevinSong/testServiceBusQueue.zip

  有任何問題,歡迎討論。小白一個,繼續學習:-)

 

參照文檔:

1. Service Bus隊列,主題和訂閱

  http://msdn.microsoft.com/library/azure/hh367516.aspx

2. 介紹Event-Driven Message Programming Model

  http://fabriccontroller.net/blog/posts/introducing-the-event-driven-message-programming-model-for-the-windows-azure-service-bus/

3. AWS與Azure區別

  http://azure.microsoft.com/en-us/campaigns/azure-vs-aws/

 

 

Best Regards

Kevin Song

2014年12月19日

相關文章
相關標籤/搜索