RabbitMQ的事件總線

RabbitMQ的事件總線

在上文中,咱們討論了事件處理器中對象生命週期的問題,在進入新的討論以前,首先讓咱們總結一下,咱們已經實現了哪些內容。下面的類圖描述了咱們已經實現的組件及其之間的關係,貌似系統已經變得愈來愈複雜了。html

class_diagram_chapter2

其中綠色的部分就是上文中新實現的部分,包括一個簡單的Event Store,一個事件處理器執行上下文的接口,以及一個基於ASP.NET Core依賴注入框架的執行上下文的實現。接下來,咱們打算淘汰PassThroughEventBus,而後基於RabbitMQ實現一套新的事件總線。git

事件總線的重構

根據前面的結論,事件總線的執行須要依賴於事件處理器執行上下文,也就是上面類圖中PassThroughEventBus對於IEventHandlerExecutionContext的引用。更具體些,是在事件總線訂閱某種類型的事件時,須要將事件處理器註冊到IEventHandlerExecutionContext中。那麼在實現RabbitMQ時,也會有着相似的設計需求,即RabbitMQEventBus也須要依賴IEventHandlerExecutionContext接口,以保證事件處理器生命週期的合理性。github

爲此,咱們新建一個基類:BaseEventBus,並將這部分公共的代碼提取出來,須要注意如下幾點:sql

  1. 經過BaseEventBus的構造函數傳入IEventHandlerExecutionContext實例,也就限定了全部子類的實現中,必須在構造函數中傳入IEventHandlerExecutionContext實例,這對於框架的設計很是有利:在實現新的事件總線時,框架的使用者無需查看API文檔,便可知道事件總線與IEventHandlerExecutionContext之間的關係,這符合SOLID原則中的Open/Closed Principle
  2. BaseEventBus的實現應該放在EdaSample.Common程序集中,更確切地說,它應該放在EdaSample.Common.Events命名空間下,由於它是屬於框架級別的組件,而且不會依賴任何基礎結構層的組件

BaseEventBus的代碼以下:shell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public  abstract  class  BaseEventBus : IEventBus
{
     protected  readonly  IEventHandlerExecutionContext eventHandlerExecutionContext;
 
     protected  BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
     {
         this .eventHandlerExecutionContext = eventHandlerExecutionContext;
     }
 
     public  abstract  Task PublishAsync<TEvent>(TEvent @ event , CancellationToken cancellationToken = default ) where  TEvent : IEvent;
 
     public  abstract  void  Subscribe<TEvent, TEventHandler>()
         where  TEvent : IEvent
         where  TEventHandler : IEventHandler<TEvent>;
     
     // Disposable接口實現代碼省略
}

在上面的代碼中,PublishAsync和Subscribe方法是抽象方法,以便子類根據不一樣的須要來實現。數據庫

接下來就是調整PassThroughEventBus,使其繼承於BaseEventBus:json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public  sealed  class  PassThroughEventBus : BaseEventBus
{
     private  readonly  EventQueue eventQueue = new  EventQueue();
     private  readonly  ILogger logger;
 
     public  PassThroughEventBus(IEventHandlerExecutionContext context,
         ILogger<PassThroughEventBus> logger)
         : base (context)
     {
         this .logger = logger;
         logger.LogInformation($ "PassThroughEventBus構造函數調用完成。Hash Code:{this.GetHashCode()}." );
 
         eventQueue.EventPushed += EventQueue_EventPushed;
     }
 
     private  async void  EventQueue_EventPushed( object  sender, EventProcessedEventArgs e)
         => await this .eventHandlerExecutionContext.HandleEventAsync(e.Event);
 
     public  override  Task PublishAsync<TEvent>(TEvent @ event , CancellationToken cancellationToken = default )
     {
         return  Task.Factory.StartNew(() => eventQueue.Push(@ event ));
     }
 
     public  override  void  Subscribe<TEvent, TEventHandler>()
     {
         if  (! this .eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
         {
             this .eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
         }
     }
     
     // Disposable接口實現代碼省略
}

代碼都很簡單,也就很少作說明了,接下來,咱們開始實現RabbitMQEventBus。架構

RabbitMQEventBus的實現

首先須要新建一個.NET Standard 2.0的項目,使用.NET Standard 2.0的項目模板所建立的項目,能夠同時被.NET Framework 4.6.1或者.NET Core 2.0的應用程序所引用。建立新的類庫項目的目的,是由於RabbitMQEventBus的實現須要依賴RabbitMQ C#開發庫這個外部引用。所以,爲了保證框架核心的純淨和穩定,須要在新的類庫項目中實現RabbitMQEventBus。app

Note:對於RabbitMQ及其C#庫的介紹,本文就再也不涉及了,網上有不少資料和文檔,博客園有不少朋友在這方面都有使用經驗分享,RabbitMQ官方文檔也寫得很是詳細,固然是英文版的,若是英語比較好的話,建議參考官方文檔。框架

如下就是在EdaSample案例中,RabbitMQEventBus的實現,咱們先讀一讀代碼,再對這部分代碼作些分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public  class  RabbitMQEventBus : BaseEventBus
{
     private  readonly  IConnectionFactory connectionFactory;
     private  readonly  IConnection connection;
     private  readonly  IModel channel;
     private  readonly  string  exchangeName;
     private  readonly  string  exchangeType;
     private  readonly  string  queueName;
     private  readonly  bool  autoAck;
     private  readonly  ILogger logger;
     private  bool  disposed;
 
     public  RabbitMQEventBus(IConnectionFactory connectionFactory,
         ILogger<RabbitMQEventBus> logger,
         IEventHandlerExecutionContext context,
         string  exchangeName,
         string  exchangeType = ExchangeType.Fanout,
         string  queueName = null ,
         bool  autoAck = false )
         : base (context)
     {
         this .connectionFactory = connectionFactory;
         this .logger = logger;
         this .connection = this .connectionFactory.CreateConnection();
         this .channel = this .connection.CreateModel();
         this .exchangeType = exchangeType;
         this .exchangeName = exchangeName;
         this .autoAck = autoAck;
 
         this .channel.ExchangeDeclare( this .exchangeName, this .exchangeType);
 
         this .queueName = this .InitializeEventConsumer(queueName);
 
         logger.LogInformation($ "RabbitMQEventBus構造函數調用完成。Hash Code:{this.GetHashCode()}." );
     }
 
     public  override  Task PublishAsync<TEvent>(TEvent @ event , CancellationToken cancellationToken = default (CancellationToken))
     {
         var  json = JsonConvert.SerializeObject(@ event , new  JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
         var  eventBody = Encoding.UTF8.GetBytes(json);
         channel.BasicPublish( this .exchangeName,
             @ event .GetType().FullName,
             null ,
             eventBody);
         return  Task.CompletedTask;
     }
 
     public  override  void  Subscribe<TEvent, TEventHandler>()
     {
         if  (! this .eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
         {
             this .eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
             this .channel.QueueBind( this .queueName, this .exchangeName, typeof (TEvent).FullName);
         }
     }
 
     protected  override  void  Dispose( bool  disposing)
     {
         if  (!disposed)
         {
             if  (disposing)
             {
                 this .channel.Dispose();
                 this .connection.Dispose();
 
                 logger.LogInformation($ "RabbitMQEventBus已經被Dispose。Hash Code:{this.GetHashCode()}." );
             }
 
             disposed = true ;
             base .Dispose(disposing);
         }
     }
 
     private  string  InitializeEventConsumer( string  queue)
     {
         var  localQueueName = queue;
         if  ( string .IsNullOrEmpty(localQueueName))
         {
             localQueueName = this .channel.QueueDeclare().QueueName;
         }
         else
         {
             this .channel.QueueDeclare(localQueueName, true , false , false , null );
         }
 
         var  consumer = new  EventingBasicConsumer( this .channel);
         consumer.Received += async (model, eventArgument) =>
         {
             var  eventBody = eventArgument.Body;
             var  json = Encoding.UTF8.GetString(eventBody);
             var  @ event  = (IEvent)JsonConvert.DeserializeObject(json, new  JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
             await this .eventHandlerExecutionContext.HandleEventAsync(@ event );
             if  (!autoAck)
             {
                 channel.BasicAck(eventArgument.DeliveryTag, false );
             }
         };
 
         this .channel.BasicConsume(localQueueName, autoAck: this .autoAck, consumer: consumer);
 
         return  localQueueName;
     }
}

閱讀上面的代碼,須要注意如下幾點:

  1. 正如上面所述,構造函數須要接受IEventHandlerExecutionContext對象,並經過構造函數的base調用,將該對象傳遞給基類
  2. 構造函數中,queueName參數是可選參數,也就是說:
    1. 若是經過RabbitMQEventBus發送事件消息,則無需指定queueName參數,僅需指定exchangeName便可,由於在RabbitMQ中,消息的發佈方無需知道消息是發送到哪一個隊列中
    2. 若是經過RabbitMQEventBus接收事件消息,那麼也分兩種狀況:
      1. 若是兩個進程在使用RabbitMQEventBus時,同時指定了queueName參數,而且queueName的值相同,那麼這兩個進程將會輪流處理路由至queueName隊列的消息
      2. 若是兩個進程在使用RabbitMQEventBus時,同時指定了queueName參數,但queueName的值不相同,或者都沒有指定queueName參數,那麼這兩個進程將會同時處理路由至queueName隊列的消息
    3. 有關Exchange和Queue的概念,請參考RabbitMQ的官方文檔
  3. 在Subscribe方法中,除了將事件處理器註冊到事件處理器執行上下文以外,還經過QueueBind方法,將指定的隊列綁定到Exchange上
  4. 事件數據都經過Newtonsoft.Json進行序列化和反序列化,使用TypeNameHandling.All這一設定,使得序列化的JSON字符串中帶有類型名稱信息。在此處這樣作既是合理的,又是必須的,由於若是沒有帶上類型名稱的信息,JsonConvert.DeserializeObject反序列化時,將沒法斷定獲得的對象是否能夠轉換爲IEvent對象,這樣就會出現異常。但若是是實現一個更爲通用的消息系統,應用程序派發出去的事件消息可能還會被由Python或者Java所實現的應用程序所使用,那麼對於這些應用,它們並不知道Newtonsoft.Json是什麼,也沒法經過Newtonsoft.Json加入的類型名稱來獲知事件消息的初衷(Intent),Newtonsoft.Json所帶的類型信息又會顯得冗餘。所以,簡單地使用Newtonsoft.Json做爲事件消息的序列化、反序列化工具,實際上是欠妥的。更好的作法是,實現自定義的消息序列化、反序列化器,在進行序列化的時候,將.NET相關的諸如類型信息等,做爲Metadata(元數據)附着在序列化的內容上。理論上說,在序列化的數據中加上一些元數據信息是合理的,只不過咱們對這些元數據作一些標註,代表它是由.NET框架產生的,第三方系統若是不關心這些信息,能夠對元數據不作任何處理
  5. 在Dispose方法中,注意將RabbitMQ所使用的資源dispose掉

使用RabbitMQEventBus

在Customer服務中,使用RabbitMQEventBus就很是簡單了,只須要引用RabbitMQEventBus的程序集,而後在Startup.cs文件的ConfigureServices方法中,替換PassThroughEventBus的使用便可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public  void  ConfigureServices(IServiceCollection services)
{
     this .logger.LogInformation( "正在對服務進行配置..." );
 
     services.AddMvc();
 
     services.AddTransient<IEventStore>(serviceProvider =>
         new  DapperEventStore(Configuration[ "mssql:connectionString" ],
             serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));
 
     var  eventHandlerExecutionContext = new  EventHandlerExecutionContext(services,
         sc => sc.BuildServiceProvider());
     services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);
     // services.AddSingleton<IEventBus, PassThroughEventBus>();
 
     var  connectionFactory = new  ConnectionFactory { HostName = "localhost"  };
     services.AddSingleton<IEventBus>(sp => new  RabbitMQEventBus(connectionFactory,
         sp.GetRequiredService<ILogger<RabbitMQEventBus>>(),
         sp.GetRequiredService<IEventHandlerExecutionContext>(),
         RMQ_EXCHANGE,
         queueName: RMQ_QUEUE));
 
     this .logger.LogInformation( "服務配置完成,已註冊到IoC容器!" );
}

Note:一種更好的作法是經過配置文件來配置IoC容器,在曾經的Microsoft Patterns and Practices Enterprise Library Unity Container中,使用配置文件是很方便的。這樣只須要Customer服務可以經過配置文件來配置IoC容器,同時只須要讓Customer服務依賴(注意,不是程序集引用)於不一樣的事件總線的實現便可,無需對Customer服務從新編譯。

下面來驗證一下效果。首先確保RabbitMQ已經配置並啓動穩當,我是安裝在本地機器上,使用默認安裝。首先啓動ASP.NET Core Web API,而後經過Powershell發起兩次建立Customer的請求:

image

查看一下數據庫是否更新正常:

image

並檢查一下日誌信息:

image

RabbitMQ中Exchange的信息:

image

總結

本文提供了一種RabbitMQEventBus的實現,目前來講是夠用的,並且這種實現是可使用在實際項目當中的。在實際使用中,或許也會碰到一些與RabbitMQ自己有關的問題,這就須要具體問題具體分析了。此外,本文沒有涉及事件消息丟失、重發而後保證最終一致性的問題,這些內容會在後面討論。從下文開始,咱們着手逐步實現CQRS架構的領域事件和事件存儲部分。

源代碼的使用

本系列文章的源代碼在https://github.com/daxnet/edasample這個Github Repo裏,經過不一樣的release tag來區分針對不一樣章節的源代碼。本文的源代碼請參考chapter_3這個tag,以下:

image

歡迎訪問個人博客新站:http://sunnycoding.net

相關文章
相關標籤/搜索