原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
做者:damienbod
譯文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:http://www.javashuo.com/article/p-cvwwbmzw-hu.html
做者:Lamond Lu
源代碼: https://github.com/lamondlu/AzureServiceBusMessaginghtml
本文展現瞭如何使用Azure Service Bus Queue, 實現2個ASP.NET Core Api應用之間的消息傳輸。git
你能夠從官網文檔中瞭解到如何配置一個Azure Service Bus Queue.github
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portalc#
這裏咱們使用Queue或者Topic來實現消息傳輸。Queue是一種消息傳輸類型,一旦一個消息被一個消費者接收了,該消息就會從Queue中被移除。api
與Queue不一樣,Topic提供的是一對多的通信方式。架構
整個應用的實現以下:async
這裏咱們首先須要引入** Microsoft.Azure.ServiceBus ** 程序集。Microsoft.Azure.ServiceBus是Azure Service Bus的客戶端庫。針對Service Bus的鏈接字符串咱們保存在項目的User Secret中。當部署項目的時候,咱們可使用Azure Key Valut來設置這個Secret值。函數
在Visual Studio中,右鍵點擊API1, API2項目屬性,選擇Manage User Secrets就能夠管理當前項目使用的全部私密信息。測試
爲了發送向Azure Service Bus Queue發送消息,咱們須要建立一個SendMessage
方法,並接收一個消息參數。這裏咱們建立了一個咱們本身的消息內容類型MyPayload
, 將當前該MyPayload
對象序列化成Json字符串, 添加到一個Message
對象中。ui
using Microsoft.Azure.ServiceBus; using Microsoft.Extensions.Configuration; using Newtonsoft.Json; using System.Text; using System.Threading.Tasks; namespace ServiceBusMessaging { public class ServiceBusSender { private readonly QueueClient _queueClient; private readonly IConfiguration _configuration; private const string QUEUE_NAME = "simplequeue"; public ServiceBusSender(IConfiguration configuration) { _configuration = configuration; _queueClient = new QueueClient( _configuration .GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME); } public async Task SendMessage(MyPayload payload) { string data = JsonConvert.SerializeObject(payload); Message message = new Message(Encoding.UTF8.GetBytes(data)); await _queueClient.SendAsync(message); } } }
在API 1和API 2中,咱們須要將ServiceBusSender
註冊到應用程序的IOC容器中。這裏爲了測試方便,咱們同時註冊Swagger
服務。
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); services.AddScoped<ServiceBusSender>(); services.AddSwaggerGen(c => { c.SwaggerDoc("v1", new Info { Version = "v1", Title = "Payload View API", }); }); }
接下來,咱們就能夠在控制器中經過構造函數注入的方式使用這個服務了。
在API1中,咱們建立一個POST方法,這個方法會將API接收到Payload
對象發送到Azure Service Bus Queue中。
[HttpPost] [ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)] [ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)] public async Task<IActionResult> Create([FromBody][Required]Payload request) { if (data.Any(d => d.Id == request.Id)) { return Conflict($"data with id {request.Id} already exists"); } data.Add(request); // Send this to the bus for the other services await _serviceBusSender.SendMessage(new MyPayload { Goals = request.Goals, Name = request.Name, Delete = false }); return Ok(request); }
爲了監聽Azure Service Bus Queue, 並處理接收到的消息,咱們建立了一個新類ServiceBusConsumer
,ServiceBusConsumer
實現了IServiceBusConsumer
接口。
Queue的鏈接字符串是使用IConfiguration
讀取的。 RegisterOnMessageHandlerAndReceiveMessages
方法負責註冊消息處理程序ProcessMessagesAsync
處理消息。ProcessMessagesAsync
方法會將獲得的消息轉換成對象,並調用IProcessData
接口完成最終的消息處理。
using Microsoft.Azure.ServiceBus; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ServiceBusMessaging { public interface IServiceBusConsumer { void RegisterOnMessageHandlerAndReceiveMessages(); Task CloseQueueAsync(); } public class ServiceBusConsumer : IServiceBusConsumer { private readonly IProcessData _processData; private readonly IConfiguration _configuration; private readonly QueueClient _queueClient; private const string QUEUE_NAME = "simplequeue"; private readonly ILogger _logger; public ServiceBusConsumer(IProcessData processData, IConfiguration configuration, ILogger<ServiceBusConsumer> logger) { _processData = processData; _configuration = configuration; _logger = logger; _queueClient = new QueueClient( _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME); } public void RegisterOnMessageHandlerAndReceiveMessages() { var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 1, AutoComplete = false }; _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions); } private async Task ProcessMessagesAsync(Message message, CancellationToken token) { var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body)); _processData.Process(myPayload); await _queueClient.CompleteAsync(message.SystemProperties.LockToken); } private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) { _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception"); var context = exceptionReceivedEventArgs.ExceptionReceivedContext; _logger.LogDebug($"- Endpoint: {context.Endpoint}"); _logger.LogDebug($"- Entity Path: {context.EntityPath}"); _logger.LogDebug($"- Executing Action: {context.Action}"); return Task.CompletedTask; } public async Task CloseQueueAsync() { await _queueClient.CloseAsync(); } } }
其中IProcessData
接口存在於類庫項目ServiceBusMessaging
中,它是用來處理消息的。
public interface IProcessData { void Process(MyPayload myPayload); }
在Api 2中,咱們建立一個ProcessData
類,它實現了IProcessData
接口。
public class ProcessData : IProcessData { public void Process(MyPayload myPayload) { DataServiceSimi.Data.Add(new Payload { Name = myPayload.Name, Goals = myPayload.Goals }); } }
這裏爲了簡單測試,咱們建立了一個靜態類
DataServiceSimi
,其中存放了API2中全部保存Payload
對象。同時,咱們還建立了一個新的控制器ViewPayloadMessagesController
,在其中添加了一個GET Action,並返回了靜態類DataServiceSimi
中的全部數據。
[Route("api/[controller]")] [ApiController] public class ViewPayloadMessagesController : ControllerBase { [HttpGet] [ProducesResponseType(StatusCodes.Status200OK)] public ActionResult<List<Payload>> Get() { return Ok(DataServiceSimi.Data); } }
最後咱們還須要將ProcessData
註冊到API2的IOC容器中。
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>(); services.AddTransient<IProcessData, ProcessData>(); }
如今咱們分別啓用2個Api項目,並在Api 1的Swagger文檔界面,調用POST請求,添加一個Payload
操做完成以後,咱們訪問Api 2的/api/ViewPayloadMessages, 得到結果以下,Api 1發出的消息出如今了Api 2的結果集中,這說明Api 2從Azure Service Bus Queue中獲取了消息,並保存在了本身的靜態類DataServiceSimi
中。