如何在ASP.NET Core中使用Azure Service Bus Queue

原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
做者:damienbod
譯文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:http://www.javashuo.com/article/p-cvwwbmzw-hu.html
做者:Lamond Lu
源代碼: https://github.com/lamondlu/AzureServiceBusMessaginghtml

本文展現瞭如何使用Azure Service Bus Queue, 實現2個ASP.NET Core Api應用之間的消息傳輸。git

配置Azure Service Bus Queue

你能夠從官網文檔中瞭解到如何配置一個Azure Service Bus Queue.github

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portalc#

這裏咱們使用Queue或者Topic來實現消息傳輸。Queue是一種消息傳輸類型,一旦一個消息被一個消費者接收了,該消息就會從Queue中被移除。api

與Queue不一樣,Topic提供的是一對多的通信方式。架構

架構圖

整個應用的實現以下:async

  • Api 1負責發送消息
  • Api 2負責監聽Azure Service Bus,並處理接收到的消息

實現一個Service Bus Queue

這裏咱們首先須要引入** Microsoft.Azure.ServiceBus ** 程序集。Microsoft.Azure.ServiceBus是Azure Service Bus的客戶端庫。針對Service Bus的鏈接字符串咱們保存在項目的User Secret中。當部署項目的時候,咱們可使用Azure Key Valut來設置這個Secret值。函數

在Visual Studio中,右鍵點擊API1, API2項目屬性,選擇Manage User Secrets就能夠管理當前項目使用的全部私密信息。測試

爲了發送向Azure Service Bus Queue發送消息,咱們須要建立一個SendMessage方法,並接收一個消息參數。這裏咱們建立了一個咱們本身的消息內容類型MyPayload, 將當前該MyPayload對象序列化成Json字符串, 添加到一個Message對象中。ui

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System.Text;
using System.Threading.Tasks;
 
namespace ServiceBusMessaging
{
    public class ServiceBusSender
    {
        private readonly QueueClient _queueClient;
        private readonly IConfiguration _configuration;
        private const string QUEUE_NAME = "simplequeue";
 
        public ServiceBusSender(IConfiguration configuration)
        {
            _configuration = configuration;
            _queueClient = new QueueClient(
            _configuration
                .GetConnectionString("ServiceBusConnectionString"), 
                QUEUE_NAME);
        }
         
        public async Task SendMessage(MyPayload payload)
        {
            string data = JsonConvert.SerializeObject(payload);
            Message message = new Message(Encoding.UTF8.GetBytes(data));
 
            await _queueClient.SendAsync(message);
        }
    }
}

在API 1和API 2中,咱們須要將ServiceBusSender註冊到應用程序的IOC容器中。這裏爲了測試方便,咱們同時註冊Swagger服務。

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
 
    services.AddScoped<ServiceBusSender>();
 
    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new Info
        {
            Version = "v1",
            Title = "Payload View API",
        });
    });
}

接下來,咱們就能夠在控制器中經過構造函數注入的方式使用這個服務了。

在API1中,咱們建立一個POST方法,這個方法會將API接收到Payload對象發送到Azure Service Bus Queue中。

[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task<IActionResult> Create([FromBody][Required]Payload request)
{
    if (data.Any(d => d.Id == request.Id))
    {
        return Conflict($"data with id {request.Id} already exists");
    }
 
    data.Add(request);
 
    // Send this to the bus for the other services
    await _serviceBusSender.SendMessage(new MyPayload
    {
        Goals = request.Goals,
        Name = request.Name,
        Delete = false
    });
 
    return Ok(request);
}

從Queue中獲取消息

爲了監聽Azure Service Bus Queue, 並處理接收到的消息,咱們建立了一個新類ServiceBusConsumerServiceBusConsumer實現了IServiceBusConsumer接口。

Queue的鏈接字符串是使用IConfiguration讀取的。 RegisterOnMessageHandlerAndReceiveMessages方法負責註冊消息處理程序ProcessMessagesAsync處理消息。ProcessMessagesAsync方法會將獲得的消息轉換成對象,並調用IProcessData接口完成最終的消息處理。

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace ServiceBusMessaging
{
    public interface IServiceBusConsumer
    {
        void RegisterOnMessageHandlerAndReceiveMessages();
        Task CloseQueueAsync();
    }
 
    public class ServiceBusConsumer : IServiceBusConsumer
    {
        private readonly IProcessData _processData;
        private readonly IConfiguration _configuration;
        private readonly QueueClient _queueClient;
        private const string QUEUE_NAME = "simplequeue";
        private readonly ILogger _logger;
 
        public ServiceBusConsumer(IProcessData processData, 
            IConfiguration configuration, 
            ILogger<ServiceBusConsumer> logger)
        {
            _processData = processData;
            _configuration = configuration;
            _logger = logger;
            _queueClient = new QueueClient(
              _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);
        }
 
        public void RegisterOnMessageHandlerAndReceiveMessages()
        {
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                MaxConcurrentCalls = 1,
                AutoComplete = false
            };
 
            _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }
 
        private async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));
            _processData.Process(myPayload);
            await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
        }
 
        private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
 
            _logger.LogDebug($"- Endpoint: {context.Endpoint}");
            _logger.LogDebug($"- Entity Path: {context.EntityPath}");
            _logger.LogDebug($"- Executing Action: {context.Action}");
 
            return Task.CompletedTask;
        }
 
        public async Task CloseQueueAsync()
        {
            await _queueClient.CloseAsync();
        }
    }
}

其中IProcessData接口存在於類庫項目ServiceBusMessaging中,它是用來處理消息的。

public interface IProcessData
{
    void Process(MyPayload myPayload);
}

在Api 2中,咱們建立一個ProcessData類,它實現了IProcessData接口。

public class ProcessData : IProcessData
{
    public void Process(MyPayload myPayload)
    {
        DataServiceSimi.Data.Add(new Payload
        {
            Name = myPayload.Name,
            Goals = myPayload.Goals
        });
    }
}

這裏爲了簡單測試,咱們建立了一個靜態類DataServiceSimi,其中存放了API2中全部保存Payload對象。同時,咱們還建立了一個新的控制器ViewPayloadMessagesController,在其中添加了一個GET Action,並返回了靜態類DataServiceSimi中的全部數據。

[Route("api/[controller]")]
[ApiController]
public class ViewPayloadMessagesController : ControllerBase
{
    [HttpGet]
    [ProducesResponseType(StatusCodes.Status200OK)]
    public ActionResult<List<Payload>> Get()
    {
        return Ok(DataServiceSimi.Data);
    }
}

最後咱們還須要將ProcessData註冊到API2的IOC容器中。

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();

    services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
    services.AddTransient<IProcessData, ProcessData>();
}

最終效果

如今咱們分別啓用2個Api項目,並在Api 1的Swagger文檔界面,調用POST請求,添加一個Payload

操做完成以後,咱們訪問Api 2的/api/ViewPayloadMessages, 得到結果以下,Api 1發出的消息出如今了Api 2的結果集中,這說明Api 2從Azure Service Bus Queue中獲取了消息,並保存在了本身的靜態類DataServiceSimi中。

相關文章
相關標籤/搜索