ASP.NET Core Web API下事件驅動型架構的實現(一):一個簡單的實現

很長一段時間以來,我都在思考如何在ASP.NET Core的框架下,實現一套完整的事件驅動型架構。這個問題看上去有點大,其實主要目標是爲了實現一個基於ASP.NET Core的微服務,它可以很是簡單地訂閱來自於某個渠道的事件消息,並對接收到的消息進行處理,於此同時,它還可以向該渠道發送事件消息,以便訂閱該事件消息的消費者可以對消息數據作進一步處理。讓咱們回顧一下微服務之間通訊的幾種方式,分爲同步和異步兩種。同步通訊最多見的就是RESTful API,並且很是簡單輕量,一個Request/Response迴環就結束了;異步通訊最多見的就是經過消息渠道,將載有特殊意義的數據的事件消息發送到消息渠道,而對某種類型消息感興趣的消費者,就能夠獲取消息中所帶信息並執行相應操做,這也是咱們比較熟知的事件驅動架構的一種表現形式。雖然事件驅動型架構看起來很是複雜,從微服務的實現來看顯得有些繁重,但它的應用範圍確實很廣,也爲服務間通訊提供了新的思路。瞭解DDD的朋友相信必定知道CQRS體系結構模式,它就是一種事件驅動型架構。事實上,實現一套完整的、安全的、穩定的、正確的事件驅動架構並不簡單,因爲異步特性帶來的一致性問題會很是棘手,甚至須要藉助一些基礎結構層工具(好比關係型數據庫,不錯!只能是關係型數據庫)來解決一些特殊問題。本文就打算帶領你們一塊兒探探路,基於ASP.NET Core Web API實現一個相對比較簡單的事件驅動架構,而後引出一些有待深刻思考的問題,留在從此的文章中繼續討論。或許,本文所引入的源代碼沒法直接用於生產環境,但我但願本文介紹的內容可以給到讀者一些啓發,並可以幫助解決實際中遇到的問題。git

術語約定

本文會涉及一些相關的專業術語,在此先做約定:github

  • 事件:在某一特定時刻發生在某件事物上的一件事情,例如:在我撰寫本文的時候,電話鈴響了
  • 消息:承載事件數據的實體。事件的序列化/反序列化和傳輸都以消息的形式進行
  • 消息通訊渠道:一種帶有消息路由功能的數據傳輸機制,用以在消息的派發器和訂閱器之間進行數據傳輸

注意:爲了迎合描述的須要,在下文中可能會混用事件和消息兩個概念。sql

一個簡單的設計

先從簡單的設計開始,基本上事件驅動型架構會有事件消息(Events)、事件訂閱器(Event Subscriber)、事件派發器(Event Publisher)、事件處理器(Event Handler)以及事件總線(Event Bus)等主要組件,它們之間的關係大體以下:數據庫

class_diagram

首先,IEvent接口定義了事件消息(更確切地說,數據)的基本結構,幾乎全部的事件都會有一個惟一標識符(Id)和一個事件發生的時間(Timestamp),這個時間一般使用UTC時間做爲標準。IEventHandler定義了事件處理器接口,顯而易見,它包含兩個方法:CanHandle方法,用以肯定傳入的事件對象是否可被當前處理器所處理,以及Handle方法,它定義了事件的處理過程。IEvent和IEventHandler構成了事件處理的基本元素。編程

而後就是IEventSubscriber與IEventPublisher接口。前者表示實現該接口的類型爲事件訂閱器,它負責事件處理器的註冊,並偵聽來自事件通訊渠道上的消息,一旦所得到的消息可以被某個處理器處理,它就會指派該處理器對接收到的消息進行處理。所以,IEventSubscriber會保持着對事件處理器的引用;而對於實現了IEventPublisher接口的事件派發器而言,它的主要任務就是將事件消息發送到消息通訊渠道,以便訂閱端可以得到消息並進行處理。api

IEventBus接口表示消息通訊渠道,也就是你們所熟知的消息總線的概念。它不只具備消息訂閱的功能,並且還具備消息派發的能力,所以,它會同時繼承於IEventSubscriber和IEventPublisher接口。在上面的設計中,經過接口分離消息總線的訂閱器和派發器的角色是頗有必要的,由於兩種角色的各自職責不同,這樣的設計同時知足SOLID中的SRP和ISP兩個準則。安全

基於以上基礎模型,咱們能夠很快地將這個對象關係模型轉換爲C#代碼:數據結構

public interface IEvent
{
    Guid Id { get; }
    DateTime Timestamp { get; }
}

public interface IEventHandler
{
    Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default);
    bool CanHandle(IEvent @event);
}

public interface IEventHandler<in T> : IEventHandler
    where T : IEvent
{
    Task<bool> HandleAsync(T @event, CancellationToken cancellationToken = default);
}

public interface IEventPublisher : IDisposable
{
    Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent;
}

public interface IEventSubscriber : IDisposable
{
    void Subscribe();
}

public interface IEventBus : IEventPublisher, IEventSubscriber { }

短短30行代碼,就把咱們的基本對象關係描述清楚了。對於上面的代碼咱們須要注意如下幾點:架構

  1. 這段代碼使用了C# 7.1的新特性(default關鍵字)
  2. Publish以及Handle方法被替換爲支持異步調用的PublishAsync和HandleAsync方法,它們會返回Task對象,這樣能夠方便使用C#中async/await的編程模型
  3. 因爲咱們的這個模型能夠做爲實現消息系統的通用模型,而且會須要用到ASP.NET Core的項目中,所以,建議將這些接口的定義放在一個獨立的NetStandard的Class Library中,方便從此重用和擴展

OK,接口定義好了。實現呢?下面,咱們實現一個很是簡單的消息總線:PassThroughEventBus。在從此的文章中,我還會介紹如何基於RabbitMQ和Azure Service Bus實現不同的消息總線。app

PassThroughEventBus

顧名思義,PassThroughEventBus表示當有消息被派發到消息總線時,消息總線將不作任何處理與路由,而是直接將消息推送到訂閱方。在訂閱方的事件監聽函數中,會經過已經註冊的事件處理器對接收到的消息進行處理。整個過程並不會依賴於任何外部組件,不須要引用額外的開發庫,只是利用現有的.NET數據結構來模擬消息的派發和訂閱過程。所以,PassThroughEventBus不具有容錯和消息重發功能,不具有消息存儲和路由功能,咱們先實現這樣一個簡單的消息總線,來體驗事件驅動型架構的設計過程。

咱們可使用.NET中的Queue或者ConcurrentQueue等基本數據結構來做爲消息隊列的實現,與這些基本的數據結構相比,消息隊列自己有它本身的職責,它須要在消息被推送進隊列的同時通知調用方。固然,PassThroughEventBus不須要依賴於Queue或者ConcurrentQueue,它所要作的事情就是模擬一個消息隊列,當消息推送進來的時候,馬上通知訂閱方進行處理。一樣,爲了分離職責,咱們能夠引入一個EventQueue的實現(以下),從而將消息推送和路由的職責(基礎結構層的職責)從消息總線中分離出來。

internal sealed class EventQueue
{
    public event System.EventHandler<EventProcessedEventArgs> EventPushed;

    public EventQueue() { }

    public void Push(IEvent @event)
    {
        OnMessagePushed(new EventProcessedEventArgs(@event));
    }

    private void OnMessagePushed(EventProcessedEventArgs e) => this.EventPushed?.Invoke(this, e);
}

EventQueue中最主要的方法就是Push方法,從上面的代碼能夠看到,當EventQueue的Push方法被調用時,它將馬上觸發EventPushed事件,它是一個.NET事件,用以通知EventQueue對象的訂閱者,消息已經被派發。整個EventQueue的實現很是簡單,咱們僅專一於事件的路由,徹底沒有考慮任何額外的事情。

接下來,就是利用EventQueue來實現PassThroughEventBus。毫無懸念,PassThroughEventBus須要實現IEventBus接口,它的兩個基本操做分別是Publish和Subscribe。在Publish方法中,會將傳入的事件消息轉發到EventQueue上,而Subscribe方法則會訂閱EventQueue.EventPushed事件(.NET事件),而在EventPushed事件處理過程當中,會從全部已註冊的事件處理器(Event Handlers)中找到可以處理所接收到的事件,並對其進行處理。整個流程仍是很是清晰的。如下即是PassThroughEventBus的實現代碼:

public sealed class PassThroughEventBus : IEventBus
{
    private readonly EventQueue eventQueue = new EventQueue();
    private readonly IEnumerable<IEventHandler> eventHandlers;

    public PassThroughEventBus(IEnumerable<IEventHandler> eventHandlers)
    {
        this.eventHandlers = eventHandlers;
    }

    private void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
        => (from eh in this.eventHandlers
            where eh.CanHandle(e.Event)
            select eh).ToList().ForEach(async eh => await eh.HandleAsync(e.Event));

    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent
            => Task.Factory.StartNew(() => eventQueue.Push(@event));

    public void Subscribe()
        => eventQueue.EventPushed += EventQueue_EventPushed;


    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls
    void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                this.eventQueue.EventPushed -= EventQueue_EventPushed;
            }

            disposedValue = true;
        }
    }
    public void Dispose() => Dispose(true);
    #endregion
}

實現過程很是簡單,固然,從這些代碼也能夠更清楚地瞭解到,PassThroughEventBus不作任何路由處理,更不會依賴於一個基礎結構設施(好比實現了AMQP的消息隊列),所以,不要期望可以在生產環境中使用它。不過,目前來看,它對於咱們接下來要討論的事情仍是會頗有幫助的,至少在咱們引入基於RabbitMQ等實現的消息總線以前。

一樣地,請將PassThroughEventBus實如今另外一個NetStandard的Class Library中,雖然它不須要額外的依賴,但它畢竟是衆多消息總線中的一種,將它從接口定義的程序集中剝離開來,好處有兩點:第一,保證了定義接口的程序集的純淨度,使得該程序集不須要依賴任何外部組件,並確保了該程序集的職責單一性,即爲消息系統的實現提供基礎類庫;第二,將PassThroughEventBus置於獨立的程序集中,有利於調用方針對IEventBus進行技術選擇,好比,若是開發者選擇使用基於RabbitMQ的實現,那麼,只須要引用基於RabbitMQ實現IEventBus接口的程序集就能夠了,而無需引用包含了PassThroughEventBus的程序集。這一點我以爲能夠概括爲框架設計中「隔離依賴關係(Dependency Segregation)」的準則。

好了,基本組件都定義好了,接下來,讓咱們一塊兒基於ASP.NET Core Web API來作一個RESTful服務,並接入上面的消息總線機制,實現消息的派發和訂閱。

Customer RESTful API

咱們仍然以客戶管理的RESTful API爲例子,不過,咱們不會過多地討論如何去實現管理客戶信息的RESTful服務,那並非本文的重點。做爲一個案例,我使用ASP.NET Core 2.0 Web API創建了這個服務,使用Visual Studio 2017 15.5作開發,並在CustomersController中使用Dapper來對客戶信息CRUD。後臺基於SQL Server 2017 Express Edition,使用SQL Server Management Studio可以讓我方便地查看數據庫操做的結果。

RESTful API的實現

假設咱們的客戶信息只包含客戶ID和名稱,下面的CustomersController代碼展現了咱們的RESTful服務是如何保存並讀取客戶信息的。固然,我已經將本文的代碼經過Github開源,開源協議爲MIT,雖然商業友好,但畢竟是案例代碼沒有通過測試,因此請謹慎使用。本文源代碼的使用我會在文末介紹。

[Route("api/[controller]")]
public class CustomersController : Controller
{
    private readonly IConfiguration configuration;
    private readonly string connectionString;

    public CustomersController(IConfiguration configuration)
    {
        this.configuration = configuration;
        this.connectionString = configuration["mssql:connectionString"];
    }


    // 獲取指定ID的客戶信息
    [HttpGet("{id}")]
    public async Task<IActionResult> Get(Guid id)
    {
        const string sql = "SELECT [CustomerId] AS Id, [CustomerName] AS Name FROM [dbo].[Customers] WHERE [CustomerId]=@id";
        using (var connection = new SqlConnection(connectionString))
        {
            var customer = await connection.QueryFirstOrDefaultAsync<Model.Customer>(sql, new { id });
            if (customer == null)
            {
                return NotFound();
            }

            return Ok(customer);
        }
    }

    // 建立新的客戶信息
    [HttpPost]
    public async Task<IActionResult> Create([FromBody] dynamic model)
    {
        var name = (string)model.Name;
        if (string.IsNullOrEmpty(name))
        {
            return BadRequest();
        }

        const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)";
        using (var connection = new SqlConnection(connectionString))
        {
            var customer = new Model.Customer(name);
            await connection.ExecuteAsync(sql, customer);

            return Created(Url.Action("Get", new { id = customer.Id }), customer.Id);
        }
    }
}

代碼一如既往的簡單,Web API控制器經過Dapper簡單地實現了客戶信息的建立和返回。咱們不妨測試一下,使用下面的Invoke-RestMethod PowerShell指令,發送Post請求,經過上面的Create方法建立一個用戶:

image

能夠看到,response中已經返回了新建客戶的ID號。接下來,繼續使用Invoke-RestMethod來獲取新建客戶的詳細信息:

image

OK,API調試徹底沒有問題。下面,咱們將這個案例再擴充一下,咱們但願這個API在完成客戶信息建立的同時,向外界發送一條「客戶信息已建立」的事件,並設置一個事件處理器,負責將該事件的詳細內容保存到數據庫中。

加入事件總線和消息處理機制

首先,咱們在ASP.NET Core Web API項目上,添加對以上兩個程序集的引用,而後,按常規作法,在ConfigureServices方法中,將PassThroughEventBus添加到IoC容器中:

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
    services.AddSingleton<IEventBus, PassThroughEventBus>();
}

在此,將事件總線註冊爲單例(Singleton)服務,是由於它不保存狀態。理論上講,使用單例服務時,須要特別注意服務實例對象的生命週期管理,由於它的生命週期是整個應用程序級別,在程序運行的過程當中,由其引用的對象資源將沒法釋放,所以,當程序結束運行時,須要合理地將這些資源dispose掉。好在ASP.NET Core的依賴注入框架中已經幫咱們處理過了,所以,對於上面的PassThroughEventBus單例註冊,咱們不須要過多擔憂,程序執行結束並正常退出時,依賴注入框架會自動幫咱們dispose掉PassThroughEventBus的單例實例。那麼對於單例實例來講,咱們是否只須要經過AddSingleton方法進行註冊就能夠了,而無需關注它是否真的被dispose了呢?答案是否認的,有興趣的讀者能夠參考微軟的官方文檔,在下一篇文章中我會對這部份內容作些介紹。

接下來,咱們須要定義一個CustomerCreatedEvent對象,表示「客戶信息已經建立」這一事件信息,同時,再定義一個CustomerCreatedEventHandler事件處理器,用來處理從PassThroughEventBus接收到的事件消息。代碼以下,固然也很簡單:

public class CustomerCreatedEvent : IEvent
{
    public CustomerCreatedEvent(string customerName)
    {
        this.Id = Guid.NewGuid();
        this.Timestamp = DateTime.UtcNow;
        this.CustomerName = customerName;
    }

    public Guid Id { get; }

    public DateTime Timestamp { get; }

    public string CustomerName { get; }
}

public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>
{
    public bool CanHandle(IEvent @event)
        => @event.GetType().Equals(typeof(CustomerCreatedEvent));

    public Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        return Task.FromResult(true);
    

    public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default)
        => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);
}

二者分別實現了咱們最開始定義好的IEvent和IEventHandler接口。在CustomerCreatedEventHandler類的第一個HandleAsync重載方法中,咱們暫且讓它簡單地返回一個true值,表示事件處理成功。下面要作的事情就是,在客戶信息建立成功後,向事件總線發送CustomerCreatedEvent事件,以及在ASP.NET Core Web API程序啓動的時候,註冊CustomerCreatedEventHandler實例,並調用事件總線的Subscribe方法,使其開始偵聽事件的派發行爲。

因而,CustomerController須要依賴IEventBus,而且在CustomerController.Create方法中,須要經過調用IEventBus的Publish方法將事件發送出去。現對CustomerController的實現作一些調整,調整後代碼以下:

[Route("api/[controller]")]
public class CustomersController : Controller
{
    private readonly IConfiguration configuration;
    private readonly string connectionString;
    private readonly IEventBus eventBus;

    public CustomersController(IConfiguration configuration,
        IEventBus eventBus)
    {
        this.configuration = configuration;
        this.connectionString = configuration["mssql:connectionString"];
        this.eventBus = eventBus;
    }

    // 建立新的客戶信息
    [HttpPost]
    public async Task<IActionResult> Create([FromBody] dynamic model)
    {
        var name = (string)model.Name;
        if (string.IsNullOrEmpty(name))
        {
            return BadRequest();
        }

        const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)";
        using (var connection = new SqlConnection(connectionString))
        {
            var customer = new Model.Customer(name);
            await connection.ExecuteAsync(sql, customer);

            await this.eventBus.PublishAsync(new CustomerCreatedEvent(name));

            return Created(Url.Action("Get", new { id = customer.Id }), customer.Id);
        }
    }
    
    // Get方法暫且省略
}

而後,修改Startup.cs中的ConfigureServices方法,將CustomerCreatedEventHandler註冊進來:

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();

    services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
    services.AddSingleton<IEventBus, PassThroughEventBus>();
}

而且調用Subscribe方法,開始偵聽消息總線:

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
    eventBus.Subscribe();

    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    app.UseMvc();
}

OK,如今讓咱們在CustomerCreatedEventHandler的HandleAsync方法上設置個斷點,按下F5啓用Visual Studio 2017調試,而後從新使用Invoke-RestMethod命令發送一個Post請求,能夠看到,HandleAsync方法上的斷點被命中,同時事件已被正確派發:

image

數據庫中的數據也被正確更新:

image

目前還差最後一小步,就是在HandleAsync中,將CustomerCreatedEvent對象的數據序列化並保存到數據庫中。固然這也不難,一樣能夠考慮使用Dapper,或者直接使用ADO.NET,甚至使用比較重量級的Entity Framework Core,均可以實現。那就在此將這個問題留給感興趣的讀者朋友本身搞定啦。

小結

到這裏基本上本文的內容也就告一段落了,回顧一下,本文一開始就提出了一種相對簡單的消息系統和事件驅動型架構的設計模型,並實現了一個最簡單的事件總線:PassThroughEventBus。隨後,結合一個實際的ASP.NET Core Web API案例,瞭解了在RESTful API中實現事件消息派發和訂閱的過程,並實現了在事件處理器中,對得到的事件消息進行處理。

然而,咱們還有不少問題須要更深刻地思考,好比:

  • 若是事件處理器須要依賴基礎結構層組件,依賴關係如何管理?組件生命週期如何管理?
  • 如何實現基於RabbitMQ或者Azure Service Bus的事件總線?
  • 若是在數據庫更新成功後,事件發送失敗怎麼辦?
  • 如何保證事件處理的順序?

等等。。。在接下來的文章中,我會盡力作更詳細的介紹。

源代碼的使用

本系列文章的源代碼在https://github.com/daxnet/edasample這個Github Repo裏,經過不一樣的release tag來區分針對不一樣章節的源代碼。本文的源代碼請參考chapter_1這個tag,以下:

image

接下來還將會有chapter_二、chapter_3等這些tag,對應本系列文章的第二部分、第三部分等等。敬請期待。

相關文章
相關標籤/搜索