Azure Storage 系列(六)使用Azure Queue Storage

一,引言

  在以前介紹到 Azure Storage 第一篇文章中就有介紹到 Azure Storage 是 Azure 上提供的一項存儲服務,Azure 存儲包括 對象、文件、磁盤、隊列和表存儲。這裏的提到的隊列(Queue)就是今天要分享的內容。html

慣例,先來一些微軟的官方解釋git

  1,什麼是 Azure Queue Storage?github

  答:Azure 隊列存儲是一項實現基於雲的隊列的 Azure 服務。 每一個隊列都保留一個消息列表。 應用程序組件使用 REST API 或 Azure 提供的客戶端庫訪問隊列。 一般狀況下,將有一個或多個「發送方」組件以及一個或多個「接收方」組件。 發送方組件將消息添加到隊列。 接收方組件檢索隊列前面的消息以進行處理。 下圖顯示多個將消息添加到 Azure 隊列的發送者應用程序以及一個檢索消息的收件方應用程序。windows

  隊列中的消息是最大爲 64 KB 的字節數組。 任何 Azure 組件都不會解釋消息內容。若是要建立結構化消息,可使用 XML 或 JSON 格式化消息內容。 代碼負責生成並解釋自定義格式。數組

--------------------我是分割線--------------------bash

Azure Blob Storage 存儲系列:async

1,Azure Storage 系列(一)入門簡介ide

2,Azure Storage 系列(二) .NET Core Web 項目中操做 Blob 存儲post

3,Azure Storage 系列(三)Blob 參數設置說明測試

4,Azure Storage 系列(四)在.Net 上使用Table Storage

5,Azure Storage 系列(五)經過Azure.Cosmos.Table 類庫在.Net 上使用 Table Storage  

6,Azure Storage 系列(六)使用Azure Queue Storage

二,正文

1,Azure Portal 上建立 Queue 

選擇 cnbateblogaccount 左側菜單的 「Queue service=》Queues」 ,點擊 「+ Queue」

Queue name:「blogmessage」

點擊 「OK」

2,添加對 Storage Queue 的相應方法

2.1,安裝 「Azure.Storage.Queues」 的Nuget

使用程序包管理控制檯進行安裝

Install-Package Azure.Storage.Queues -Version 12.4.2

2.2,建立IQueueService 接口,和 QueueService 實現類,Queue控制器方法等

 1 public interface IQueueService
 2     {
 3         /// <summary>
 4         /// 插入Message
 5         /// </summary>
 6         /// <param name="msg">msg</param>
 7         /// <returns></returns>
 8         Task AddMessage(string msg);
 9 
10         /// <summary>
11         /// 獲取消息
12         /// </summary>
13         /// <returns></returns>
14         IAsyncEnumerable<string> GetMessages();
15 
16         /// <summary>
17         /// 更新消息
18         /// </summary>
19         /// <returns></returns>
20         Task UpdateMessage();
21 
22         /// <summary>
23         /// 處理消息
24         /// </summary>
25         /// <returns></returns>
26         Task ProcessingMessage();
27 
28 
29     }
IQueueService.cs
 1 public class QueueService : IQueueService
 2     {
 3         private readonly QueueClient _queueClient;
 4 
 5         public QueueService(QueueClient queueClient)
 6         {
 7             _queueClient = queueClient;
 8         }
 9 
10 
11 
12         /// <summary>
13         /// 添加消息
14         /// </summary>
15         /// <param name="msg">消息</param>
16         /// <returns></returns>
17         public async Task AddMessage(string msg)
18         {
19             // Create the queue
20             _queueClient.CreateIfNotExists();
21 
22             if (_queueClient.Exists())
23             {
24  
25                 // Send a message to the queue
26                  await _queueClient.SendMessageAsync(msg.EncryptBase64());
27             }
28         }
29 
30         public async IAsyncEnumerable<string> GetMessages()
31         {
32             if (_queueClient.Exists())
33             {
34                 // Peek at the next message
35                 PeekedMessage[] peekedMessage = await _queueClient.PeekMessagesAsync();
36                 for (int i = 0; i < peekedMessage.Length; i++)
37                 {
38                     //Display the message
39                     yield return string.Format($"Peeked message: '{peekedMessage[i].MessageText.DecodeBase64()}'") ;
40                 }
41             }
42         }
43 
44         /// <summary>
45         /// 處理消息
46         /// </summary>
47         /// <returns></returns>
48         public async Task ProcessingMessage()
49         {
50             // 執行 getmessage(), 隊頭的消息會變得不可見。
51             QueueMessage[] retrievedMessage = await _queueClient.ReceiveMessagesAsync();
52             try
53             {
54                 //處理消息
55 
56 
57                 // 若是在30s內你沒有刪除這條消息,它會從新出如今隊尾。
58                 // 因此正確處理一條消息的過程是,處理完成後,刪除這條消息
59                 await _queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
60             }
61             catch //(消息處理異常)
62             { }
63         }
64 
65         /// <summary>
66         /// 更新已排隊的消息
67         /// </summary>
68         /// <returns></returns>
69         public async Task UpdateMessage()
70         {
71             if (_queueClient.Exists())
72             {
73                 // Get the message from the queue
74                 QueueMessage[] message = await _queueClient.ReceiveMessagesAsync();
75 
76                 // Update the message contents
77                 await _queueClient.UpdateMessageAsync(message[0].MessageId,
78                         message[0].PopReceipt,
79                         "Updated contents".EncryptBase64(),
80                         TimeSpan.FromSeconds(60.0)  // Make it invisible for another 60 seconds
81                     );
82             }
83         }
84     }
QueueService.cs
 1 [Route("Queue")]
 2     public class QueueExplorerController : Controller
 3     {
 4 
 5         private readonly IQueueService _queueService;
 6 
 7         public QueueExplorerController(IQueueService queueSerivce)
 8         {
 9             this._queueService = queueSerivce;
10         }
11 
12         [HttpPost("AddQueue")]
13         public async Task<ActionResult> AddQueue()
14         {
15             string msg = $"我是添加進去的第一個消息";
16             await _queueService.AddMessage(msg);
17             return Ok();
18         }
19 
20         [HttpGet("QueryQueue")]
21         public  ActionResult QueryQueue()
22         {
23             return Ok( _queueService.GetMessages());
24             
25         }
26 
27         [HttpPut("UpdateQueue")]
28         public async Task<ActionResult> UpdateQueue()
29         {
30             await _queueService.UpdateMessage();
31             return Ok();
32         }
33 
34         [HttpGet("ProcessingMessage")]
35         public async Task<ActionResult> ProcessingQueue()
36         {
37             await _queueService.ProcessingMessage();
38             return Ok();
39         }
40     }
QueueExplorerController.cs

重點:將新消息添加到隊列的後面。可見性超時指定消息應該對Dequeue和Peek操做不可見的時間。消息內容必須是UTF-8編碼的字符串,最大長度爲64KB。

消息的格式必須能夠包含在具備UTF-8編碼。要在消息中包含標記,消息的內容必須爲XML換碼或Base64編碼。在將消息添加到隊列以前,將刪除消息中全部未轉義或編碼的XML標記。

咱們這裏使用Base64編碼

public static class StringExtensions
    {
        public static string EncryptBase64(this string s)
        {
            byte[] b = Encoding.UTF8.GetBytes(s);
            return Convert.ToBase64String(b);
        }

        public static string DecodeBase64(this string s)
        {
            byte[] b = Convert.FromBase64String(s);
            return Encoding.UTF8.GetString(b);
        }
    }

2.3,添加對 QueueService,以及QueueClient 的以來注入

services.AddSingleton(x => new QueueClient("DefaultEndpointsProtocol=https;AccountName=cnbateblogaccount;AccountKey=e2T2gYREFdxkYIJocvC4Wut7khxMWJCbQBp8tPM2EJt37QaUUlflTPAlkoJzIlY29aGYt8WW0xx1bckO4hLKJA==;EndpointSuffix=core.windows.net", "blogmessage"));
services.AddSingleton<IQueueService, QueueService>();

3,Postman 對相應接口進行測試

3.1,添加隊列消息

咱們添加一條 「我是添加進去的第一個消息」 的Queue

 postman 中輸入 「localhost:9001/Queue/AddQueue」,點擊 「Send」

接下來,咱們能夠在 VS 的 「Cloud Explorer」 查看到對應的 「cnbateblogaccount」 的 Strorage Account,以及 「blogmessage」 的 Storage Queue

右鍵彈出選擇頁面,點擊 「打開」

 咱們能夠看懂添加進去的 Queue 的信息 ,Queue 的過去時間由於咱們進行設置,這裏默認是7天

3.2 查詢Queue

postman 中輸入 「localhost:9001/Queue/QueryQueue」,點擊 「Send」,能夠看到剛剛添加進去的Queue被查詢出來了

3.3,更新Queue

postman 中輸入 「localhost:9001/Queue/UpdateQueue」,點擊 「Send」

注意:因爲咱們在更新 Queue 的時候,設置了 Queue 的不可見時間爲60秒,因此在更新操做完成後去查看 Queue 會找不到更新的那條Queue 的信息,稍等一下再去查看  就能夠展現出更新的 Queue 的信息

 更新的Queue的文本內容已經發生改變了

3.4 處理Queue

postman中輸入 「localhost:9001/Queue/ProcessingMessage」,點擊 「Send」

注意:由於這裏只是作演示,因此就假象進行消息處理,處理完成後,刪除這條消息。

能夠看到已經沒有了 Queue 的信息了

Ok,今天的分享就先到此結束

三,結尾

github:https://github.com/yunqian44/Azure.Storage.git

做者:Allen 

版權:轉載請在文章明顯位置註明做者及出處。如發現錯誤,歡迎批評指正。

做者:Allen 版權:轉載請在文章明顯位置註明做者及出處。如發現錯誤,歡迎批評指正。
相關文章
相關標籤/搜索